Make SignalProxy heartbeat configurable
authorManuel Nickschas <sputnick@quassel-irc.org>
Mon, 7 Dec 2009 18:09:46 +0000 (19:09 +0100)
committerManuel Nickschas <sputnick@quassel-irc.org>
Mon, 7 Dec 2009 21:42:54 +0000 (22:42 +0100)
This adds the ability to set both the interval and maximum for sigproxy's heartbeat.
Also my IDE cleaned up a ton of whitespace :)

src/common/signalproxy.cpp
src/common/signalproxy.h

index 17a44c4..b3410e6 100644 (file)
@@ -120,7 +120,7 @@ void SignalProxy::SignalRelay::detachSignal(QObject *sender, int signalId) {
     if(slotIter->sender == sender && (signalId == -1 || slotIter->signalId == signalId)) {
       slotIter = _slots.erase(slotIter);
       if(signalId != -1)
     if(slotIter->sender == sender && (signalId == -1 || slotIter->signalId == signalId)) {
       slotIter = _slots.erase(slotIter);
       if(signalId != -1)
-       break;
+        break;
     } else {
       slotIter++;
     }
     } else {
       slotIter++;
     }
@@ -146,12 +146,12 @@ int SignalProxy::SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **
 
       const QList<int> &argTypes = eMeta->argTypes(signal.signalId);
       for(int i = 0; i < argTypes.size(); i++) {
 
       const QList<int> &argTypes = eMeta->argTypes(signal.signalId);
       for(int i = 0; i < argTypes.size(); i++) {
-       if(argTypes[i] == 0) {
-         qWarning() << "SignalRelay::qt_metacall(): received invalid data for argument number" << i << "of signal" << QString("%1::%2").arg(caller->metaObject()->className()).arg(caller->metaObject()->method(_id).signature());
-         qWarning() << "                            - make sure all your data types are known by the Qt MetaSystem";
-         return _id;
-       }
-       params << QVariant(argTypes[i], _a[i+1]);
+        if(argTypes[i] == 0) {
+          qWarning() << "SignalRelay::qt_metacall(): received invalid data for argument number" << i << "of signal" << QString("%1::%2").arg(caller->metaObject()->className()).arg(caller->metaObject()->method(_id).signature());
+          qWarning() << "                            - make sure all your data types are known by the Qt MetaSystem";
+          return _id;
+        }
+        params << QVariant(argTypes[i], _a[i+1]);
       }
 
       proxy()->dispatchSignal(SignalProxy::RpcCall, params);
       }
 
       proxy()->dispatchSignal(SignalProxy::RpcCall, params);
@@ -167,7 +167,7 @@ int SignalProxy::SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **
 void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
   QVariantList packedFunc;
   packedFunc << (qint16)requestType
 void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
   QVariantList packedFunc;
   packedFunc << (qint16)requestType
-            << params;
+             << params;
   dispatchPackedFunc(QVariant(packedFunc));
 }
 
   dispatchPackedFunc(QVariant(packedFunc));
 }
 
