making signalproxy direct connection thread safe
authorMarcus Eggenberger <egs@quassel-irc.org>
Mon, 20 Oct 2008 15:24:26 +0000 (17:24 +0200)
committerMarcus Eggenberger <egs@quassel-irc.org>
Tue, 21 Oct 2008 16:52:52 +0000 (18:52 +0200)
src/common/signalproxy.cpp
src/common/signalproxy.h

index 85f053b..b3ea4ff 100644 (file)
 #include <QRegExp>
 #include <QThread>
 #include <QTime>
 #include <QRegExp>
 #include <QThread>
 #include <QTime>
+#include <QEvent>
+#include <QCoreApplication>
 
 #include "syncableobject.h"
 #include "util.h"
 
 
 #include "syncableobject.h"
 #include "util.h"
 
+// ==================================================
+//  PeerSignalEvent
+// ==================================================
+class PeerSignalEvent : public QEvent {
+public:
+  PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList &params) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {}
+  SignalProxy *sender;
+  SignalProxy::RequestType requestType;
+  QVariantList params;
+};
 
 // ==================================================
 //  SIGNALRELAY
 
 // ==================================================
 //  SIGNALRELAY
@@ -194,6 +206,18 @@ void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, c
   dispatchPackedFunc(QVariant(packedFunc));
 }
 
   dispatchPackedFunc(QVariant(packedFunc));
 }
 
+void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
+  Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
+    ? Qt::DirectConnection
+    : Qt::QueuedConnection;
+
+  if(type == Qt::DirectConnection) {
+    receiver->receivePeerSignal(sender, requestType, params);
+  } else {
+    QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
+  }
+}
+
 // ==================================================
 //  SignalProxy
 // ==================================================
 // ==================================================
 //  SignalProxy
 // ==================================================
@@ -293,6 +317,34 @@ bool SignalProxy::addPeer(QIODevice* iodev) {
   return true;
 }
 
   return true;
 }
 
+bool SignalProxy::addPeer(SignalProxy* proxy) {
+  if(!proxy)
+    return false;
+
+  if(proxyMode() == proxy->proxyMode()) {
+    qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+    return false;
+  }
+
+  if(_peers.contains(proxy)) {
+    return true;
+  }
+
+  if(proxyMode() == Client && !_peers.isEmpty()) {
+    qWarning("SignalProxy: only one peer allowed in client mode!");
+    return false;
+  }
+
+  _peers[proxy] = new SignalProxyPeer(this, proxy);
+
+  proxy->addPeer(this);
+
+  if(_peers.count() == 1)
+    emit connected();
+
+  return true;
+}
+
 void SignalProxy::removeAllPeers() {
   Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
   // wee need to copy that list since we modify it in the loop
 void SignalProxy::removeAllPeers() {
   Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
   // wee need to copy that list since we modify it in the loop
@@ -748,6 +800,15 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req
   }
 }
 
   }
 }
 
+void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params) {
+  if(!_peers.contains(sender)) {
+    // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
+    qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
+    return;
+  }
+  receivePeerSignal(_peers[sender], requestType, params);
+}
+
 void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
   if(params.count() < 3) {
     qWarning() << "received invalid Sync call" << params;
 void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
   if(params.count() < 3) {
     qWarning() << "received invalid Sync call" << params;
@@ -1112,6 +1173,20 @@ void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &
   updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
 }
 
   updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
 }
 
+void SignalProxy::customEvent(QEvent *event) {
+  switch(event->type()) {
+  case QEvent::User:
+    {
+      PeerSignalEvent *sig = static_cast<PeerSignalEvent *>(event);
+      receivePeerSignal(sig->sender, sig->requestType, sig->params);
+    }
+    event->accept();
+    break;
+  default:
+    return;
+  }
+}
+
 void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
   peer->lag = lag;
   if(proxyMode() == Client) {
 void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
   peer->lag = lag;
   if(proxyMode() == Client) {
index f2490ba..a98e62d 100644 (file)
@@ -119,6 +119,9 @@ public:
 
   void dumpProxyStats();
   
 
   void dumpProxyStats();
   
+protected:
+  void customEvent(QEvent *event);
+
 private slots:
   void dataAvailable();
   void detachSender();
 private slots:
   void dataAvailable();
   void detachSender();
@@ -158,6 +161,7 @@ private:
 
   void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc);
   void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params);
 
   void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc);
   void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params);
+  void receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params);
   void handleSync(AbstractPeer *sender, QVariantList params);
   void handleInitRequest(AbstractPeer *sender, const QVariantList &params);
   void handleInitData(AbstractPeer *sender, const QVariantList &params);
   void handleSync(AbstractPeer *sender, QVariantList params);
   void handleInitRequest(AbstractPeer *sender, const QVariantList &params);
   void handleInitData(AbstractPeer *sender, const QVariantList &params);
@@ -211,10 +215,11 @@ private:
 
   class SignalProxyPeer : public AbstractPeer {
   public:
 
   class SignalProxyPeer : public AbstractPeer {
   public:
-    SignalProxyPeer(SignalProxy *proxy) : AbstractPeer(AbstractPeer::SignalProxyPeer), proxy(proxy) {}
+    SignalProxyPeer(SignalProxy *sender, SignalProxy *receiver) : AbstractPeer(AbstractPeer::SignalProxyPeer), sender(sender), receiver(receiver) {}
     virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
   private:
     virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
   private:
-    SignalProxy *proxy;
+    SignalProxy *sender;
+    SignalProxy *receiver;
   };
 
   // a Hash of the actual used communication object to it's corresponding peer
   };
 
   // a Hash of the actual used communication object to it's corresponding peer