SignanlProxy does no longer try to write to closed devices. Also closed devices can...
[quassel.git] / src / common / signalproxy.h
index 7ffb73b..755461e 100644 (file)
  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
  ***************************************************************************/
 
-#ifndef _SIGNALPROXY_H_
-#define _SIGNALPROXY_H_
+#ifndef SIGNALPROXY_H
+#define SIGNALPROXY_H
 
+#include <QEvent>
 #include <QList>
 #include <QHash>
 #include <QVariant>
@@ -37,6 +38,9 @@ struct QMetaObject;
 class SignalProxy : public QObject {
   Q_OBJECT
 
+  class AbstractPeer;
+  class IODevicePeer;
+
 public:
   enum ProxyMode {
     Server,
@@ -48,7 +52,18 @@ public:
     RpcCall,
     InitRequest,
     InitData,
-    HeartBeat
+    HeartBeat,
+    HeartBeatReply
+  };
+
+  enum ClientConnectionType {
+    SignalProxyConnection,
+    IODeviceConnection
+  };
+
+  enum CustomEvents {
+    PeerSignal = QEvent::User,
+    RemovePeer
   };
 
   SignalProxy(QObject *parent);
@@ -57,18 +72,20 @@ public:
   virtual ~SignalProxy();
 
   void setProxyMode(ProxyMode mode);
-  ProxyMode proxyMode() const;
+  inline ProxyMode proxyMode() const { return _proxyMode; }
 
   bool addPeer(QIODevice *iodev);
-  void removePeer(QIODevice *iodev = 0);
-
+  bool addPeer(SignalProxy *proxy);
+  void removePeer(QObject *peer);
+  void removeAllPeers();
+  
   bool attachSignal(QObject *sender, const char *signal, const QByteArray& sigName = QByteArray());
   bool attachSlot(const QByteArray& sigName, QObject *recv, const char *slot);
 
   void synchronize(SyncableObject *obj);
 
-  void setInitialized(SyncableObject *obj);
-  bool isInitialized(SyncableObject *obj) const;
+//   void setInitialized(SyncableObject *obj);
+//   bool isInitialized(SyncableObject *obj) const;
   void requestInit(SyncableObject *obj);
 
   void detachObject(QObject *obj);
@@ -113,6 +130,9 @@ public:
 
   void dumpProxyStats();
   
+protected:
+  void customEvent(QEvent *event);
+
 private slots:
   void dataAvailable();
   void detachSender();
@@ -120,14 +140,18 @@ private slots:
   void objectRenamed(const QString &newname, const QString &oldname);
   void objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname);
   void sendHeartBeat();
-
+  void receiveHeartBeat(AbstractPeer *peer, const QVariantList &params);
+  void receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &params);
+  
 signals:
-  void peerRemoved(QIODevice *obj);
+  void peerRemoved(QIODevice *dev);
   void connected();
   void disconnected();
   void objectInitialized(SyncableObject *);
+  void lagUpdated(int lag);
   
 private:
+  void init();
   void initServer();
   void initClient();
   
@@ -145,12 +169,14 @@ private:
 
   void dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList &params);
   void dispatchSignal(const RequestType &requestType, const QVariantList &params);
-  
-  void receivePeerSignal(QIODevice *sender, const QVariant &packedFunc);
-  void handleSync(QIODevice *sender, QVariantList params);
-  void handleInitRequest(QIODevice *sender, const QVariantList &params);
-  void handleInitData(QIODevice *sender, const QVariantList &params);
-  void handleSignal(const QByteArray &funcName, const QVariantList &params);
+
+  void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc);
+  void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params);
+  void receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params);
+  void handleSync(AbstractPeer *sender, QVariantList params);
+  void handleInitRequest(AbstractPeer *sender, const QVariantList &params);
+  void handleInitData(AbstractPeer *sender, const QVariantList &params);
+  void handleSignal(const QVariantList &data);
 
   bool invokeSlot(QObject *receiver, int methodId, const QVariantList &params, QVariant &returnValue);
   bool invokeSlot(QObject *receiver, int methodId, const QVariantList &params = QVariantList());
@@ -158,18 +184,62 @@ private:
   QVariantMap initData(SyncableObject *obj) const;
   void setInitData(SyncableObject *obj, const QVariantMap &properties);
 
+  void updateLag(IODevicePeer *peer, int lag);
+
 public:
   void dumpSyncMap(SyncableObject *object);
+  inline int peerCount() const { return _peers.size(); }
   
 private:
-  // Hash of used QIODevices
-  struct peerInfo {
+  static void disconnectDevice(QIODevice *dev, const QString &reason = QString());
+
+  class AbstractPeer {
+  public:
+    enum PeerType {
+      NotAPeer = 0,
+      IODevicePeer = 1,
+      SignalProxyPeer = 2
+    };
+    AbstractPeer() : _type(NotAPeer) {}
+    AbstractPeer(PeerType type) : _type(type) {}
+    virtual ~AbstractPeer() {}
+    inline PeerType type() const { return _type; }
+    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params) = 0;
+  private:
+    PeerType _type;
+  };
+
+  class IODevicePeer : public AbstractPeer {
+  public:
+    IODevicePeer(QIODevice *device, bool compress) : AbstractPeer(AbstractPeer::IODevicePeer), _device(device), byteCount(0), usesCompression(compress), sentHeartBeats(0), lag(0) {}
+    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
+    inline void dispatchPackedFunc(const QVariant &packedFunc) { SignalProxy::writeDataToDevice(_device, packedFunc, usesCompression); }
+    QString address() const;
+    inline bool isOpen() const { return _device->isOpen(); }
+    inline void close() const { _device->close(); }
+    inline bool readData(QVariant &item) { return SignalProxy::readDataFromDevice(_device, byteCount, item, usesCompression); }
+  private:
+    QIODevice *_device;
     quint32 byteCount;
     bool usesCompression;
-    peerInfo() : byteCount(0), usesCompression(false) {};
+  public:
+    int sentHeartBeats;
+    int lag;
+  };
+
+  class SignalProxyPeer : public AbstractPeer {
+  public:
+    SignalProxyPeer(SignalProxy *sender, SignalProxy *receiver) : AbstractPeer(AbstractPeer::SignalProxyPeer), sender(sender), receiver(receiver) {}
+    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
+  private:
+    SignalProxy *sender;
+    SignalProxy *receiver;
   };
-  //QHash<QIODevice*, peerInfo> _peerByteCount;
-  QHash<QIODevice*, peerInfo> _peers;
+
+  // a Hash of the actual used communication object to it's corresponding peer
+  // currently a communication object can either be an arbitrary QIODevice or another SignalProxy
+  typedef QHash<QObject *, AbstractPeer *> PeerHash;
+  PeerHash _peers;
 
   // containg a list of argtypes for fast access
   QHash<const QMetaObject *, ClassInfo*> _classInfo;