X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fsignalproxy.cpp;h=0350ac6542e9160ec1398858bcaf470b2bb96b62;hp=9806acfcea9c029e171c00db445fdc60b5f1c2e6;hb=f12ff6555fc60d165d5057902a91cdff355816b9;hpb=bc8d12b721ab61014fe7700982dbc408d98d54f1 diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 9806acfc..0350ac65 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include "syncableobject.h" #include "util.h" @@ -190,12 +192,14 @@ SignalProxy::SignalProxy(QObject* parent) : QObject(parent) { setProxyMode(Client); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QObject* parent) : QObject(parent) { setProxyMode(mode); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) @@ -203,6 +207,7 @@ SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) { setProxyMode(mode); addPeer(device); + init(); } SignalProxy::~SignalProxy() { @@ -231,20 +236,16 @@ void SignalProxy::setProxyMode(ProxyMode mode) { initClient(); } -SignalProxy::ProxyMode SignalProxy::proxyMode() const { - return _proxyMode; +void SignalProxy::init() { + connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat())); + _heartBeatTimer.start(30 * 1000); } void SignalProxy::initServer() { - disconnect(&_heartBeatTimer, 0, this, 0); - _heartBeatTimer.stop(); } void SignalProxy::initClient() { attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString))); - connect(&_heartBeatTimer, SIGNAL(timeout()), - this, SLOT(sendHeartBeat())); - _heartBeatTimer.start(60 * 1000); // msecs: one beep per minute } bool SignalProxy::addPeer(QIODevice* iodev) { @@ -606,23 +607,18 @@ void SignalProxy::synchronize(SyncableObject *obj) { if(proxyMode() == Server) { connect(obj, SIGNAL(objectRenamed(QString, QString)), this, SLOT(objectRenamed(QString, QString))); - setInitialized(obj); + obj->setInitialized(); + emit objectInitialized(obj); } else { - requestInit(obj); + if(obj->isInitialized()) + emit objectInitialized(obj); + else + requestInit(obj); } } -void SignalProxy::setInitialized(SyncableObject *obj) { - obj->setInitialized(); - emit objectInitialized(obj); -} - -bool SignalProxy::isInitialized(SyncableObject *obj) const { - return obj->isInitialized(); -} - void SignalProxy::requestInit(SyncableObject *obj) { - if(proxyMode() == Server || isInitialized(obj)) + if(proxyMode() == Server || obj->isInitialized()) return; QVariantList params; @@ -704,23 +700,34 @@ void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFun switch(callType) { case RpcCall: - if(params.empty()) { + if(params.empty()) qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call"; - return; - } else { - return handleSignal(params.takeFirst().toByteArray(), params); - } + else + handleSignal(params.takeFirst().toByteArray(), params); + break; + case Sync: - return handleSync(sender, params); + handleSync(sender, params); + break; + case InitRequest: - return handleInitRequest(sender, params); + handleInitRequest(sender, params); + break; + case InitData: - return handleInitData(sender, params); + handleInitData(sender, params); + break; + case HeartBeat: - return; + receiveHeartBeat(sender, params); + break; + + case HeartBeatReply: + receiveHeartBeatReply(sender, params); + break; + default: qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params; - return; } } @@ -907,7 +914,7 @@ void SignalProxy::dataAvailable() { void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed) { QAbstractSocket* sock = qobject_cast(dev); if(!dev->isOpen() || (sock && sock->state()!=QAbstractSocket::ConnectedState)) { - qWarning("SignalProxy: Can't call on a closed device"); + qWarning("SignalProxy: Can't call write on a closed device"); return; } @@ -945,6 +952,14 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian in >> blockSize; } + if(blockSize > 1 << 22) { + qWarning() << qPrintable(tr("Client tried to send package larger than max package size!")); + QAbstractSocket *sock = qobject_cast(dev); + qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client"))); + dev->close(); + return false; + } + if(dev->bytesAvailable() < blockSize) return false; @@ -1012,17 +1027,68 @@ QVariantMap SignalProxy::initData(SyncableObject *obj) const { } void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties) { - if(isInitialized(obj)) + if(obj->isInitialized()) return; obj->fromVariantMap(properties); - setInitialized(obj); + obj->setInitialized(); + emit objectInitialized(obj); invokeSlot(obj, updatedRemotelyId(obj)); } void SignalProxy::sendHeartBeat() { - dispatchSignal(SignalProxy::HeartBeat, QVariantList()); + dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime()); + QHash::iterator peerIter = _peers.begin(); + QHash::iterator peerIterEnd = _peers.end(); + while(peerIter != peerIterEnd) { + if(peerIter->sentHeartBeats > 0) { + updateLag(peerIter.key(), _heartBeatTimer.interval()); + } + if(peerIter->sentHeartBeats > 1) { + QAbstractSocket *socket = qobject_cast(peerIter.key()); + qWarning() << "SignalProxy: Disconnecting peer:" + << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") + << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; + peerIter.key()->close(); + } else { + peerIter->sentHeartBeats++; + } + peerIter++; + } +} + +void SignalProxy::receiveHeartBeat(QIODevice *dev, const QVariantList ¶ms) { + if(!_peers.contains(dev)) { + qWarning() << "SignalProxy: received heart beat from unknown Device:" << dev; + } + dispatchSignal(dev, SignalProxy::HeartBeatReply, params); +} + +void SignalProxy::receiveHeartBeatReply(QIODevice *dev, const QVariantList ¶ms) { + if(!_peers.contains(dev)) { + qWarning() << "SignalProxy: received heart beat reply from unknown Device:" << dev; + return; + } + + _peers[dev].sentHeartBeats = 0; + + if(params.isEmpty()) { + qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << dev; + return; + } + + QTime sendTime = params[0].value(); + updateLag(dev, sendTime.msecsTo(QTime::currentTime()) / 2); } +void SignalProxy::updateLag(QIODevice *dev, int lag) { + Q_ASSERT(_peers.contains(dev)); + _peers[dev].lag = lag; + if(proxyMode() == Client) { + emit lagUpdated(lag); + } +} + + void SignalProxy::dumpProxyStats() { QString mode; if(proxyMode() == Server)