X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fsignalproxy.cpp;h=94aa063f55e6b909298ac5ceff66f92cdbd96bc3;hp=76fccb73eb7013865015f9a5b0d5d3b6e2f2468a;hb=cc0cad583046c34cb1296329016e1834995d171d;hpb=fe313ca8ab4c082c63b27f5e1c23541975c7f47f diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 76fccb73..94aa063f 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -32,12 +33,28 @@ #include #include #include +#include +#include +#include #include "syncableobject.h" #include "util.h" -class SignalRelay: public QObject { +// ================================================== +// PeerSignalEvent +// ================================================== +class PeerSignalEvent : public QEvent { +public: + PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {} + SignalProxy *sender; + SignalProxy::RequestType requestType; + QVariantList params; +}; +// ================================================== +// SIGNALRELAY +// ================================================== +class SignalRelay: public QObject { /* Q_OBJECT is not necessary or even allowed, because we implement qt_metacall ourselves (and don't use any other features of the meta object system) @@ -178,24 +195,52 @@ void SignalRelay::attachSignal(int methodId, const QByteArray &func) { } sigNames.insert(methodId, fn); } -// ==================== -// /SIGNALRELAY -// ==================== +// ================================================== +// Peers +// ================================================== +void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { + QVariantList packedFunc; + packedFunc << (qint16)requestType + << params; + dispatchPackedFunc(QVariant(packedFunc)); +} + +QString SignalProxy::IODevicePeer::address() const { + QAbstractSocket *socket = qobject_cast(_device); + if(socket) + return socket->peerAddress().toString(); + else + return QString(); +} + +void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { + Qt::ConnectionType type = QThread::currentThread() == receiver->thread() + ? Qt::DirectConnection + : Qt::QueuedConnection; -// ==================== + if(type == Qt::DirectConnection) { + receiver->receivePeerSignal(sender, requestType, params); + } else { + QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params)); + } +} + +// ================================================== // SignalProxy -// ==================== +// ================================================== SignalProxy::SignalProxy(QObject* parent) : QObject(parent) { setProxyMode(Client); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QObject* parent) : QObject(parent) { setProxyMode(mode); + init(); } SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) @@ -203,27 +248,33 @@ SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) { setProxyMode(mode); addPeer(device); + init(); } SignalProxy::~SignalProxy() { QList senders = _relayHash.keys(); foreach(QObject* sender, senders) detachObject(sender); - - // close peer connections - foreach(QIODevice *device, _peers.keys()) { - device->close(); - delete device; - } + removeAllPeers(); } void SignalProxy::setProxyMode(ProxyMode mode) { - foreach(QIODevice* peer, _peers.keys()) { - if(peer->isOpen()) { - qWarning() << "SignalProxy: Cannot change proxy mode while connected"; + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + if((*peer)->type() != AbstractPeer::IODevicePeer) { + IODevicePeer *ioPeer = static_cast(*peer); + if(ioPeer->isOpen()) { + qWarning() << "SignalProxy: Cannot change proxy mode while connected"; + return; + } + } + if((*peer)->type() != AbstractPeer::SignalProxyPeer) { + qWarning() << "SignalProxy: Cannot change proxy mode while connected to another internal SignalProxy"; return; } + peer++; } + _proxyMode = mode; if(mode == Server) initServer(); @@ -231,20 +282,16 @@ void SignalProxy::setProxyMode(ProxyMode mode) { initClient(); } -SignalProxy::ProxyMode SignalProxy::proxyMode() const { - return _proxyMode; +void SignalProxy::init() { + connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat())); + _heartBeatTimer.start(30 * 1000); } void SignalProxy::initServer() { - disconnect(&_heartBeatTimer, 0, this, 0); - _heartBeatTimer.stop(); } void SignalProxy::initClient() { attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString))); - connect(&_heartBeatTimer, SIGNAL(timeout()), - this, SLOT(sendHeartBeat())); - _heartBeatTimer.start(60 * 1000); // msecs: one beep per minute } bool SignalProxy::addPeer(QIODevice* iodev) { @@ -270,9 +317,7 @@ bool SignalProxy::addPeer(QIODevice* iodev) { connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender())); } - _peers[iodev] = peerInfo(); - if(iodev->property("UseCompression").toBool()) - _peers[iodev].usesCompression = true; + _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool()); if(_peers.count() == 1) emit connected(); @@ -280,43 +325,70 @@ bool SignalProxy::addPeer(QIODevice* iodev) { return true; } -void SignalProxy::removePeer(QIODevice* iodev) { - if(_peers.isEmpty()) { - qWarning() << "SignalProxy::removePeer(): No peers in use!"; - return; +bool SignalProxy::addPeer(SignalProxy* proxy) { + if(!proxy) + return false; + + if(proxyMode() == proxy->proxyMode()) { + qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!"; + return false; } - if(proxyMode() == Server && !iodev) { - // disconnect all - QList peers = _peers.keys(); - foreach(QIODevice *peer, peers) - removePeer(peer); + if(_peers.contains(proxy)) { + return true; + } + + if(proxyMode() == Client && !_peers.isEmpty()) { + qWarning("SignalProxy: only one peer allowed in client mode!"); + return false; } - if(proxyMode() != Server && !iodev) - iodev = _peers.keys().first(); + _peers[proxy] = new SignalProxyPeer(this, proxy); + + proxy->addPeer(this); - Q_ASSERT(iodev); + if(_peers.count() == 1) + emit connected(); + + return true; +} - if(!_peers.contains(iodev)) { - qWarning() << "SignalProxy: unknown QIODevice" << iodev; +void SignalProxy::removeAllPeers() { + Q_ASSERT(proxyMode() == Server || _peers.count() <= 1); + // wee need to copy that list since we modify it in the loop + QList peers = _peers.keys(); + foreach(QObject *peer, peers) { + removePeer(peer); + } +} + +void SignalProxy::removePeer(QObject* dev) { + if(_peers.isEmpty()) { + qWarning() << "SignalProxy::removePeer(): No peers in use!"; return; } - _peers.remove(iodev); + Q_ASSERT(dev); + if(!_peers.contains(dev)) { + qWarning() << "SignalProxy: unknown Peer" << dev; + return; + } - disconnect(iodev, 0, this, 0); - emit peerRemoved(iodev); + AbstractPeer *peer = _peers[dev]; + _peers.remove(dev); + + disconnect(dev, 0, this, 0); + if(peer->type() == AbstractPeer::IODevicePeer) + emit peerRemoved(static_cast(dev)); + + delete peer; if(_peers.isEmpty()) emit disconnected(); } void SignalProxy::removePeerBySender() { - // OK we're brutal here... but since it's a private slot we know what we've got connected to it... - // this Slot is not triggered by destroyed, so the object is still alive and can be used! - QIODevice *ioDev = (QIODevice *)(sender()); - removePeer(ioDev); + removePeer(sender()); } void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) { @@ -616,15 +688,6 @@ void SignalProxy::synchronize(SyncableObject *obj) { } } -// void SignalProxy::setInitialized(SyncableObject *obj) { -// obj->setInitialized(); -// emit objectInitialized(obj); -// } - -// bool SignalProxy::isInitialized(SyncableObject *obj) const { -// return obj->isInitialized(); -// } - void SignalProxy::requestInit(SyncableObject *obj) { if(proxyMode() == Server || obj->isInitialized()) return; @@ -677,58 +740,84 @@ void SignalProxy::stopSync(SyncableObject* obj) { } } -void SignalProxy::dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList ¶ms) { - Q_ASSERT(_peers.contains(receiver)); - QVariantList packedFunc; - packedFunc << (qint16)requestType; - packedFunc << params; - writeDataToDevice(receiver, QVariant(packedFunc), _peers[receiver].usesCompression); -} - void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { - // yes I know we have a little code duplication here... it's for the sake of performance - QVariantList packedFunc; - packedFunc << (qint16)requestType; - packedFunc << params; - foreach(QIODevice* dev, _peers.keys()) { - Q_ASSERT(_peers.contains(dev)); - writeDataToDevice(dev, QVariant(packedFunc), _peers[dev].usesCompression); + QVariant packedFunc(QVariantList() << (qint16)requestType << params); + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + switch((*peer)->type()) { + case AbstractPeer::IODevicePeer: + { + IODevicePeer *ioPeer = static_cast(*peer); + ioPeer->dispatchPackedFunc(packedFunc); + } + break; + case AbstractPeer::SignalProxyPeer: + (*peer)->dispatchSignal(requestType, params); + break; + default: + Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type + } + peer++; } } -void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFunc) { +void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc) { QVariantList params(packedFunc.toList()); if(params.isEmpty()) { qWarning() << "SignalProxy::receivePeerSignal(): received incompatible Data:" << packedFunc; return; } - - int callType = params.takeFirst().value(); - switch(callType) { + RequestType requestType = (RequestType)params.takeFirst().value(); + receivePeerSignal(sender, requestType, params); +} + +void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) { + switch(requestType) { case RpcCall: - if(params.empty()) { + if(params.empty()) qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call"; - return; - } else { - return handleSignal(params.takeFirst().toByteArray(), params); - } + else + handleSignal(params); + //handleSignal(params.takeFirst().toByteArray(), params); + break; + case Sync: - return handleSync(sender, params); + handleSync(sender, params); + break; + case InitRequest: - return handleInitRequest(sender, params); + handleInitRequest(sender, params); + break; + case InitData: - return handleInitData(sender, params); + handleInitData(sender, params); + break; + case HeartBeat: - return; + receiveHeartBeat(sender, params); + break; + + case HeartBeatReply: + receiveHeartBeatReply(sender, params); + break; + default: - qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params; + qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << requestType << params; + } +} + +void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms) { + if(!_peers.contains(sender)) { + // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea. + qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast(sender); return; } + receivePeerSignal(_peers[sender], requestType, params); } -void SignalProxy::handleSync(QIODevice *sender, QVariantList params) { +void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { if(params.count() < 3) { qWarning() << "received invalid Sync call" << params; return; @@ -768,14 +857,14 @@ void SignalProxy::handleSync(QIODevice *sender, QVariantList params) { if(argTypes(receiver, receiverId).count() > 1) returnParams << params; returnParams << returnValue; - dispatchSignal(sender, Sync, returnParams); + sender->dispatchSignal(Sync, returnParams); } // send emit update signal invokeSlot(receiver, updatedRemotelyId(receiver)); } -void SignalProxy::handleInitRequest(QIODevice *sender, const QVariantList ¶ms) { +void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms) { if(params.count() != 2) { qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:" << params; @@ -804,10 +893,10 @@ void SignalProxy::handleInitRequest(QIODevice *sender, const QVariantList ¶m << objectName << initData(obj); - dispatchSignal(sender, InitData, params_); + sender->dispatchSignal(InitData, params_); } -void SignalProxy::handleInitData(QIODevice *sender, const QVariantList ¶ms) { +void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶ms) { Q_UNUSED(sender) if(params.count() != 3) { qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:" @@ -835,7 +924,11 @@ void SignalProxy::handleInitData(QIODevice *sender, const QVariantList ¶ms) setInitData(obj, propertyMap); } -void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList ¶ms) { +//void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList ¶ms) { +void SignalProxy::handleSignal(const QVariantList &data) { + QVariantList params = data; + QByteArray funcName = params.takeFirst().toByteArray(); + QObject* receiver; int methodId; SlotHash::const_iterator slot = _attachedSlots.constFind(funcName); @@ -902,16 +995,17 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList void SignalProxy::dataAvailable() { // yet again. it's a private slot. no need for checks. QIODevice* ioDev = qobject_cast(sender()); - Q_ASSERT(_peers.contains(ioDev)); + Q_ASSERT(_peers.contains(ioDev) && _peers[ioDev]->type() == AbstractPeer::IODevicePeer); + IODevicePeer *peer = static_cast(_peers[ioDev]); QVariant var; - while(readDataFromDevice(ioDev, _peers[ioDev].byteCount, var, _peers[ioDev].usesCompression)) - receivePeerSignal(ioDev, var); + while(peer->readData(var)) + receivePackedFunc(peer, var); } void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed) { QAbstractSocket* sock = qobject_cast(dev); if(!dev->isOpen() || (sock && sock->state()!=QAbstractSocket::ConnectedState)) { - qWarning("SignalProxy: Can't call on a closed device"); + qWarning("SignalProxy: Can't call write on a closed device"); return; } @@ -949,12 +1043,29 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian in >> blockSize; } + if(blockSize > 1 << 22) { + qWarning() << qPrintable(tr("Client tried to send package larger than max package size!")); + QAbstractSocket *sock = qobject_cast(dev); + qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client"))); + dev->close(); + return false; + } + if(dev->bytesAvailable() < blockSize) return false; if(compressed) { QByteArray rawItem; in >> rawItem; + // debug check + int nbytes = rawItem.size(); + if (nbytes <= 4) { + const char *data = rawItem.constData(); + if (nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) + qWarning() << "receieved corrupted compressed data:" + << blockSize << rawItem << rawItem.size() << dev; + } + // end rawItem = qUncompress(rawItem); QDataStream itemStream(&rawItem, QIODevice::ReadOnly); @@ -1025,9 +1136,72 @@ void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties } void SignalProxy::sendHeartBeat() { - dispatchSignal(SignalProxy::HeartBeat, QVariantList()); + QVariantList heartBeatParams; + heartBeatParams << QTime::currentTime(); + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + if((*peer)->type() == AbstractPeer::IODevicePeer) { + IODevicePeer *ioPeer = static_cast(*peer); + ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams); + if(ioPeer->sentHeartBeats > 0) { + updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval()); + } + if(ioPeer->sentHeartBeats > 1) { + qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address() + << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; + ioPeer->close(); + } else { + ioPeer->sentHeartBeats++; + } + } + peer++; + } +} + +void SignalProxy::receiveHeartBeat(AbstractPeer *peer, const QVariantList ¶ms) { + peer->dispatchSignal(SignalProxy::HeartBeatReply, params); +} + +void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList ¶ms) { + if(peer->type() != AbstractPeer::IODevicePeer) { + qWarning() << "SignalProxy::receiveHeartBeatReply: received heart beat from a non IODevicePeer!"; + return; + } + + IODevicePeer *ioPeer = static_cast(peer); + ioPeer->sentHeartBeats = 0; + + if(params.isEmpty()) { + qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address(); + return; + } + + QTime sendTime = params[0].value(); + updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2); } +void SignalProxy::customEvent(QEvent *event) { + switch(event->type()) { + case QEvent::User: + { + PeerSignalEvent *sig = static_cast(event); + receivePeerSignal(sig->sender, sig->requestType, sig->params); + } + event->accept(); + break; + default: + return; + } +} + +void SignalProxy::updateLag(IODevicePeer *peer, int lag) { + peer->lag = lag; + if(proxyMode() == Client) { + emit lagUpdated(lag); + } +} + + void SignalProxy::dumpProxyStats() { QString mode; if(proxyMode() == Server)