making signalproxy direct connection thread safe
[quassel.git] / src / common / signalproxy.cpp
index 1b05ab4..b3ea4ff 100644 (file)
 #include <QRegExp>
 #include <QThread>
 #include <QTime>
+#include <QEvent>
+#include <QCoreApplication>
 
 #include "syncableobject.h"
 #include "util.h"
 
+// ==================================================
+//  PeerSignalEvent
+// ==================================================
+class PeerSignalEvent : public QEvent {
+public:
+  PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList &params) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {}
+  SignalProxy *sender;
+  SignalProxy::RequestType requestType;
+  QVariantList params;
+};
 
 // ==================================================
 //  SIGNALRELAY
@@ -194,6 +206,18 @@ void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, c
   dispatchPackedFunc(QVariant(packedFunc));
 }
 
+void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
+  Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
+    ? Qt::DirectConnection
+    : Qt::QueuedConnection;
+
+  if(type == Qt::DirectConnection) {
+    receiver->receivePeerSignal(sender, requestType, params);
+  } else {
+    QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
+  }
+}
+
 // ==================================================
 //  SignalProxy
 // ==================================================
@@ -223,6 +247,7 @@ SignalProxy::~SignalProxy() {
   QList<QObject*> senders = _relayHash.keys();
   foreach(QObject* sender, senders)
     detachObject(sender);
+  removeAllPeers();
 }
 
 void SignalProxy::setProxyMode(ProxyMode mode) {
@@ -284,9 +309,6 @@ bool SignalProxy::addPeer(QIODevice* iodev) {
     connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
   }
 
-  // we take ownership of that device
-  iodev->setParent(this);
-
   _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
 
   if(_peers.count() == 1)
@@ -295,65 +317,70 @@ bool SignalProxy::addPeer(QIODevice* iodev) {
   return true;
 }
 
+bool SignalProxy::addPeer(SignalProxy* proxy) {
+  if(!proxy)
+    return false;
+
+  if(proxyMode() == proxy->proxyMode()) {
+    qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+    return false;
+  }
+
+  if(_peers.contains(proxy)) {
+    return true;
+  }
+
+  if(proxyMode() == Client && !_peers.isEmpty()) {
+    qWarning("SignalProxy: only one peer allowed in client mode!");
+    return false;
+  }
+
+  _peers[proxy] = new SignalProxyPeer(this, proxy);
+
+  proxy->addPeer(this);
+
+  if(_peers.count() == 1)
+    emit connected();
+
+  return true;
+}
+
 void SignalProxy::removeAllPeers() {
   Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
   // wee need to copy that list since we modify it in the loop
   QList<QObject *> peers = _peers.keys();
   foreach(QObject *peer, peers) {
-    switch(_peers[peer]->type()) {
-    case AbstractPeer::IODevicePeer:
-      removePeer(static_cast<QIODevice *>(peer));
-      break;
-    case AbstractPeer::SignalProxyPeer:
-      removePeer(static_cast<SignalProxy *>(peer));
-      break;
-    default:
-      Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type
-    }
+    removePeer(peer);
   }
 }
 
-void SignalProxy::removePeer(QIODevice* iodev) {
+void SignalProxy::removePeer(QObject* dev) {
   if(_peers.isEmpty()) {
     qWarning() << "SignalProxy::removePeer(): No peers in use!";
     return;
   }
 
-  Q_ASSERT(iodev);
-  if(!_peers.contains(iodev)) {
-    qWarning() << "SignalProxy: unknown QIODevice" << iodev;
+  Q_ASSERT(dev);
+  if(!_peers.contains(dev)) {
+    qWarning() << "SignalProxy: unknown Peer" << dev;
     return;
   }
 
-  AbstractPeer *peer = _peers[iodev];
-  _peers.remove(iodev);  
-  delete peer;
-
-  disconnect(iodev, 0, this, 0);
-  emit peerRemoved(iodev);
+  AbstractPeer *peer = _peers[dev];
+  _peers.remove(dev);
 
-  if(_peers.isEmpty())
-    emit disconnected();
-}
+  disconnect(dev, 0, this, 0);
+  if(peer->type() == AbstractPeer::IODevicePeer)
+    emit peerRemoved(static_cast<QIODevice *>(dev));
 
-void SignalProxy::removePeer(SignalProxy *proxy) {
-  if(!_peers.contains(proxy)) {
-    qWarning() << "SignalProxy: unknown QIODevice" << proxy;
-    return;
-  }
-
-  _peers.remove(proxy);
+  delete peer;
 
   if(_peers.isEmpty())
     emit disconnected();
 }
 
-
 void SignalProxy::removePeerBySender() {
-  // OK we're brutal here... but since it's a private slot we know what we've got connected to it...
-  // this Slot is not triggered by destroyed, so the object is still alive and can be used!
-  QIODevice *ioDev = (QIODevice *)(sender());
-  removePeer(ioDev);
+  removePeer(sender());
 }
 
 void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) {
@@ -773,6 +800,15 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req
   }
 }
 
+void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params) {
+  if(!_peers.contains(sender)) {
+    // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
+    qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
+    return;
+  }
+  receivePeerSignal(_peers[sender], requestType, params);
+}
+
 void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
   if(params.count() < 3) {
     qWarning() << "received invalid Sync call" << params;
@@ -1137,6 +1173,20 @@ void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &
   updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
 }
 
+void SignalProxy::customEvent(QEvent *event) {
+  switch(event->type()) {
+  case QEvent::User:
+    {
+      PeerSignalEvent *sig = static_cast<PeerSignalEvent *>(event);
+      receivePeerSignal(sig->sender, sig->requestType, sig->params);
+    }
+    event->accept();
+    break;
+  default:
+    return;
+  }
+}
+
 void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
   peer->lag = lag;
   if(proxyMode() == Client) {