@@ -252,8 +252,8 @@ void SignalProxy::setProxyMode(ProxyMode mode) {
     if((*peer)->type() != AbstractPeer::IODevicePeer) {
       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
       if(ioPeer->isOpen()) {
     if((*peer)->type() != AbstractPeer::IODevicePeer) {
       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
       if(ioPeer->isOpen()) {
-       qWarning() << "SignalProxy: Cannot change proxy mode while connected";
-       return;
+        qWarning() << "SignalProxy: Cannot change proxy mode while connected";
+        return;
       }
     }
     if((*peer)->type() != AbstractPeer::SignalProxyPeer) {
       }
     }
     if((*peer)->type() != AbstractPeer::SignalProxyPeer) {
@@ -273,7 +273,8 @@ void SignalProxy::setProxyMode(ProxyMode mode) {
 void SignalProxy::init() {
   _signalRelay = new SignalRelay(this);
   connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
 void SignalProxy::init() {
   _signalRelay = new SignalRelay(this);
   connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
-  _heartBeatTimer.start(30 * 1000);
+  setHeartBeatInterval(30);
+  setMaxHeartBeatCount(2);
   _secure = false;
   updateSecureState();
 }
   _secure = false;
   updateSecureState();
 }
@@ -324,6 +325,23 @@ bool SignalProxy::addPeer(QIODevice* iodev) {
   return true;
 }
 
   return true;
 }
 
+void SignalProxy::setHeartBeatInterval(int secs) {
+  if(secs != _heartBeatInterval) {
+    _heartBeatInterval = secs;
+    _heartBeatTimer.setInterval(secs * 1000);
+  }
+}
+
+void SignalProxy::setMaxHeartBeatCount(int max) {
+  if(max < 0)
+    _heartBeatTimer.stop();
+  else {
+    _maxHeartBeatCount = max;
+    if(!_heartBeatTimer.isActive())
+      _heartBeatTimer.start();
+  }
+}
+
 bool SignalProxy::addPeer(SignalProxy* proxy) {
   if(!proxy)
     return false;
 bool SignalProxy::addPeer(SignalProxy* proxy) {
   if(!proxy)
     return false;
@@ -537,11 +555,11 @@ void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantL
     switch((*peer)->type()) {
     case AbstractPeer::IODevicePeer:
       {
     switch((*peer)->type()) {
     case AbstractPeer::IODevicePeer:
       {
-       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
-       if(ioPeer->isOpen())
-         ioPeer->dispatchPackedFunc(packedFunc);
-       else
-         QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
+        IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
+        if(ioPeer->isOpen())
+          ioPeer->dispatchPackedFunc(packedFunc);
+        else
+          QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
       }
       break;
     case AbstractPeer::SignalProxyPeer:
       }
       break;
     case AbstractPeer::SignalProxyPeer:
@@ -634,7 +652,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
 
   if(!_syncSlave.contains(className) || !_syncSlave[className].contains(objectName)) {
     qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
 
   if(!_syncSlave.contains(className) || !_syncSlave[className].contains(objectName)) {
     qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
-              << params;
+               << params;
     return;
   }
 
     return;
   }
 
@@ -642,7 +660,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
   ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
   if(!eMeta->slotMap().contains(slot)) {
     qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
   ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
   if(!eMeta->slotMap().contains(slot)) {
     qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
-              << params;
+               << params;
     return;
   }
 
     return;
   }
 
@@ -663,8 +681,8 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
     int receiverId = eMeta->receiveMap()[slotId];
     QVariantList returnParams;
     returnParams << className
     int receiverId = eMeta->receiveMap()[slotId];
     QVariantList returnParams;
     returnParams << className
-                << objectName
-                << eMeta->methodName(receiverId);
+                 << objectName
+                 << eMeta->methodName(receiverId);
     //QByteArray(receiver->metaObject()->method(receiverId).signature());
     if(eMeta->argTypes(receiverId).count() > 1)
       returnParams << params;
     //QByteArray(receiver->metaObject()->method(receiverId).signature());
     if(eMeta->argTypes(receiverId).count() > 1)
       returnParams << params;
@@ -679,7 +697,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
 void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &params) {
   if(params.count() != 2) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:"
 void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &params) {
   if(params.count() != 2) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:"
-              << params;
+               << params;
     return;
   }
 
     return;
   }
 
@@ -688,13 +706,13 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa
 
   if(!_syncSlave.contains(className)) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
 
   if(!_syncSlave.contains(className)) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
-              << className;
+               << className;
     return;
   }
 
   if(!_syncSlave[className].contains(objectName)) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Object:"
     return;
   }
 
   if(!_syncSlave[className].contains(objectName)) {
     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Object:"
-              << className << objectName;
+               << className << objectName;
     return;
   }
 
     return;
   }
 
@@ -702,8 +720,8 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa
 
   QVariantList params_;
   params_ << className
 
   QVariantList params_;
   params_ << className
-         << objectName
-         << initData(obj);
+          << objectName
+          << initData(obj);
 
   sender->dispatchSignal(InitData, params_);
 }
 
   sender->dispatchSignal(InitData, params_);
 }
@@ -712,7 +730,7 @@ void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList &param
   Q_UNUSED(sender)
   if(params.count() != 3) {
     qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:"
   Q_UNUSED(sender)
   if(params.count() != 3) {
     qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:"
-              << params;
+               << params;
     return;
   }
 
     return;
   }
 
@@ -722,13 +740,13 @@ void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList &param
 
   if(!_syncSlave.contains(className)) {
     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Class:"
 
   if(!_syncSlave.contains(className)) {
     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Class:"
-              << className;
+               << className;
     return;
   }
 
   if(!_syncSlave[className].contains(objectName)) {
     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Object:"
     return;
   }
 
   if(!_syncSlave[className].contains(objectName)) {
     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Object:"
-              << className << objectName;
+               << className << objectName;
     return;
   }
 
     return;
   }
 
@@ -768,8 +786,8 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList
   }
 
   void *_a[] = {0,              // return type...
   }
 
   void *_a[] = {0,              // return type...
-               0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots
-               0, 0, 0, 0 , 0};
+                0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots
+                0, 0, 0, 0 , 0};
 
   // check for argument compatibility and build params array
   for(int i = 0; i < numArgs; i++) {
 
   // check for argument compatibility and build params array
   for(int i = 0; i < numArgs; i++) {
@@ -884,8 +902,8 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian
     if(nbytes <= 4) {
       const char *data = rawItem.constData();
       if(nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) {
     if(nbytes <= 4) {
       const char *data = rawItem.constData();
       if(nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) {
-       disconnectDevice(dev, tr("Peer sent corrupted compressed data!"));
-       return false;
+        disconnectDevice(dev, tr("Peer sent corrupted compressed data!"));
+        return false;
       }
     }
 
       }
     }
 
@@ -912,7 +930,7 @@ void SignalProxy::requestInit(SyncableObject *obj) {
 
   QVariantList params;
   params << obj->syncMetaObject()->className()
 
   QVariantList params;
   params << obj->syncMetaObject()->className()
-        << obj->objectName();
+         << obj->objectName();
   dispatchSignal(InitRequest, params);
 }
 
   dispatchSignal(InitRequest, params);
 }
 
@@ -940,12 +958,12 @@ void SignalProxy::sendHeartBeat() {
       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
       ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
       if(ioPeer->sentHeartBeats > 0) {
       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
       ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
       if(ioPeer->sentHeartBeats > 0) {
-       updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
+        updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
       }
       }
-      if(ioPeer->sentHeartBeats > 1)
+      if(ioPeer->sentHeartBeats >= maxHeartBeatCount())
         toClose.append(ioPeer);
       else
         toClose.append(ioPeer);
       else
-       ioPeer->sentHeartBeats++;
+        ioPeer->sentHeartBeats++;
     }
     ++peer;
   }
     }
     ++peer;
   }
