From 68cbd5e48cb870f641e538aa69f46f4c9e31f3ae Mon Sep 17 00:00:00 2001 From: Marcus Eggenberger Date: Fri, 18 Jul 2008 15:25:45 +0200 Subject: [PATCH] improved connection loss detection. (breaks protocol) --- src/common/signalproxy.cpp | 74 +++++++++++++++++++++++++------------- src/common/signalproxy.h | 7 ++-- version.inc | 6 ++-- 3 files changed, 57 insertions(+), 30 deletions(-) diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 20392081..9a3c5576 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -191,12 +191,14 @@ SignalProxy::SignalProxy(QObject* parent) : QObject(parent) { setProxyMode(Client); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QObject* parent) : QObject(parent) { setProxyMode(mode); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) @@ -204,6 +206,7 @@ SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) { setProxyMode(mode); addPeer(device); + init(); } SignalProxy::~SignalProxy() { @@ -236,16 +239,16 @@ SignalProxy::ProxyMode SignalProxy::proxyMode() const { return _proxyMode; } +void SignalProxy::init() { + connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat())); + _heartBeatTimer.start(30 * 1000); +} + void SignalProxy::initServer() { - disconnect(&_heartBeatTimer, 0, this, 0); - _heartBeatTimer.stop(); } void SignalProxy::initClient() { attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString))); - connect(&_heartBeatTimer, SIGNAL(timeout()), - this, SLOT(sendHeartBeat())); - _heartBeatTimer.start(60 * 1000); // msecs: one beep per minute } bool SignalProxy::addPeer(QIODevice* iodev) { @@ -617,15 +620,6 @@ void SignalProxy::synchronize(SyncableObject *obj) { } } -// void SignalProxy::setInitialized(SyncableObject *obj) { -// obj->setInitialized(); -// emit objectInitialized(obj); -// } - -// bool SignalProxy::isInitialized(SyncableObject *obj) const { -// return obj->isInitialized(); -// } - void SignalProxy::requestInit(SyncableObject *obj) { if(proxyMode() == Server || obj->isInitialized()) return; @@ -709,23 +703,30 @@ void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFun switch(callType) { case RpcCall: - if(params.empty()) { + if(params.empty()) qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call"; - return; - } else { - return handleSignal(params.takeFirst().toByteArray(), params); - } + else + handleSignal(params.takeFirst().toByteArray(), params); + break; + case Sync: - return handleSync(sender, params); + handleSync(sender, params); + break; + case InitRequest: - return handleInitRequest(sender, params); + handleInitRequest(sender, params); + break; + case InitData: - return handleInitData(sender, params); + handleInitData(sender, params); + break; + case HeartBeat: - return; + receiveHeartBeat(sender); + break; + default: qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params; - return; } } @@ -952,7 +953,7 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian if(blockSize > 1 << 22) { qWarning() << qPrintable(tr("Client tried to send package larger than max package size!")); - QAbstractSocket* sock = qobject_cast(dev); + QAbstractSocket *sock = qobject_cast(dev); qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client"))); dev->close(); return false; @@ -1035,6 +1036,29 @@ void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties void SignalProxy::sendHeartBeat() { dispatchSignal(SignalProxy::HeartBeat, QVariantList()); + QHash::iterator peerIter = _peers.begin(); + QHash::iterator peerIterEnd = _peers.end(); + while(peerIter != peerIterEnd) { + if(peerIter->sentHeartBeats > 1) { + QAbstractSocket *socket = qobject_cast(peerIter.key()); + qWarning() << "SignalProxy: Disconnecting peer:" + << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") + << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; + peerIter.key()->close(); + } else { + peerIter->sentHeartBeats++; + } + peerIter++; + } +} + +void SignalProxy::receiveHeartBeat(QIODevice *dev) { + if(!_peers.contains(dev)) { + qWarning() << "SignalProxy: received heart beat from unknown Device:" << dev; + return; + } + + _peers[dev].sentHeartBeats = 0; } void SignalProxy::dumpProxyStats() { diff --git a/src/common/signalproxy.h b/src/common/signalproxy.h index b39e7ca8..557e8dd3 100644 --- a/src/common/signalproxy.h +++ b/src/common/signalproxy.h @@ -120,14 +120,16 @@ private slots: void objectRenamed(const QString &newname, const QString &oldname); void objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname); void sendHeartBeat(); + void receiveHeartBeat(QIODevice *dev); signals: - void peerRemoved(QIODevice *obj); + void peerRemoved(QIODevice *dev); void connected(); void disconnected(); void objectInitialized(SyncableObject *); private: + void init(); void initServer(); void initClient(); @@ -167,7 +169,8 @@ private: struct peerInfo { quint32 byteCount; bool usesCompression; - peerInfo() : byteCount(0), usesCompression(false) {}; + int sentHeartBeats; + peerInfo() : byteCount(0), usesCompression(false), sentHeartBeats(0) {} }; //QHash _peerByteCount; QHash _peers; diff --git a/version.inc b/version.inc index 7c981c8b..cb3734e0 100644 --- a/version.inc +++ b/version.inc @@ -3,7 +3,7 @@ //! This is the fallback version number in case we can't autogenerate one quasselBaseVersion = "0.3.0-pre"; -protocolVersion = 2; //< Version of the client/core protocol +protocolVersion = 3; //< Version of the client/core protocol -coreNeedsProtocol = 2; //< Minimum protocol version the core needs -clientNeedsProtocol = 2; //< Minimum protocol version the client needs +coreNeedsProtocol = 3; //< Minimum protocol version the core needs +clientNeedsProtocol = 3; //< Minimum protocol version the client needs -- 2.20.1