From: Marcus Eggenberger Date: Sun, 19 Oct 2008 14:51:17 +0000 (+0200) Subject: SignalProxy internal redesign. X-Git-Tag: 0.3.1~149 X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=commitdiff_plain;h=f24b79010368ac773923d29187c15c33aea69bc9 SignalProxy internal redesign. New Peer type that allows direct communication between two signalproxies without the need of a QIODevice. --- diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 014e4b35..1b05ab43 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -38,8 +38,11 @@ #include "syncableobject.h" #include "util.h" -class SignalRelay: public QObject { +// ================================================== +// SIGNALRELAY +// ================================================== +class SignalRelay: public QObject { /* Q_OBJECT is not necessary or even allowed, because we implement qt_metacall ourselves (and don't use any other features of the meta object system) @@ -180,14 +183,20 @@ void SignalRelay::attachSignal(int methodId, const QByteArray &func) { } sigNames.insert(methodId, fn); } -// ==================== -// /SIGNALRELAY -// ==================== +// ================================================== +// Peers +// ================================================== +void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { + QVariantList packedFunc; + packedFunc << (qint16)requestType + << params; + dispatchPackedFunc(QVariant(packedFunc)); +} -// ==================== +// ================================================== // SignalProxy -// ==================== +// ================================================== SignalProxy::SignalProxy(QObject* parent) : QObject(parent) { @@ -214,21 +223,25 @@ SignalProxy::~SignalProxy() { QList senders = _relayHash.keys(); foreach(QObject* sender, senders) detachObject(sender); - - // close peer connections - foreach(QIODevice *device, _peers.keys()) { - device->close(); - delete device; - } } void SignalProxy::setProxyMode(ProxyMode mode) { - foreach(QIODevice* peer, _peers.keys()) { - if(peer->isOpen()) { - qWarning() << "SignalProxy: Cannot change proxy mode while connected"; + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + if((*peer)->type() != AbstractPeer::IODevicePeer) { + IODevicePeer *ioPeer = static_cast(*peer); + if(ioPeer->isOpen()) { + qWarning() << "SignalProxy: Cannot change proxy mode while connected"; + return; + } + } + if((*peer)->type() != AbstractPeer::SignalProxyPeer) { + qWarning() << "SignalProxy: Cannot change proxy mode while connected to another internal SignalProxy"; return; } + peer++; } + _proxyMode = mode; if(mode == Server) initServer(); @@ -271,9 +284,10 @@ bool SignalProxy::addPeer(QIODevice* iodev) { connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender())); } - _peers[iodev] = peerInfo(); - if(iodev->property("UseCompression").toBool()) - _peers[iodev].usesCompression = true; + // we take ownership of that device + iodev->setParent(this); + + _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool()); if(_peers.count() == 1) emit connected(); @@ -281,30 +295,39 @@ bool SignalProxy::addPeer(QIODevice* iodev) { return true; } +void SignalProxy::removeAllPeers() { + Q_ASSERT(proxyMode() == Server || _peers.count() <= 1); + // wee need to copy that list since we modify it in the loop + QList peers = _peers.keys(); + foreach(QObject *peer, peers) { + switch(_peers[peer]->type()) { + case AbstractPeer::IODevicePeer: + removePeer(static_cast(peer)); + break; + case AbstractPeer::SignalProxyPeer: + removePeer(static_cast(peer)); + break; + default: + Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type + } + } +} + void SignalProxy::removePeer(QIODevice* iodev) { if(_peers.isEmpty()) { qWarning() << "SignalProxy::removePeer(): No peers in use!"; return; } - if(proxyMode() == Server && !iodev) { - // disconnect all - QList peers = _peers.keys(); - foreach(QIODevice *peer, peers) - removePeer(peer); - } - - if(proxyMode() != Server && !iodev) - iodev = _peers.keys().first(); - Q_ASSERT(iodev); - if(!_peers.contains(iodev)) { qWarning() << "SignalProxy: unknown QIODevice" << iodev; return; } - _peers.remove(iodev); + AbstractPeer *peer = _peers[iodev]; + _peers.remove(iodev); + delete peer; disconnect(iodev, 0, this, 0); emit peerRemoved(iodev); @@ -313,6 +336,19 @@ void SignalProxy::removePeer(QIODevice* iodev) { emit disconnected(); } +void SignalProxy::removePeer(SignalProxy *proxy) { + if(!_peers.contains(proxy)) { + qWarning() << "SignalProxy: unknown QIODevice" << proxy; + return; + } + + _peers.remove(proxy); + + if(_peers.isEmpty()) + emit disconnected(); +} + + void SignalProxy::removePeerBySender() { // OK we're brutal here... but since it's a private slot we know what we've got connected to it... // this Slot is not triggered by destroyed, so the object is still alive and can be used! @@ -669,41 +705,47 @@ void SignalProxy::stopSync(SyncableObject* obj) { } } -void SignalProxy::dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList ¶ms) { - Q_ASSERT(_peers.contains(receiver)); - QVariantList packedFunc; - packedFunc << (qint16)requestType; - packedFunc << params; - writeDataToDevice(receiver, QVariant(packedFunc), _peers[receiver].usesCompression); -} - void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { - // yes I know we have a little code duplication here... it's for the sake of performance - QVariantList packedFunc; - packedFunc << (qint16)requestType; - packedFunc << params; - foreach(QIODevice* dev, _peers.keys()) { - Q_ASSERT(_peers.contains(dev)); - writeDataToDevice(dev, QVariant(packedFunc), _peers[dev].usesCompression); + QVariant packedFunc(QVariantList() << (qint16)requestType << params); + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + switch((*peer)->type()) { + case AbstractPeer::IODevicePeer: + { + IODevicePeer *ioPeer = static_cast(*peer); + ioPeer->dispatchPackedFunc(packedFunc); + } + break; + case AbstractPeer::SignalProxyPeer: + (*peer)->dispatchSignal(requestType, params); + break; + default: + Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type + } + peer++; } } -void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFunc) { +void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc) { QVariantList params(packedFunc.toList()); if(params.isEmpty()) { qWarning() << "SignalProxy::receivePeerSignal(): received incompatible Data:" << packedFunc; return; } - - int callType = params.takeFirst().value(); - switch(callType) { + RequestType requestType = (RequestType)params.takeFirst().value(); + receivePeerSignal(sender, requestType, params); +} + +void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) { + switch(requestType) { case RpcCall: if(params.empty()) qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call"; else - handleSignal(params.takeFirst().toByteArray(), params); + handleSignal(params); + //handleSignal(params.takeFirst().toByteArray(), params); break; case Sync: @@ -727,11 +769,11 @@ void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFun break; default: - qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params; + qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << requestType << params; } } -void SignalProxy::handleSync(QIODevice *sender, QVariantList params) { +void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { if(params.count() < 3) { qWarning() << "received invalid Sync call" << params; return; @@ -771,14 +813,14 @@ void SignalProxy::handleSync(QIODevice *sender, QVariantList params) { if(argTypes(receiver, receiverId).count() > 1) returnParams << params; returnParams << returnValue; - dispatchSignal(sender, Sync, returnParams); + sender->dispatchSignal(Sync, returnParams); } // send emit update signal invokeSlot(receiver, updatedRemotelyId(receiver)); } -void SignalProxy::handleInitRequest(QIODevice *sender, const QVariantList ¶ms) { +void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms) { if(params.count() != 2) { qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:" << params; @@ -807,10 +849,10 @@ void SignalProxy::handleInitRequest(QIODevice *sender, const QVariantList ¶m << objectName << initData(obj); - dispatchSignal(sender, InitData, params_); + sender->dispatchSignal(InitData, params_); } -void SignalProxy::handleInitData(QIODevice *sender, const QVariantList ¶ms) { +void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶ms) { Q_UNUSED(sender) if(params.count() != 3) { qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:" @@ -838,7 +880,11 @@ void SignalProxy::handleInitData(QIODevice *sender, const QVariantList ¶ms) setInitData(obj, propertyMap); } -void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList ¶ms) { +//void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList ¶ms) { +void SignalProxy::handleSignal(const QVariantList &data) { + QVariantList params = data; + QByteArray funcName = params.takeFirst().toByteArray(); + QObject* receiver; int methodId; SlotHash::const_iterator slot = _attachedSlots.constFind(funcName); @@ -905,10 +951,11 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList void SignalProxy::dataAvailable() { // yet again. it's a private slot. no need for checks. QIODevice* ioDev = qobject_cast(sender()); - Q_ASSERT(_peers.contains(ioDev)); + Q_ASSERT(_peers.contains(ioDev) && _peers[ioDev]->type() == AbstractPeer::IODevicePeer); + IODevicePeer *peer = static_cast(_peers[ioDev]); QVariant var; - while(readDataFromDevice(ioDev, _peers[ioDev].byteCount, var, _peers[ioDev].usesCompression)) - receivePeerSignal(ioDev, var); + while(peer->readData(var)) + receivePackedFunc(peer, var); } void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed) { @@ -1046,52 +1093,52 @@ void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties void SignalProxy::sendHeartBeat() { dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime()); - QHash::iterator peerIter = _peers.begin(); - QHash::iterator peerIterEnd = _peers.end(); - while(peerIter != peerIterEnd) { - if(peerIter->sentHeartBeats > 0) { - updateLag(peerIter.key(), _heartBeatTimer.interval()); - } - if(peerIter->sentHeartBeats > 1) { - QAbstractSocket *socket = qobject_cast(peerIter.key()); - qWarning() << "SignalProxy: Disconnecting peer:" - << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") - << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; - peerIter.key()->close(); - } else { - peerIter->sentHeartBeats++; + PeerHash::iterator peer = _peers.begin(); + while(peer != _peers.end()) { + if((*peer)->type() == AbstractPeer::IODevicePeer) { + IODevicePeer *ioPeer = static_cast(*peer); + if(ioPeer->sentHeartBeats > 0) { + updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval()); + } + if(ioPeer->sentHeartBeats > 1) { + //FIXME: proper disconnect. +// QAbstractSocket *socket = qobject_cast(peerIter.key()); +// qWarning() << "SignalProxy: Disconnecting peer:" +// << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") +// << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; +// peerIter.key()->close(); + } else { + ioPeer->sentHeartBeats++; + } } - peerIter++; + peer++; } } -void SignalProxy::receiveHeartBeat(QIODevice *dev, const QVariantList ¶ms) { - if(!_peers.contains(dev)) { - qWarning() << "SignalProxy: received heart beat from unknown Device:" << dev; - } - dispatchSignal(dev, SignalProxy::HeartBeatReply, params); +void SignalProxy::receiveHeartBeat(AbstractPeer *peer, const QVariantList ¶ms) { + peer->dispatchSignal(SignalProxy::HeartBeatReply, params); } -void SignalProxy::receiveHeartBeatReply(QIODevice *dev, const QVariantList ¶ms) { - if(!_peers.contains(dev)) { - qWarning() << "SignalProxy: received heart beat reply from unknown Device:" << dev; +void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList ¶ms) { + if(peer->type() != AbstractPeer::IODevicePeer) { + qWarning() << "SignalProxy::receiveHeartBeatReply: received heart beat from a non IODevicePeer!"; return; } - _peers[dev].sentHeartBeats = 0; + IODevicePeer *ioPeer = static_cast(peer); + ioPeer->sentHeartBeats = 0; if(params.isEmpty()) { - qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << dev; + qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->device(); return; } QTime sendTime = params[0].value(); - updateLag(dev, sendTime.msecsTo(QTime::currentTime()) / 2); + updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2); } -void SignalProxy::updateLag(QIODevice *dev, int lag) { - Q_ASSERT(_peers.contains(dev)); - _peers[dev].lag = lag; +void SignalProxy::updateLag(IODevicePeer *peer, int lag) { + peer->lag = lag; if(proxyMode() == Client) { emit lagUpdated(lag); } diff --git a/src/common/signalproxy.h b/src/common/signalproxy.h index 18038093..0dc33325 100644 --- a/src/common/signalproxy.h +++ b/src/common/signalproxy.h @@ -18,8 +18,8 @@ * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ -#ifndef _SIGNALPROXY_H_ -#define _SIGNALPROXY_H_ +#ifndef SIGNALPROXY_H +#define SIGNALPROXY_H #include #include @@ -37,6 +37,9 @@ struct QMetaObject; class SignalProxy : public QObject { Q_OBJECT + class AbstractPeer; + class IODevicePeer; + public: enum ProxyMode { Server, @@ -61,8 +64,11 @@ public: inline ProxyMode proxyMode() const { return _proxyMode; } bool addPeer(QIODevice *iodev); - void removePeer(QIODevice *iodev = 0); - + void removePeer(QIODevice *iodev); + bool addPeer(SignalProxy *proxy); + void removePeer(SignalProxy *proxy); + void removeAllPeers(); + bool attachSignal(QObject *sender, const char *signal, const QByteArray& sigName = QByteArray()); bool attachSlot(const QByteArray& sigName, QObject *recv, const char *slot); @@ -121,8 +127,8 @@ private slots: void objectRenamed(const QString &newname, const QString &oldname); void objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname); void sendHeartBeat(); - void receiveHeartBeat(QIODevice *dev, const QVariantList ¶ms); - void receiveHeartBeatReply(QIODevice *dev, const QVariantList ¶ms); + void receiveHeartBeat(AbstractPeer *peer, const QVariantList ¶ms); + void receiveHeartBeatReply(AbstractPeer *peer, const QVariantList ¶ms); signals: void peerRemoved(QIODevice *dev); @@ -150,12 +156,13 @@ private: void dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList ¶ms); void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms); - - void receivePeerSignal(QIODevice *sender, const QVariant &packedFunc); - void handleSync(QIODevice *sender, QVariantList params); - void handleInitRequest(QIODevice *sender, const QVariantList ¶ms); - void handleInitData(QIODevice *sender, const QVariantList ¶ms); - void handleSignal(const QByteArray &funcName, const QVariantList ¶ms); + + void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc); + void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms); + void handleSync(AbstractPeer *sender, QVariantList params); + void handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms); + void handleInitData(AbstractPeer *sender, const QVariantList ¶ms); + void handleSignal(const QVariantList &data); bool invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms, QVariant &returnValue); bool invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms = QVariantList()); @@ -163,23 +170,68 @@ private: QVariantMap initData(SyncableObject *obj) const; void setInitData(SyncableObject *obj, const QVariantMap &properties); - void updateLag(QIODevice *dev, int lag); + void updateLag(IODevicePeer *peer, int lag); public: void dumpSyncMap(SyncableObject *object); inline int peerCount() const { return _peers.size(); } private: - // Hash of used QIODevices - struct peerInfo { + class AbstractPeer { + public: + enum PeerType { + NotAPeer = 0, + IODevicePeer = 1, + SignalProxyPeer = 2 + }; + AbstractPeer() : _type(NotAPeer) {} + AbstractPeer(PeerType type) : _type(type) {} + virtual ~AbstractPeer() {} + inline PeerType type() const { return _type; } + virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) = 0; + private: + PeerType _type; + }; + + class IODevicePeer : public AbstractPeer { + public: + IODevicePeer(QIODevice *device, bool compress) : AbstractPeer(AbstractPeer::IODevicePeer), _device(device), byteCount(0), usesCompression(compress), sentHeartBeats(0), lag(0) {} + virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms); + inline void dispatchPackedFunc(const QVariant &packedFunc) { SignalProxy::writeDataToDevice(_device, packedFunc, usesCompression); } + inline QIODevice *device() const { return _device; } + inline bool isOpen() const { return _device->isOpen(); } + inline bool readData(QVariant &item) { return SignalProxy::readDataFromDevice(_device, byteCount, item, usesCompression); } + private: + QIODevice *_device; quint32 byteCount; bool usesCompression; + public: int sentHeartBeats; int lag; - peerInfo() : byteCount(0), usesCompression(false), sentHeartBeats(0) {} }; - //QHash _peerByteCount; - QHash _peers; + + class SignalProxyPeer : public AbstractPeer { + public: + SignalProxyPeer(SignalProxy *proxy) : AbstractPeer(AbstractPeer::SignalProxyPeer), proxy(proxy) {} + virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms); + private: + SignalProxy *proxy; + }; + + // a Hash of the actual used communication object to it's corresponding peer + // currently a communication object can either be an arbitrary QIODevice or another SignalProxy + typedef QHash PeerHash; + PeerHash _peers; + +// // Hash of used QIODevices +// struct peerInfo { +// quint32 byteCount; +// bool usesCompression; +// int sentHeartBeats; +// int lag; +// peerInfo() : byteCount(0), usesCompression(false), sentHeartBeats(0) {} +// }; +// QHash _peers; // containg a list of argtypes for fast access QHash _classInfo;