@@ -1144,19 +1162,19 @@ const QHash<int, int> &SignalProxy::ExtendedMetaObject::receiveMap() {
     for(int i = 0; i < methodCount; i++) {
       requestSlot = _meta->method(i);
       if(requestSlot.methodType() != QMetaMethod::Slot)
     for(int i = 0; i < methodCount; i++) {
       requestSlot = _meta->method(i);
       if(requestSlot.methodType() != QMetaMethod::Slot)
-       continue;
+        continue;
 
       returnTypeName = requestSlot.typeName();
       if(QMetaType::Void == (QMetaType::Type)returnType(i))
 
       returnTypeName = requestSlot.typeName();
       if(QMetaType::Void == (QMetaType::Type)returnType(i))
-       continue;
+        continue;
 
       signature = QByteArray(requestSlot.signature());
       if(!signature.startsWith("request"))
 
       signature = QByteArray(requestSlot.signature());
       if(!signature.startsWith("request"))
-       continue;
+        continue;
 
       paramsPos = signature.indexOf('(');
       if(paramsPos == -1)
 
       paramsPos = signature.indexOf('(');
       if(paramsPos == -1)
-       continue;
+        continue;
 
       methodName = signature.left(paramsPos);
       params = signature.mid(paramsPos);
 
       methodName = signature.left(paramsPos);
       params = signature.mid(paramsPos);
@@ -1168,12 +1186,12 @@ const QHash<int, int> &SignalProxy::ExtendedMetaObject::receiveMap() {
       receiverId = _meta->indexOfSlot(signature);
 
       if(receiverId == -1) {
       receiverId = _meta->indexOfSlot(signature);
 
       if(receiverId == -1) {
-       signature = QMetaObject::normalizedSignature(methodName + "(" + returnTypeName + ")");
-       receiverId = _meta->indexOfSlot(signature);
+        signature = QMetaObject::normalizedSignature(methodName + "(" + returnTypeName + ")");
+        receiverId = _meta->indexOfSlot(signature);
       }
 
       if(receiverId != -1) {
       }
 
       if(receiverId != -1) {
-       receiveMap[i] = receiverId;
+        receiveMap[i] = receiverId;
       }
     }
     _receiveMap = receiveMap;
       }
     }
     _receiveMap = receiveMap;
index 3b8497c..867ffdb 100644 (file)
@@ -76,6 +76,11 @@ public:
   void setProxyMode(ProxyMode mode);
   inline ProxyMode proxyMode() const { return _proxyMode; }
 
   void setProxyMode(ProxyMode mode);
   inline ProxyMode proxyMode() const { return _proxyMode; }
 
+  void setHeartBeatInterval(int secs);
+  inline int heartBeatInterval() const { return _heartBeatInterval; }
+  void setMaxHeartBeatCount(int max);
+  inline int maxHeartBeatCount() const { return _maxHeartBeatCount; }
+
   bool addPeer(QIODevice *iodev);
   bool addPeer(SignalProxy *proxy);
   void removePeer(QObject *peer);
   bool addPeer(QIODevice *iodev);
   bool addPeer(SignalProxy *proxy);
   void removePeer(QObject *peer);
@@ -195,6 +200,8 @@ private:
 
   ProxyMode _proxyMode;
   QTimer _heartBeatTimer;
 
   ProxyMode _proxyMode;
   QTimer _heartBeatTimer;
+  int _heartBeatInterval;
+  int _maxHeartBeatCount;
 
   bool _secure; // determines if all connections are in a secured state (using ssl or internal connections)
 
 
   bool _secure; // determines if all connections are in a secured state (using ssl or internal connections)
 
@@ -238,7 +245,7 @@ public:
   inline int methodId(const QByteArray &methodName) { return _methodIds.contains(methodName) ? _methodIds[methodName] : -1; }
 
   inline int updatedRemotelyId() { return _updatedRemotelyId; }
   inline int methodId(const QByteArray &methodName) { return _methodIds.contains(methodName) ? _methodIds[methodName] : -1; }
 
   inline int updatedRemotelyId() { return _updatedRemotelyId; }
-  
+
   inline const QHash<QByteArray, int> &slotMap() { return _methodIds; }
   const QHash<int, int> &receiveMap();
 
   inline const QHash<QByteArray, int> &slotMap() { return _methodIds; }
   const QHash<int, int> &receiveMap();