From efe20e20080a0c22e1a7b8b84ef622130dbf116e Mon Sep 17 00:00:00 2001 From: Marcus Eggenberger Date: Mon, 20 Oct 2008 17:24:26 +0200 Subject: [PATCH] making signalproxy direct connection thread safe --- src/common/signalproxy.cpp | 75 ++++++++++++++++++++++++++++++++++++++ src/common/signalproxy.h | 9 ++++- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 85f053b2..b3ea4ffb 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -34,10 +34,22 @@ #include #include #include +#include +#include #include "syncableobject.h" #include "util.h" +// ================================================== +// PeerSignalEvent +// ================================================== +class PeerSignalEvent : public QEvent { +public: + PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {} + SignalProxy *sender; + SignalProxy::RequestType requestType; + QVariantList params; +}; // ================================================== // SIGNALRELAY @@ -194,6 +206,18 @@ void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, c dispatchPackedFunc(QVariant(packedFunc)); } +void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { + Qt::ConnectionType type = QThread::currentThread() == receiver->thread() + ? Qt::DirectConnection + : Qt::QueuedConnection; + + if(type == Qt::DirectConnection) { + receiver->receivePeerSignal(sender, requestType, params); + } else { + QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params)); + } +} + // ================================================== // SignalProxy // ================================================== @@ -293,6 +317,34 @@ bool SignalProxy::addPeer(QIODevice* iodev) { return true; } +bool SignalProxy::addPeer(SignalProxy* proxy) { + if(!proxy) + return false; + + if(proxyMode() == proxy->proxyMode()) { + qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!"; + return false; + } + + if(_peers.contains(proxy)) { + return true; + } + + if(proxyMode() == Client && !_peers.isEmpty()) { + qWarning("SignalProxy: only one peer allowed in client mode!"); + return false; + } + + _peers[proxy] = new SignalProxyPeer(this, proxy); + + proxy->addPeer(this); + + if(_peers.count() == 1) + emit connected(); + + return true; +} + void SignalProxy::removeAllPeers() { Q_ASSERT(proxyMode() == Server || _peers.count() <= 1); // wee need to copy that list since we modify it in the loop @@ -748,6 +800,15 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req } } +void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms) { + if(!_peers.contains(sender)) { + // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea. + qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast(sender); + return; + } + receivePeerSignal(_peers[sender], requestType, params); +} + void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { if(params.count() < 3) { qWarning() << "received invalid Sync call" << params; @@ -1112,6 +1173,20 @@ void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList & updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2); } +void SignalProxy::customEvent(QEvent *event) { + switch(event->type()) { + case QEvent::User: + { + PeerSignalEvent *sig = static_cast(event); + receivePeerSignal(sig->sender, sig->requestType, sig->params); + } + event->accept(); + break; + default: + return; + } +} + void SignalProxy::updateLag(IODevicePeer *peer, int lag) { peer->lag = lag; if(proxyMode() == Client) { diff --git a/src/common/signalproxy.h b/src/common/signalproxy.h index f2490ba5..a98e62dd 100644 --- a/src/common/signalproxy.h +++ b/src/common/signalproxy.h @@ -119,6 +119,9 @@ public: void dumpProxyStats(); +protected: + void customEvent(QEvent *event); + private slots: void dataAvailable(); void detachSender(); @@ -158,6 +161,7 @@ private: void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc); void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms); + void receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms); void handleSync(AbstractPeer *sender, QVariantList params); void handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms); void handleInitData(AbstractPeer *sender, const QVariantList ¶ms); @@ -211,10 +215,11 @@ private: class SignalProxyPeer : public AbstractPeer { public: - SignalProxyPeer(SignalProxy *proxy) : AbstractPeer(AbstractPeer::SignalProxyPeer), proxy(proxy) {} + SignalProxyPeer(SignalProxy *sender, SignalProxy *receiver) : AbstractPeer(AbstractPeer::SignalProxyPeer), sender(sender), receiver(receiver) {} virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms); private: - SignalProxy *proxy; + SignalProxy *sender; + SignalProxy *receiver; }; // a Hash of the actual used communication object to it's corresponding peer -- 2.20.1