Refactor SignalProxy, network and protocol code
authorManuel Nickschas <sputnick@quassel-irc.org>
Thu, 17 Jan 2013 22:31:34 +0000 (23:31 +0100)
committerManuel Nickschas <sputnick@quassel-irc.org>
Thu, 17 Jan 2013 23:00:50 +0000 (00:00 +0100)
Until now, SignalProxy also contained the protocol implementation and
handled the network data transfer. Besides being an architectural nightmare,
this also made it impossible to work on a new protocol.

As a nice sideeffect, the code size of SignalProxy shrunk by more than 30%.

This commit does the following:

* Provide a common abstraction for internal (mono) and remote connections,
  so we can remove the special casing for mono clients in the SignalProxy.
  This should also make it easier to bring back remote capabilities to the mono
  client in the future.

* Move the networking code into RemoteConnection, which also acts as an abstraction
  layer for the current and future core/client protocols.

* Move the protocol-specific code into LegacyConnection.

* Introduce explicit protocol message types (as opposed to QVariantMaps). This gives
  us type safety and allows for a cleaner and more efficient handling inside SignalProxy.
  Also, I finally got to use templates \o/

Note that the handshake/auth code still needs to be abstracted, so for now the
LegacyConnection is still used directly in (the awkwardly mis-named) CoreConnection
and Core for reading/writing their data during the init phase.

18 files changed:
src/client/coreconnection.cpp
src/client/coreconnection.h
src/common/CMakeLists.txt
src/common/internalconnection.cpp [new file with mode: 0644]
src/common/internalconnection.h [new file with mode: 0644]
src/common/protocols/legacy/legacyconnection.cpp [new file with mode: 0644]
src/common/protocols/legacy/legacyconnection.h [new file with mode: 0644]
src/common/remoteconnection.cpp [new file with mode: 0644]
src/common/remoteconnection.h [new file with mode: 0644]
src/common/signalproxy.cpp
src/common/signalproxy.h
src/core/core.cpp
src/core/core.h
src/core/coresession.cpp
src/core/coresession.h
src/core/sessionthread.cpp
src/core/sessionthread.h
src/qtui/monoapplication.cpp

index 6b49815..eeaa5e2 100644 (file)
 #include "clientsettings.h"
 #include "coreaccountmodel.h"
 #include "identity.h"
+#include "internalconnection.h"
 #include "network.h"
 #include "networkmodel.h"
 #include "quassel.h"
 #include "signalproxy.h"
 #include "util.h"
 
+#include "protocols/legacy/legacyconnection.h"
+
 CoreConnection::CoreConnection(CoreAccountModel *model, QObject *parent)
     : QObject(parent),
     _model(model),
-    _blockSize(0),
     _state(Disconnected),
     _wantReconnect(false),
     _progressMinimum(0),
     _progressMaximum(-1),
     _progressValue(-1),
     _wasReconnect(false),
-    _requestedDisconnect(false)
+    _requestedDisconnect(false),
+    _resetting(false)
 {
     qRegisterMetaType<ConnectionState>("CoreConnection::ConnectionState");
 }
@@ -120,7 +123,7 @@ void CoreConnection::updateProgress(int value, int max)
 
 void CoreConnection::reconnectTimeout()
 {
-    if (!_socket) {
+    if (!_connection) {
         CoreConnectionSettings s;
         if (_wantReconnect && s.autoReconnect()) {
 #ifdef HAVE_KDE
@@ -198,12 +201,7 @@ void CoreConnection::solidNetworkStatusChanged(Solid::Networking::Status status)
 
 bool CoreConnection::isEncrypted() const
 {
-#ifndef HAVE_SSL
-    return false;
-#else
-    QSslSocket *sock = qobject_cast<QSslSocket *>(_socket);
-    return isConnected() && sock && sock->isEncrypted();
-#endif
+    return _connection && _connection->isSecure();
 }
 
 
@@ -213,7 +211,7 @@ bool CoreConnection::isLocalConnection() const
         return false;
     if (currentAccount().isInternal())
         return true;
-    if (_socket->peerAddress().isInSubnet(QHostAddress::LocalHost, 0x00ffffff))
+    if (_connection->isLocal())
         return true;
 
     return false;
@@ -285,76 +283,74 @@ void CoreConnection::setState(ConnectionState state)
 
 void CoreConnection::coreSocketError(QAbstractSocket::SocketError)
 {
-    qDebug() << "coreSocketError" << _socket << _socket->errorString();
     disconnectFromCore(_socket->errorString(), true);
 }
 
 
 void CoreConnection::coreSocketDisconnected()
 {
-    // qDebug() << Q_FUNC_INFO;
     _wasReconnect = !_requestedDisconnect;
     resetConnection(true);
     // FIXME handle disconnects gracefully
 }
 
 
-void CoreConnection::coreHasData()
+// note: this still expects the legacy protocol
+// noteĀ²: after cleaning this up, we can probably get rid of _socket altogether
+void CoreConnection::coreHasData(const QVariant &item)
 {
-    QVariant item;
-    while (SignalProxy::readDataFromDevice(_socket, _blockSize, item)) {
-        QVariantMap msg = item.toMap();
-        if (!msg.contains("MsgType")) {
-            // This core is way too old and does not even speak our init protocol...
-            emit connectionErrorPopup(tr("The Quassel Core you try to connect to is too old! Please consider upgrading."));
-            disconnectFromCore(QString(), false);
-            return;
-        }
-        if (msg["MsgType"] == "ClientInitAck") {
-            clientInitAck(msg);
-        }
-        else if (msg["MsgType"] == "ClientInitReject") {
-            emit connectionErrorPopup(msg["Error"].toString());
-            disconnectFromCore(QString(), false);
-            return;
-        }
-        else if (msg["MsgType"] == "CoreSetupAck") {
-            emit coreSetupSuccess();
-        }
-        else if (msg["MsgType"] == "CoreSetupReject") {
-            emit coreSetupFailed(msg["Error"].toString());
-        }
-        else if (msg["MsgType"] == "ClientLoginReject") {
-            loginFailed(msg["Error"].toString());
-        }
-        else if (msg["MsgType"] == "ClientLoginAck") {
-            loginSuccess();
-        }
-        else if (msg["MsgType"] == "SessionInit") {
-            // that's it, let's hand over to the signal proxy
-            // if the socket is an orphan, the signalProxy adopts it.
-            // -> we don't need to care about it anymore
-            _socket->setParent(0);
-            Client::signalProxy()->addPeer(_socket);
-
-            sessionStateReceived(msg["SessionState"].toMap());
-            break; // this is definitively the last message we process here!
-        }
-        else {
-            disconnectFromCore(tr("Invalid data received from core"), false);
-            return;
-        }
+    QVariantMap msg = item.toMap();
+    if (!msg.contains("MsgType")) {
+        // This core is way too old and does not even speak our init protocol...
+        emit connectionErrorPopup(tr("The Quassel Core you try to connect to is too old! Please consider upgrading."));
+        disconnectFromCore(QString(), false);
+        return;
+    }
+    if (msg["MsgType"] == "ClientInitAck") {
+        clientInitAck(msg);
+    }
+    else if (msg["MsgType"] == "ClientInitReject") {
+        emit connectionErrorPopup(msg["Error"].toString());
+        disconnectFromCore(QString(), false);
+        return;
+    }
+    else if (msg["MsgType"] == "CoreSetupAck") {
+        emit coreSetupSuccess();
+    }
+    else if (msg["MsgType"] == "CoreSetupReject") {
+        emit coreSetupFailed(msg["Error"].toString());
+    }
+    else if (msg["MsgType"] == "ClientLoginReject") {
+        loginFailed(msg["Error"].toString());
     }
-    if (_blockSize > 0) {
-        updateProgress(_socket->bytesAvailable(), _blockSize);
+    else if (msg["MsgType"] == "ClientLoginAck") {
+        loginSuccess();
+    }
+    else if (msg["MsgType"] == "SessionInit") {
+        // that's it, let's hand over to the signal proxy
+        // if the connection is an orphan, the signalProxy adopts it.
+        // -> we don't need to care about it anymore
+
+        disconnect(_connection, 0, this, 0);
+
+        _connection->setParent(0);
+        Client::signalProxy()->addPeer(_connection);
+
+        sessionStateReceived(msg["SessionState"].toMap());
+    }
+    else {
+        disconnectFromCore(tr("Invalid data received from core"), false);
+        return;
     }
 }
 
 
 void CoreConnection::disconnectFromCore()
 {
-    _requestedDisconnect = true;
-    disconnectFromCore(QString(), false); // requested disconnect, so don't try to reconnect
+    if (_socket) {
+        _requestedDisconnect = true;
+        disconnectFromCore(QString(), false); // requested disconnect, so don't try to reconnect
+    }
 }
 
 
@@ -365,30 +361,42 @@ void CoreConnection::disconnectFromCore(const QString &errorString, bool wantRec
 
     _wasReconnect = wantReconnect; // store if disconnect was requested
 
+    resetConnection(wantReconnect);
+
     if (errorString.isEmpty())
         emit connectionError(tr("Disconnected"));
     else
         emit connectionError(errorString);
-
-    Client::signalProxy()->removeAllPeers();
-    resetConnection(wantReconnect);
 }
 
 
 void CoreConnection::resetConnection(bool wantReconnect)
 {
+    if (_resetting)
+        return;
+    _resetting = true;
+
     _wantReconnect = wantReconnect;
 
-    if (_socket) {
+    if (_connection) {
+        disconnect(_socket, 0, this, 0);
+        disconnect(_connection, 0, this, 0);
+        _connection->close();
+
+        if (_connection->parent() == this)
+            _connection->deleteLater(); // if it's not us, it belongs to the sigproxy which will delete it
+        _socket = 0;      // socket is owned and will be deleted by RemoteConnection
+        _connection = 0;
+    }
+    else if (_socket) {
         disconnect(_socket, 0, this, 0);
         _socket->deleteLater();
         _socket = 0;
     }
+
     _requestedDisconnect = false;
-    _blockSize = 0;
 
     _coreMsgBuffer.clear();
-
     _netsToSync.clear();
     _numNetsToSync = 0;
 
@@ -404,6 +412,8 @@ void CoreConnection::resetConnection(bool wantReconnect)
     if (wantReconnect && s.autoReconnect()) {
         _reconnectTimer.start();
     }
+
+    _resetting = false;
 }
 
 
@@ -464,7 +474,11 @@ void CoreConnection::connectToCurrentAccount()
             return;
         }
         emit startInternalCore();
-        emit connectToInternalCore(Client::instance()->signalProxy());
+
+        InternalConnection *conn = new InternalConnection();
+        Client::instance()->signalProxy()->addPeer(conn); // sigproxy will take ownership
+        emit connectToInternalCore(conn);
+
         return;
     }
 
@@ -472,7 +486,7 @@ void CoreConnection::connectToCurrentAccount()
 
     Q_ASSERT(!_socket);
 #ifdef HAVE_SSL
-    QSslSocket *sock = new QSslSocket(Client::instance());
+    QSslSocket *sock = new QSslSocket(this);
     // make sure the warning is shown if we happen to connect without SSL support later
     s.setAccountValue("ShowNoClientSslWarning", true);
 #else
@@ -487,7 +501,7 @@ void CoreConnection::connectToCurrentAccount()
             s.setAccountValue("ShowNoClientSslWarning", false);
         }
     }
-    QTcpSocket *sock = new QTcpSocket(Client::instance());
+    QTcpSocket *sock = new QTcpSocket(this);
 #endif
 
 #ifndef QT_NO_NETWORKPROXY
@@ -498,7 +512,6 @@ void CoreConnection::connectToCurrentAccount()
 #endif
 
     _socket = sock;
-    connect(sock, SIGNAL(readyRead()), SLOT(coreHasData()));
     connect(sock, SIGNAL(connected()), SLOT(coreSocketConnected()));
     connect(sock, SIGNAL(disconnected()), SLOT(coreSocketDisconnected()));
     connect(sock, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(coreSocketError(QAbstractSocket::SocketError)));
@@ -511,6 +524,12 @@ void CoreConnection::connectToCurrentAccount()
 
 void CoreConnection::coreSocketConnected()
 {
+    // Create the connection which will handle the incoming data
+    Q_ASSERT(!_connection);
+    _connection = new LegacyConnection(_socket, this);
+    connect(_connection, SIGNAL(dataReceived(QVariant)), SLOT(coreHasData(QVariant)));
+    connect(_connection, SIGNAL(transferProgress(int,int)), SLOT(updateProgress(int,int)));
+
     // Phase One: Send client info and wait for core info
 
     emit connectionMsg(tr("Synchronizing to core..."));
@@ -527,7 +546,7 @@ void CoreConnection::coreSocketConnected()
     clientInit["UseCompression"] = false;
 #endif
 
-    SignalProxy::writeDataToDevice(_socket, clientInit);
+    qobject_cast<RemoteConnection *>(_connection)->writeSocketData(clientInit);
 }
 
 
@@ -676,7 +695,7 @@ void CoreConnection::loginToCore(const QString &prevError)
     clientLogin["MsgType"] = "ClientLogin";
     clientLogin["User"] = currentAccount().user();
     clientLogin["Password"] = currentAccount().password();
-    SignalProxy::writeDataToDevice(_socket, clientLogin);
+    qobject_cast<RemoteConnection*>(_connection)->writeSocketData(clientLogin);
 }
 
 
@@ -706,10 +725,6 @@ void CoreConnection::sessionStateReceived(const QVariantMap &state)
 {
     updateProgress(100, 100);
 
-    // rest of communication happens through SignalProxy...
-    disconnect(_socket, SIGNAL(readyRead()), this, 0);
-    disconnect(_socket, SIGNAL(connected()), this, 0);
-
     syncToCore(state);
 }
 
@@ -794,5 +809,5 @@ void CoreConnection::doCoreSetup(const QVariant &setupData)
     QVariantMap setup;
     setup["MsgType"] = "CoreSetupData";
     setup["SetupData"] = setupData;
-    SignalProxy::writeDataToDevice(_socket, setup);
+    qobject_cast<RemoteConnection *>(_connection)->writeSocketData(setup);
 }
index e04921c..dcc914e 100644 (file)
 #endif
 
 #include "coreaccount.h"
+#include "remoteconnection.h"
 #include "types.h"
 
 class CoreAccountModel;
+class InternalConnection;
 class Network;
 class SignalProxy;
 
@@ -105,7 +107,7 @@ signals:
     void coreSetupFailed(const QString &error);
 
     void startInternalCore();
-    void connectToInternalCore(SignalProxy *proxy);
+    void connectToInternalCore(InternalConnection *connection);
 
     // These signals MUST be handled synchronously!
     void userAuthenticationRequired(CoreAccount *, bool *valid, const QString &errorMessage = QString());
@@ -121,7 +123,7 @@ private slots:
 
     void socketStateChanged(QAbstractSocket::SocketState);
     void coreSocketError(QAbstractSocket::SocketError);
-    void coreHasData();
+    void coreHasData(const QVariant &item);
     void coreSocketConnected();
     void coreSocketDisconnected();
 
@@ -173,8 +175,8 @@ private:
     CoreAccount _account;
     QVariantMap _coreMsgBuffer;
 
-    QPointer<QAbstractSocket> _socket;
-    quint32 _blockSize;
+    QPointer<QTcpSocket> _socket;
+    QPointer<SignalProxy::AbstractPeer> _connection;
     ConnectionState _state;
 
     QTimer _reconnectTimer;
@@ -188,6 +190,7 @@ private:
     QString _coreInfoString(const QVariantMap &);
     bool _wasReconnect;
     bool _requestedDisconnect;
+    bool _resetting;
 
     inline CoreAccountModel *accountModel() const;
 
index 2ddd926..a21f807 100644 (file)
@@ -17,6 +17,7 @@ set(SOURCES
     eventmanager.cpp
     identity.cpp
     ignorelistmanager.cpp
+    internalconnection.cpp
     ircchannel.cpp
     ircevent.cpp
     irclisthelper.cpp
@@ -28,10 +29,14 @@ set(SOURCES
     networkconfig.cpp
     networkevent.cpp
     quassel.cpp
+    remoteconnection.cpp
     settings.cpp
     signalproxy.cpp
     syncableobject.cpp
-    util.cpp)
+    util.cpp
+
+    protocols/legacy/legacyconnection.cpp
+)
 
 set(MOC_HDRS
     aliasmanager.h
@@ -44,14 +49,19 @@ set(MOC_HDRS
     eventmanager.h
     identity.h
     ignorelistmanager.h
+    internalconnection.h
     ircchannel.h
     irclisthelper.h
     ircuser.h
     network.h
     networkconfig.h
+    remoteconnection.h
     settings.h
     signalproxy.h
-    syncableobject.h)
+    syncableobject.h
+
+    protocols/legacy/legacyconnection.h
+)
 
 set(HEADERS ${MOC_HDRS}
     abstractcliparser.h
diff --git a/src/common/internalconnection.cpp b/src/common/internalconnection.cpp
new file mode 100644 (file)
index 0000000..a194a03
--- /dev/null
@@ -0,0 +1,215 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#include <QCoreApplication>
+#include <QThread>
+
+#include "internalconnection.h"
+
+template<class T>
+class PeerMessageEvent : public QEvent
+{
+public:
+    PeerMessageEvent(InternalConnection *sender, InternalConnection::EventType eventType, const T &message)
+    : QEvent(QEvent::Type(eventType)), sender(sender), message(message) {}
+    InternalConnection *sender;
+    T message;
+};
+
+
+InternalConnection::InternalConnection(QObject *parent)
+    : SignalProxy::AbstractPeer(parent),
+    _proxy(0),
+    _peer(0),
+    _isOpen(true)
+{
+
+}
+
+
+InternalConnection::~InternalConnection()
+{
+    if (_isOpen)
+        emit disconnected();
+}
+
+
+QString InternalConnection::description() const
+{
+    return tr("internal connection");
+}
+
+
+bool InternalConnection::isOpen() const
+{
+    return true;
+}
+
+
+bool InternalConnection::isSecure() const
+{
+    return true;
+}
+
+
+bool InternalConnection::isLocal() const
+{
+    return true;
+}
+
+
+void InternalConnection::close(const QString &reason)
+{
+    // FIXME
+    Q_UNUSED(reason)
+    qWarning() << "closing not implemented!";
+}
+
+
+int InternalConnection::lag() const
+{
+    return 0;
+}
+
+
+void InternalConnection::setSignalProxy(SignalProxy *proxy)
+{
+    if (!proxy && _proxy) {
+        _proxy = 0;
+        if (_isOpen) {
+            _isOpen = false;
+            emit disconnected();
+        }
+        return;
+    }
+
+    if (proxy && !_proxy) {
+        _proxy = proxy;
+        return;
+    }
+
+    qWarning() << Q_FUNC_INFO << "Changing the SignalProxy is not supported!";
+}
+
+
+void InternalConnection::setPeer(InternalConnection *peer)
+{
+    if (_peer) {
+        qWarning() << Q_FUNC_INFO << "Peer already set, ignoring!";
+        return;
+    }
+    _peer = peer;
+    connect(peer, SIGNAL(disconnected()), SLOT(peerDisconnected()));
+}
+
+
+void InternalConnection::peerDisconnected()
+{
+    disconnect(_peer, 0, this, 0);
+    _peer = 0;
+    if (_isOpen) {
+        _isOpen = false;
+        emit disconnected();
+    }
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::SyncMessage &msg)
+{
+    dispatch(SyncMessageEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::RpcCall &msg)
+{
+    dispatch(RpcCallEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::InitRequest &msg)
+{
+    dispatch(InitRequestEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::InitData &msg)
+{
+    dispatch(InitDataEvent, msg);
+}
+
+
+template<class T>
+void InternalConnection::dispatch(EventType eventType, const T &msg)
+{
+    if (!_peer) {
+        qWarning() << Q_FUNC_INFO << "Cannot dispatch a message without a peer!";
+        return;
+    }
+
+    if(QThread::currentThread() == _peer->thread())
+        _peer->handle(msg);
+    else
+        QCoreApplication::postEvent(_peer, new PeerMessageEvent<T>(this, eventType, msg));
+}
+
+
+template<class T>
+void InternalConnection::handle(const T &msg)
+{
+    if (!_proxy) {
+        qWarning() << Q_FUNC_INFO << "Cannot handle a message without having a signal proxy set!";
+        return;
+    }
+
+    _proxy->handle(this, msg);
+}
+
+
+void InternalConnection::customEvent(QEvent *event)
+{
+    switch ((int)event->type()) {
+        case SyncMessageEvent: {
+            PeerMessageEvent<SignalProxy::SyncMessage> *e = static_cast<PeerMessageEvent<SignalProxy::SyncMessage> *>(event);
+            handle(e->message);
+            break;
+        }
+        case RpcCallEvent: {
+            PeerMessageEvent<SignalProxy::RpcCall> *e = static_cast<PeerMessageEvent<SignalProxy::RpcCall> *>(event);
+            handle(e->message);
+            break;
+        }
+        case InitRequestEvent: {
+            PeerMessageEvent<SignalProxy::InitRequest> *e = static_cast<PeerMessageEvent<SignalProxy::InitRequest> *>(event);
+            handle(e->message);
+            break;
+        }
+        case InitDataEvent: {
+            PeerMessageEvent<SignalProxy::InitData> *e = static_cast<PeerMessageEvent<SignalProxy::InitData> *>(event);
+            handle(e->message);
+            break;
+        }
+
+        default:
+            qWarning() << Q_FUNC_INFO << "Received unknown custom event:" << event->type();
+            return;
+    }
+
+    event->accept();
+}
diff --git a/src/common/internalconnection.h b/src/common/internalconnection.h
new file mode 100644 (file)
index 0000000..31f3756
--- /dev/null
@@ -0,0 +1,92 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#ifndef INTERNALCONNECTION_H
+#define INTERNALCONNECTION_H
+
+#include <QTcpSocket>
+
+#include "signalproxy.h"
+
+class QEvent;
+
+class InternalConnection : public SignalProxy::AbstractPeer
+{
+    Q_OBJECT
+
+public:
+    enum EventType {
+        SyncMessageEvent = QEvent::User,
+        RpcCallEvent,
+        InitRequestEvent,
+        InitDataEvent
+    };
+
+    InternalConnection(QObject *parent = 0);
+    virtual ~InternalConnection();
+
+    QString description() const;
+
+    SignalProxy *signalProxy() const;
+    void setSignalProxy(SignalProxy *proxy);
+
+    InternalConnection *peer() const;
+    void setPeer(InternalConnection *peer);
+
+    bool isOpen() const;
+    bool isSecure() const;
+    bool isLocal() const;
+
+    int lag() const;
+
+    void dispatch(const SignalProxy::SyncMessage &msg);
+    void dispatch(const SignalProxy::RpcCall &msg);
+    void dispatch(const SignalProxy::InitRequest &msg);
+    void dispatch(const SignalProxy::InitData &msg);
+
+public slots:
+    void close(const QString &reason = QString());
+
+signals:
+
+    void disconnected();
+    void error(QAbstractSocket::SocketError);
+
+protected:
+    void customEvent(QEvent *event);
+
+private slots:
+    void peerDisconnected();
+
+private:
+    template<class T>
+    void dispatch(EventType eventType, const T &msg);
+
+    template<class T>
+    void handle(const T &msg);
+
+private:
+    SignalProxy *_proxy;
+    InternalConnection *_peer;
+    bool _isOpen;
+};
+
+
+#endif
diff --git a/src/common/protocols/legacy/legacyconnection.cpp b/src/common/protocols/legacy/legacyconnection.cpp
new file mode 100644 (file)
index 0000000..ec6b26a
--- /dev/null
@@ -0,0 +1,268 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#include "legacyconnection.h"
+
+LegacyConnection::LegacyConnection(QTcpSocket *socket, QObject *parent)
+    : RemoteConnection(socket, parent),
+    _blockSize(0),
+    _useCompression(false)
+{
+    _stream.setDevice(socket);
+    _stream.setVersion(QDataStream::Qt_4_2);
+
+    connect(socket, SIGNAL(readyRead()), SLOT(socketDataAvailable()));
+}
+
+
+void LegacyConnection::setSignalProxy(SignalProxy *proxy)
+{
+    RemoteConnection::setSignalProxy(proxy);
+
+    if (proxy) {
+        // enable compression now if requested - the initial handshake is uncompressed in the legacy protocol!
+        _useCompression = socket()->property("UseCompression").toBool();
+    }
+
+}
+
+
+void LegacyConnection::socketDataAvailable()
+{
+    QVariant item;
+    while (readSocketData(item)) {
+        // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere
+        if (!signalProxy())
+            emit dataReceived(item);
+        else
+            handlePackedFunc(item);
+    }
+}
+
+
+bool LegacyConnection::readSocketData(QVariant &item)
+{
+    if (_blockSize == 0) {
+        if (socket()->bytesAvailable() < 4)
+            return false;
+        _stream >> _blockSize;
+    }
+
+    if (_blockSize > 1 << 22) {
+        close("Peer tried to send package larger than max package size!");
+        return false;
+    }
+
+    if (_blockSize == 0) {
+        close("Peer tried to send 0 byte package!");
+        return false;
+    }
+
+    if (socket()->bytesAvailable() < _blockSize) {
+        emit transferProgress(socket()->bytesAvailable(), _blockSize);
+        return false;
+    }
+
+    emit transferProgress(_blockSize, _blockSize);
+
+    _blockSize = 0;
+
+    if (_useCompression) {
+        QByteArray rawItem;
+        _stream >> rawItem;
+
+        int nbytes = rawItem.size();
+        if (nbytes <= 4) {
+            const char *data = rawItem.constData();
+            if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) {
+                close("Peer sent corrupted compressed data!");
+                return false;
+            }
+        }
+
+        rawItem = qUncompress(rawItem);
+
+        QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
+        itemStream.setVersion(QDataStream::Qt_4_2);
+        itemStream >> item;
+    }
+    else {
+        _stream >> item;
+    }
+
+    if (!item.isValid()) {
+        close("Peer sent corrupt data: unable to load QVariant!");
+        return false;
+    }
+
+    return true;
+}
+
+
+void LegacyConnection::writeSocketData(const QVariant &item)
+{
+    if (!socket()->isOpen()) {
+        qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
+        return;
+    }
+
+    QByteArray block;
+    QDataStream out(&block, QIODevice::WriteOnly);
+    out.setVersion(QDataStream::Qt_4_2);
+
+    if (_useCompression) {
+        QByteArray rawItem;
+        QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
+        itemStream.setVersion(QDataStream::Qt_4_2);
+        itemStream << item;
+
+        rawItem = qCompress(rawItem);
+
+        out << rawItem;
+    }
+    else {
+        out << item;
+    }
+
+    _stream << block;  // also writes the length as part of the serialization format
+}
+
+
+void LegacyConnection::handlePackedFunc(const QVariant &packedFunc)
+{
+    QVariantList params(packedFunc.toList());
+
+    if (params.isEmpty()) {
+        qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
+        return;
+    }
+
+    RequestType requestType = (RequestType)params.takeFirst().value<int>();
+    switch (requestType) {
+        case Sync: {
+            if (params.count() < 3) {
+                qWarning() << Q_FUNC_INFO << "Received invalid sync call:" << params;
+                return;
+            }
+            QByteArray className = params.takeFirst().toByteArray();
+            QString objectName = params.takeFirst().toString();
+            QByteArray slotName = params.takeFirst().toByteArray();
+            handle(SignalProxy::SyncMessage(className, objectName, slotName, params));
+            break;
+        }
+        case RpcCall: {
+            if (params.empty()) {
+                qWarning() << Q_FUNC_INFO << "Received empty RPC call!";
+                return;
+            }
+            QByteArray slotName = params.takeFirst().toByteArray();
+            handle(SignalProxy::RpcCall(slotName, params));
+            break;
+        }
+        case InitRequest: {
+            if (params.count() != 2) {
+                qWarning() << Q_FUNC_INFO << "Received invalid InitRequest:" << params;
+                return;
+            }
+            QByteArray className = params[0].toByteArray();
+            QString objectName = params[1].toString();
+            handle(SignalProxy::InitRequest(className, objectName));
+            break;
+        }
+        case InitData: {
+            if (params.count() != 3) {
+                qWarning() << Q_FUNC_INFO << "Received invalid InitData:" << params;
+                return;
+            }
+            QByteArray className = params[0].toByteArray();
+            QString objectName = params[1].toString();
+            QVariantMap initData = params[2].toMap();
+            handle(SignalProxy::InitData(className, objectName, initData));
+            break;
+        }
+        case HeartBeat: {
+            if (params.count() != 1) {
+                qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
+                return;
+            }
+            // The legacy protocol would only send a QTime, no QDateTime
+            // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
+            QDateTime dateTime = QDateTime::currentDateTimeUtc();
+            dateTime.setTime(params[0].toTime());
+            handle(RemoteConnection::HeartBeat(dateTime));
+            break;
+        }
+        case HeartBeatReply: {
+            if (params.count() != 1) {
+                qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
+                return;
+            }
+            // The legacy protocol would only send a QTime, no QDateTime
+            // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
+            QDateTime dateTime = QDateTime::currentDateTimeUtc();
+            dateTime.setTime(params[0].toTime());
+            handle(RemoteConnection::HeartBeatReply(dateTime));
+            break;
+        }
+
+    }
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::SyncMessage &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)Sync << msg.className() << msg.objectName() << msg.slotName() << msg.params());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::RpcCall &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)RpcCall << msg.slotName() << msg.params());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::InitRequest &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)InitRequest << msg.className() << msg.objectName());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::InitData &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className() << msg.objectName() << msg.initData());
+}
+
+
+void LegacyConnection::dispatch(const RemoteConnection::HeartBeat &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp().time());
+}
+
+
+void LegacyConnection::dispatch(const RemoteConnection::HeartBeatReply &msg)
+{
+    dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp().time());
+}
+
+
+void LegacyConnection::dispatchPackedFunc(const QVariantList &packedFunc)
+{
+    writeSocketData(QVariant(packedFunc));
+}
diff --git a/src/common/protocols/legacy/legacyconnection.h b/src/common/protocols/legacy/legacyconnection.h
new file mode 100644 (file)
index 0000000..c05ce51
--- /dev/null
@@ -0,0 +1,73 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#ifndef LEGACYCONNECTION_H
+#define LEGACYCONNECTION_H
+
+#include <QDataStream>
+
+#include "../../remoteconnection.h"
+
+class QDataStream;
+
+class LegacyConnection : public RemoteConnection
+{
+    Q_OBJECT
+
+public:
+    enum RequestType {
+        Sync = 1,
+        RpcCall,
+        InitRequest,
+        InitData,
+        HeartBeat,
+        HeartBeatReply
+    };
+
+    LegacyConnection(QTcpSocket *socket, QObject *parent = 0);
+    ~LegacyConnection() {}
+
+    void setSignalProxy(SignalProxy *proxy);
+
+    void dispatch(const SignalProxy::SyncMessage &msg);
+    void dispatch(const SignalProxy::RpcCall &msg);
+    void dispatch(const SignalProxy::InitRequest &msg);
+    void dispatch(const SignalProxy::InitData &msg);
+
+    void dispatch(const RemoteConnection::HeartBeat &msg);
+    void dispatch(const RemoteConnection::HeartBeatReply &msg);
+
+    // FIXME: this is only used for the auth phase and should be replaced by something more generic
+    void writeSocketData(const QVariant &item);
+
+private slots:
+    void socketDataAvailable();
+
+private:
+    bool readSocketData(QVariant &item);
+    void handlePackedFunc(const QVariant &packedFunc);
+    void dispatchPackedFunc(const QVariantList &packedFunc);
+
+    QDataStream _stream;
+    qint32 _blockSize;
+    bool _useCompression;
+};
+
+#endif
diff --git a/src/common/remoteconnection.cpp b/src/common/remoteconnection.cpp
new file mode 100644 (file)
index 0000000..4b73768
--- /dev/null
@@ -0,0 +1,190 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#include <QHostAddress>
+#include <QTimer>
+
+#ifdef HAVE_SSL
+#  include <QSslSocket>
+#endif
+
+#include "remoteconnection.h"
+
+RemoteConnection::RemoteConnection(QTcpSocket *socket, QObject *parent)
+    : SignalProxy::AbstractPeer(parent),
+    _socket(socket),
+    _signalProxy(0),
+    _heartBeatTimer(new QTimer(this)),
+    _heartBeatCount(0),
+    _lag(0)
+{
+    socket->setParent(this);
+    connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected()));
+    connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SIGNAL(error(QAbstractSocket::SocketError)));
+
+#ifdef HAVE_SSL
+    QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket);
+    if (sslSocket)
+        connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged()));
+#endif
+
+    connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
+}
+
+
+QString RemoteConnection::description() const
+{
+    if (socket())
+        return socket()->peerAddress().toString();
+
+    return QString();
+}
+
+
+SignalProxy *RemoteConnection::signalProxy() const
+{
+    return _signalProxy;
+}
+
+
+void RemoteConnection::setSignalProxy(SignalProxy *proxy)
+{
+    if (proxy == _signalProxy)
+        return;
+
+    if (!proxy) {
+        _heartBeatTimer->stop();
+        disconnect(signalProxy(), 0, this, 0);
+        _signalProxy = 0;
+        if (isOpen())
+            close();
+    }
+    else {
+        if (signalProxy()) {
+            qWarning() << Q_FUNC_INFO << "Setting another SignalProxy not supported, ignoring!";
+            return;
+        }
+        _signalProxy = proxy;
+        connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int)));
+        _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000);
+        _heartBeatTimer->start();
+    }
+}
+
+
+void RemoteConnection::changeHeartBeatInterval(int secs)
+{
+    if(secs <= 0)
+        _heartBeatTimer->stop();
+    else {
+        _heartBeatTimer->setInterval(secs * 1000);
+        _heartBeatTimer->start();
+    }
+}
+
+
+int RemoteConnection::lag() const
+{
+    return _lag;
+}
+
+
+QTcpSocket *RemoteConnection::socket() const
+{
+    return _socket;
+}
+
+
+bool RemoteConnection::isSecure() const
+{
+    if (socket()) {
+        if (isLocal())
+            return true;
+#ifdef HAVE_SSL
+        QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket());
+        if (sslSocket && sslSocket->isEncrypted())
+            return true;
+#endif
+    }
+    return false;
+}
+
+
+bool RemoteConnection::isLocal() const
+{
+    if (socket()) {
+        if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6)
+            return true;
+    }
+    return false;
+}
+
+
+bool RemoteConnection::isOpen() const
+{
+    return socket() && socket()->state() == QTcpSocket::ConnectedState;
+}
+
+
+void RemoteConnection::close(const QString &reason)
+{
+    if (!reason.isEmpty()) {
+        qWarning() << "Disconnecting:" << reason;
+    }
+
+    if (socket() && socket()->state() != QTcpSocket::UnconnectedState) {
+        socket()->disconnectFromHost();
+    }
+}
+
+
+void RemoteConnection::handle(const HeartBeat &heartBeat)
+{
+    dispatch(HeartBeatReply(heartBeat.timestamp()));
+}
+
+
+void RemoteConnection::handle(const HeartBeatReply &heartBeatReply)
+{
+    _heartBeatCount = 0;
+    emit lagUpdated(heartBeatReply.timestamp().msecsTo(QDateTime::currentDateTimeUtc()));
+}
+
+
+void RemoteConnection::sendHeartBeat()
+{
+    if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) {
+        qWarning() << "Disconnecting peer:" << description()
+                   << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)";
+        socket()->close();
+        _heartBeatTimer->stop();
+        return;
+    }
+
+    if (_heartBeatCount > 0) {
+        _lag = _heartBeatCount * _heartBeatTimer->interval();
+        emit lagUpdated(_lag);
+    }
+
+    dispatch(HeartBeat(QDateTime::currentDateTimeUtc()));
+    ++_heartBeatCount;
+}
+
+
diff --git a/src/common/remoteconnection.h b/src/common/remoteconnection.h
new file mode 100644 (file)
index 0000000..198627a
--- /dev/null
@@ -0,0 +1,138 @@
+/***************************************************************************
+ *   Copyright (C) 2005-2012 by the Quassel Project                        *
+ *   devel@quassel-irc.org                                                 *
+ *                                                                         *
+ *   This program is free software; you can redistribute it and/or modify  *
+ *   it under the terms of the GNU General Public License as published by  *
+ *   the Free Software Foundation; either version 2 of the License, or     *
+ *   (at your option) version 3.                                           *
+ *                                                                         *
+ *   This program is distributed in the hope that it will be useful,       *
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
+ *   GNU General Public License for more details.                          *
+ *                                                                         *
+ *   You should have received a copy of the GNU General Public License     *
+ *   along with this program; if not, write to the                         *
+ *   Free Software Foundation, Inc.,                                       *
+ *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
+ ***************************************************************************/
+
+#ifndef REMOTECONNECTION_H
+#define REMOTECONNECTION_H
+
+#include <QDateTime>
+#include <QTcpSocket>
+
+#include "signalproxy.h"
+
+class QTimer;
+
+class RemoteConnection : public SignalProxy::AbstractPeer
+{
+    Q_OBJECT
+
+public:
+    RemoteConnection(QTcpSocket *socket, QObject *parent = 0);
+    virtual ~RemoteConnection() {};
+
+    void setSignalProxy(SignalProxy *proxy);
+
+    QString description() const;
+
+    bool isOpen() const;
+    bool isSecure() const;
+    bool isLocal() const;
+
+    int lag() const;
+
+    bool compressionEnabled() const;
+    void setCompressionEnabled(bool enabled);
+
+    QTcpSocket *socket() const;
+
+    // this is only used for the auth phase and should be replaced by something more generic
+    virtual void writeSocketData(const QVariant &item) = 0;
+
+public slots:
+    void close(const QString &reason = QString());
+
+signals:
+    // this is only used for the auth phase and should be replaced by something more generic
+    void dataReceived(const QVariant &item);
+
+    void disconnected();
+    void error(QAbstractSocket::SocketError);
+
+    void transferProgress(int current, int max);
+
+protected:
+    class HeartBeat;
+    class HeartBeatReply;
+
+    SignalProxy *signalProxy() const;
+
+    template<class T>
+    void handle(const T &protoMessage);
+
+    void handle(const HeartBeat &heartBeat);
+    void handle(const HeartBeatReply &heartBeatReply);
+
+    virtual void dispatch(const HeartBeat &msg) = 0;
+    virtual void dispatch(const HeartBeatReply &msg) = 0;
+
+private slots:
+    void sendHeartBeat();
+    void changeHeartBeatInterval(int secs);
+
+private:
+    QTcpSocket *_socket;
+    SignalProxy *_signalProxy;
+    QTimer *_heartBeatTimer;
+    int _heartBeatCount;
+    int _lag;
+};
+
+
+// These protocol messages get handled internally and won't reach SignalProxy
+
+class RemoteConnection::HeartBeat
+{
+public:
+    inline HeartBeat(const QDateTime &timestamp) : _timestamp(timestamp) {}
+
+    inline QDateTime timestamp() const { return _timestamp; }
+
+private:
+    QDateTime _timestamp;
+};
+
+
+class RemoteConnection::HeartBeatReply
+{
+public:
+    inline HeartBeatReply(const QDateTime &timestamp) : _timestamp(timestamp) {}
+
+    inline QDateTime timestamp() const { return _timestamp; }
+
+private:
+    QDateTime _timestamp;
+};
+
+
+// Template methods we need in the header
+template<class T> inline
+void RemoteConnection::handle(const T &protoMessage)
+{
+    if (!signalProxy()) {
+        qWarning() << Q_FUNC_INFO << "Cannot handle messages without a SignalProxy!";
+        return;
+    }
+
+    // _heartBeatCount = 0;
+
+    signalProxy()->handle(this, protoMessage);
+}
+
+
+#endif
index 9836408..fca6d0d 100644 (file)
 #include "syncableobject.h"
 #include "util.h"
 
-// ==================================================
-//  PeerSignalEvent
-// ==================================================
-class PeerSignalEvent : public QEvent
-{
-public:
-    PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList &params) : QEvent(QEvent::Type(SignalProxy::PeerSignal)), sender(sender), requestType(requestType), params(params) {}
-    SignalProxy *sender;
-    SignalProxy::RequestType requestType;
-    QVariantList params;
-};
-
-
 class RemovePeerEvent : public QEvent
 {
 public:
-    RemovePeerEvent(QObject *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeer)), peer(peer) {}
-    QObject *peer;
+    RemovePeerEvent(SignalProxy::AbstractPeer *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeerEvent)), peer(peer) {}
+    SignalProxy::AbstractPeer *peer;
 };
 
 
@@ -155,7 +142,6 @@ int SignalProxy::SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **
             const Signal &signal = _slots[_id];
 
             QVariantList params;
-            params << signal.signature;
 
             const QList<int> &argTypes = eMeta->argTypes(signal.signalId);
             for (int i = 0; i < argTypes.size(); i++) {
@@ -167,7 +153,7 @@ int SignalProxy::SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **
                 params << QVariant(argTypes[i], _a[i+1]);
             }
 
-            proxy()->dispatchSignal(SignalProxy::RpcCall, params);
+            proxy()->dispatch(SignalProxy::RpcCall(signal.signature, params));
         }
         _id -= _slots.count();
     }
@@ -175,59 +161,6 @@ int SignalProxy::SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **
 }
 
 
-// ==================================================
-//  Peers
-// ==================================================
-void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList &params)
-{
-    QVariantList packedFunc;
-    packedFunc << (qint16)requestType
-               << params;
-    dispatchPackedFunc(QVariant(packedFunc));
-}
-
-
-bool SignalProxy::IODevicePeer::isSecure() const
-{
-#ifdef HAVE_SSL
-    QSslSocket *sslSocket = qobject_cast<QSslSocket *>(_device);
-    if (sslSocket)
-        return sslSocket->isEncrypted() || sslSocket->localAddress() == QHostAddress::LocalHost || sslSocket->localAddress() == QHostAddress::LocalHostIPv6;
-#endif
-
-    QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
-    if (socket)
-        return socket->localAddress() == QHostAddress::LocalHost || socket->localAddress() == QHostAddress::LocalHostIPv6;
-
-    return false;
-}
-
-
-QString SignalProxy::IODevicePeer::address() const
-{
-    QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
-    if (socket)
-        return socket->peerAddress().toString();
-    else
-        return QString();
-}
-
-
-void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList &params)
-{
-    Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
-                              ? Qt::DirectConnection
-                              : Qt::QueuedConnection;
-
-    if (type == Qt::DirectConnection) {
-        receiver->receivePeerSignal(sender, requestType, params);
-    }
-    else {
-        QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
-    }
-}
-
-
 // ==================================================
 //  SignalProxy
 // ==================================================
@@ -247,15 +180,6 @@ SignalProxy::SignalProxy(ProxyMode mode, QObject *parent)
 }
 
 
-SignalProxy::SignalProxy(ProxyMode mode, QIODevice *device, QObject *parent)
-    : QObject(parent)
-{
-    setProxyMode(mode);
-    addPeer(device);
-    init();
-}
-
-
 SignalProxy::~SignalProxy()
 {
     QHash<QByteArray, ObjectId>::iterator classIter = _syncSlave.begin();
@@ -276,20 +200,9 @@ SignalProxy::~SignalProxy()
 
 void SignalProxy::setProxyMode(ProxyMode mode)
 {
-    PeerHash::iterator peer = _peers.begin();
-    while (peer != _peers.end()) {
-        if ((*peer)->type() != AbstractPeer::IODevicePeer) {
-            IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
-            if (ioPeer->isOpen()) {
-                qWarning() << "SignalProxy: Cannot change proxy mode while connected";
-                return;
-            }
-        }
-        if ((*peer)->type() != AbstractPeer::SignalProxyPeer) {
-            qWarning() << "SignalProxy: Cannot change proxy mode while connected to another internal SignalProxy";
-            return;
-        }
-        peer++;
+    if (_peers.count()) {
+        qWarning() << Q_FUNC_INFO << "Cannot change proxy mode while connected";
+        return;
     }
 
     _proxyMode = mode;
@@ -305,10 +218,8 @@ void SignalProxy::init()
     _heartBeatInterval = 0;
     _maxHeartBeatCount = 0;
     _signalRelay = new SignalRelay(this);
-    connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
     setHeartBeatInterval(30);
     setMaxHeartBeatCount(2);
-    _heartBeatTimer.start();
     _secure = false;
     updateSecureState();
 }
@@ -321,87 +232,58 @@ void SignalProxy::initServer()
 
 void SignalProxy::initClient()
 {
-    attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString)));
-}
-
-
-bool SignalProxy::addPeer(QIODevice *iodev)
-{
-    if (!iodev)
-        return false;
-
-    if (_peers.contains(iodev))
-        return true;
-
-    if (proxyMode() == Client && !_peers.isEmpty()) {
-        qWarning("SignalProxy: only one peer allowed in client mode!");
-        return false;
-    }
-
-    if (!iodev->isOpen()) {
-        qWarning("SignalProxy::addPeer(QIODevice *iodev): iodev needs to be open!");
-        return false;
-    }
-
-    connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
-    connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
-
-#ifdef HAVE_SSL
-    QSslSocket *sslSocket = qobject_cast<QSslSocket *>(iodev);
-    if (sslSocket) {
-        connect(iodev, SIGNAL(encrypted()), this, SLOT(updateSecureState()));
-    }
-#endif
-
-    if (!iodev->parent())
-        iodev->setParent(this);
-
-    _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
-
-    if (_peers.count() == 1)
-        emit connected();
-
-    updateSecureState();
-    return true;
+    attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray,QString,QString)));
 }
 
 
 void SignalProxy::setHeartBeatInterval(int secs)
 {
-    if (secs != _heartBeatInterval) {
+    if (_heartBeatInterval != secs) {
         _heartBeatInterval = secs;
-        _heartBeatTimer.setInterval(secs * 1000);
+        emit heartBeatIntervalChanged(secs);
     }
 }
 
 
 void SignalProxy::setMaxHeartBeatCount(int max)
 {
-    _maxHeartBeatCount = max;
+    if (_maxHeartBeatCount != max) {
+        _maxHeartBeatCount = max;
+        emit maxHeartBeatCountChanged(max);
+    }
 }
 
 
-bool SignalProxy::addPeer(SignalProxy *proxy)
+bool SignalProxy::addPeer(AbstractPeer *peer)
 {
-    if (!proxy)
+    if (!peer)
         return false;
 
-    if (proxyMode() == proxy->proxyMode()) {
-        qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+    if (_peers.contains(peer))
+        return true;
+
+    if (!peer->isOpen()) {
+        qWarning("SignalProxy: peer needs to be open!");
         return false;
     }
 
-    if (_peers.contains(proxy)) {
-        return true;
+    if (proxyMode() == Client) {
+        if (!_peers.isEmpty()) {
+            qWarning("SignalProxy: only one peer allowed in client mode!");
+            return false;
+        }
+        connect(peer, SIGNAL(lagUpdated(int)), SIGNAL(lagUpdated(int)));
     }
 
-    if (proxyMode() == Client && !_peers.isEmpty()) {
-        qWarning("SignalProxy: only one peer allowed in client mode!");
-        return false;
-    }
+    connect(peer, SIGNAL(disconnected()), SLOT(removePeerBySender()));
+    connect(peer, SIGNAL(secureStateChanged(bool)), SLOT(updateSecureState()));
+
+    if (!peer->parent())
+        peer->setParent(this);
 
-    _peers[proxy] = new SignalProxyPeer(this, proxy);
-    proxy->addPeer(this);
+    _peers.insert(peer);
+
+    peer->setSignalProxy(this);
 
     if (_peers.count() == 1)
         emit connected();
@@ -415,43 +297,38 @@ void SignalProxy::removeAllPeers()
 {
     Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
     // wee need to copy that list since we modify it in the loop
-    QList<QObject *> peers = _peers.keys();
-    foreach(QObject *peer, peers) {
+    QSet<AbstractPeer *> peers = _peers;
+    foreach(AbstractPeer *peer, peers) {
         removePeer(peer);
     }
 }
 
 
-void SignalProxy::removePeer(QObject *dev)
+void SignalProxy::removePeer(AbstractPeer *peer)
 {
+    if (!peer) {
+        qWarning() << Q_FUNC_INFO << "Trying to remove a null peer!";
+        return;
+    }
+
     if (_peers.isEmpty()) {
         qWarning() << "SignalProxy::removePeer(): No peers in use!";
         return;
     }
 
-    Q_ASSERT(dev);
-    if (!_peers.contains(dev)) {
-        qWarning() << "SignalProxy: unknown Peer" << dev;
+    if (!_peers.contains(peer)) {
+        qWarning() << "SignalProxy: unknown Peer" << peer;
         return;
     }
 
-    AbstractPeer *peer = _peers[dev];
-    _peers.remove(dev);
-
-    disconnect(dev, 0, this, 0);
-    if (peer->type() == AbstractPeer::IODevicePeer)
-        emit peerRemoved(static_cast<QIODevice *>(dev));
-
-    if (peer->type() == AbstractPeer::SignalProxyPeer) {
-        SignalProxy *proxy = static_cast<SignalProxy *>(dev);
-        if (proxy->_peers.contains(this))
-            proxy->removePeer(this);
-    }
+    disconnect(peer, 0, this, 0);
+    peer->setSignalProxy(0);
 
-    if (dev->parent() == this)
-        dev->deleteLater();
+    _peers.remove(peer);
+    emit peerRemoved(peer);
 
-    delete peer;
+    if (peer->parent() == this)
+        peer->deleteLater();
 
     updateSecureState();
 
@@ -462,7 +339,7 @@ void SignalProxy::removePeer(QObject *dev)
 
 void SignalProxy::removePeerBySender()
 {
-    removePeer(sender());
+    removePeer(qobject_cast<SignalProxy::AbstractPeer *>(sender()));
 }
 
 
@@ -475,9 +352,7 @@ void SignalProxy::renameObject(const SyncableObject *obj, const QString &newname
     const QByteArray className(meta->className());
     objectRenamed(className, newname, oldname);
 
-    QVariantList params;
-    params << "__objectRenamed__" << className << newname << oldname;
-    dispatchSignal(RpcCall, params);
+    dispatch(RpcCall("__objectRenamed__", QVariantList() << className << newname << oldname));
 }
 
 
@@ -621,139 +496,42 @@ void SignalProxy::stopSynchronize(SyncableObject *obj)
 }
 
 
-void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList &params)
+template<class T>
+void SignalProxy::dispatch(const T &protoMessage)
 {
-    QVariant packedFunc(QVariantList() << (qint16)requestType << params);
-    PeerHash::iterator peer = _peers.begin();
-    while (peer != _peers.end()) {
-        switch ((*peer)->type()) {
-        case AbstractPeer::IODevicePeer:
-        {
-            IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
-            if (ioPeer->isOpen())
-                ioPeer->dispatchPackedFunc(packedFunc);
-            else
-                QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
-        }
-        break;
-        case AbstractPeer::SignalProxyPeer:
-            (*peer)->dispatchSignal(requestType, params);
-            break;
-        default:
-            Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type
-        }
-        peer++;
-    }
-}
-
-
-void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc)
-{
-    QVariantList params(packedFunc.toList());
-
-    if (params.isEmpty()) {
-        qWarning() << "SignalProxy::receivePeerSignal(): received incompatible Data:" << packedFunc;
-        return;
-    }
-
-    RequestType requestType = (RequestType)params.takeFirst().value<int>();
-    receivePeerSignal(sender, requestType, params);
-}
-
-
-void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params)
-{
-    switch (requestType) {
-    // list all RequestTypes that shouldnot trigger a heartbeat counter reset here
-    case HeartBeatReply:
-        break;
-    default:
-        if (sender->type() == AbstractPeer::IODevicePeer) {
-            IODevicePeer *ioPeer = static_cast<IODevicePeer *>(sender);
-            ioPeer->sentHeartBeats = 0;
-        }
-    }
-
-    // qDebug() << "SignalProxy::receivePeerSignal)" << requestType << params;
-    switch (requestType) {
-    case RpcCall:
-        if (params.empty())
-            qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call";
+    foreach (AbstractPeer *peer, _peers) {
+        if (peer->isOpen())
+            peer->dispatch(protoMessage);
         else
-            handleSignal(params);
-        //handleSignal(params.takeFirst().toByteArray(), params);
-        break;
-
-    case Sync:
-        handleSync(sender, params);
-        break;
-
-    case InitRequest:
-        handleInitRequest(sender, params);
-        break;
-
-    case InitData:
-        handleInitData(sender, params);
-        break;
-
-    case HeartBeat:
-        receiveHeartBeat(sender, params);
-        break;
-
-    case HeartBeatReply:
-        receiveHeartBeatReply(sender, params);
-        break;
-
-    default:
-        qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << requestType << params;
-    }
-}
-
-
-void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params)
-{
-    if (!_peers.contains(sender)) {
-        // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
-        qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
-        return;
+            QCoreApplication::postEvent(this, new ::RemovePeerEvent(peer));
     }
-    receivePeerSignal(_peers[sender], requestType, params);
 }
 
 
-void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::SyncMessage &syncMessage)
 {
-    if (params.count() < 3) {
-        qWarning() << "received invalid Sync call" << params;
-        return;
-    }
-
-    QByteArray className = params.takeFirst().toByteArray();
-    QString objectName = params.takeFirst().toString();
-    QByteArray slot = params.takeFirst().toByteArray();
-
-    if (!_syncSlave.contains(className) || !_syncSlave[className].contains(objectName)) {
-        qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
-                   << params;
+    if (!_syncSlave.contains(syncMessage.className()) || !_syncSlave[syncMessage.className()].contains(syncMessage.objectName())) {
+        qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(syncMessage.className(), syncMessage.slotName(), syncMessage.objectName())
+                   << syncMessage.params();
         return;
     }
 
-    SyncableObject *receiver = _syncSlave[className][objectName];
+    SyncableObject *receiver = _syncSlave[syncMessage.className()][syncMessage.objectName()];
     ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
-    if (!eMeta->slotMap().contains(slot)) {
-        qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
-                   << params;
+    if (!eMeta->slotMap().contains(syncMessage.slotName())) {
+        qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(syncMessage.className(), syncMessage.slotName(), syncMessage.objectName())
+                   << syncMessage.params();
         return;
     }
 
-    int slotId = eMeta->slotMap()[slot];
+    int slotId = eMeta->slotMap()[syncMessage.slotName()];
     if (proxyMode() != eMeta->receiverMode(slotId)) {
         qWarning("SignalProxy::handleSync(): invokeMethod for \"%s\" failed. Wrong ProxyMode!", eMeta->methodName(slotId).constData());
         return;
     }
 
     QVariant returnValue((QVariant::Type)eMeta->returnType(slotId));
-    if (!invokeSlot(receiver, slotId, params, returnValue)) {
+    if (!invokeSlot(receiver, slotId, syncMessage.params(), returnValue)) {
         qWarning("SignalProxy::handleSync(): invokeMethod for \"%s\" failed ", eMeta->methodName(slotId).constData());
         return;
     }
@@ -761,14 +539,10 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params)
     if (returnValue.type() != QVariant::Invalid && eMeta->receiveMap().contains(slotId)) {
         int receiverId = eMeta->receiveMap()[slotId];
         QVariantList returnParams;
-        returnParams << className
-                     << objectName
-                     << eMeta->methodName(receiverId);
-        //QByteArray(receiver->metaObject()->method(receiverId).signature());
         if (eMeta->argTypes(receiverId).count() > 1)
-            returnParams << params;
+            returnParams << syncMessage.params();
         returnParams << returnValue;
-        sender->dispatchSignal(Sync, returnParams);
+        peer->dispatch(SyncMessage(syncMessage.className(), syncMessage.objectName(), eMeta->methodName(receiverId), returnParams));
     }
 
     // send emit update signal
@@ -776,87 +550,61 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params)
 }
 
 
-void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &params)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::InitRequest &initRequest)
 {
-    if (params.count() != 2) {
-        qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:"
-                   << params;
-        return;
-    }
-
-    QByteArray className(params[0].toByteArray());
-    QString objectName(params[1].toString());
-
-    if (!_syncSlave.contains(className)) {
+   if (!_syncSlave.contains(initRequest.className())) {
         qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
-                   << className;
+                   << initRequest.className();
         return;
     }
 
-    if (!_syncSlave[className].contains(objectName)) {
+    if (!_syncSlave[initRequest.className()].contains(initRequest.objectName())) {
         qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Object:"
-                   << className << objectName;
+                   << initRequest.className() << initRequest.objectName();
         return;
     }
 
-    SyncableObject *obj = _syncSlave[className][objectName];
-
-    QVariantList params_;
-    params_ << className
-            << objectName
-            << initData(obj);
-
-    sender->dispatchSignal(InitData, params_);
+    SyncableObject *obj = _syncSlave[initRequest.className()][initRequest.objectName()];
+    peer->dispatch(InitData(initRequest.className(), initRequest.objectName(), initData(obj)));
 }
 
 
-void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList &params)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::InitData &initData)
 {
-    Q_UNUSED(sender)
-    if (params.count() != 3) {
-        qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:"
-                   << params;
-        return;
-    }
-
-    QByteArray className(params[0].toByteArray());
-    QString objectName(params[1].toString());
-    QVariantMap propertyMap(params[2].toMap());
+    Q_UNUSED(peer)
 
-    if (!_syncSlave.contains(className)) {
+    if (!_syncSlave.contains(initData.className())) {
         qWarning() << "SignalProxy::handleInitData() received initData for unregistered Class:"
-                   << className;
+                   << initData.className();
         return;
     }
 
-    if (!_syncSlave[className].contains(objectName)) {
+    if (!_syncSlave[initData.className()].contains(initData.objectName())) {
         qWarning() << "SignalProxy::handleInitData() received initData for unregistered Object:"
-                   << className << objectName;
+                   << initData.className() << initData.objectName();
         return;
     }
 
-    SyncableObject *obj = _syncSlave[className][objectName];
-    setInitData(obj, propertyMap);
+    SyncableObject *obj = _syncSlave[initData.className()][initData.objectName()];
+    setInitData(obj, initData.initData());
 }
 
 
-//void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList &params) {
-void SignalProxy::handleSignal(const QVariantList &data)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::RpcCall &rpcCall)
 {
-    QVariantList params = data;
-    QByteArray funcName = params.takeFirst().toByteArray();
+    Q_UNUSED(peer)
 
     QObject *receiver;
     int methodId;
-    SlotHash::const_iterator slot = _attachedSlots.constFind(funcName);
-    while (slot != _attachedSlots.constEnd() && slot.key() == funcName) {
+    SlotHash::const_iterator slot = _attachedSlots.constFind(rpcCall.slotName());
+    while (slot != _attachedSlots.constEnd() && slot.key() == rpcCall.slotName()) {
         receiver = (*slot).first;
         methodId = (*slot).second;
-        if (!invokeSlot(receiver, methodId, params)) {
+        if (!invokeSlot(receiver, methodId, rpcCall.params())) {
             ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
             qWarning("SignalProxy::handleSignal(): invokeMethod for \"%s\" failed ", eMeta->methodName(methodId).constData());
         }
-        slot++;
+        ++slot;
     }
 }
 
@@ -917,122 +665,12 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList
 }
 
 
-void SignalProxy::dataAvailable()
-{
-    // yet again. it's a private slot. no need for checks.
-    QIODevice *ioDev = qobject_cast<QIODevice *>(sender());
-    Q_ASSERT(_peers.contains(ioDev) && _peers[ioDev]->type() == AbstractPeer::IODevicePeer);
-    IODevicePeer *peer = static_cast<IODevicePeer *>(_peers[ioDev]);
-    QVariant var;
-    while (peer->readData(var))
-        receivePackedFunc(peer, var);
-}
-
-
-void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed)
-{
-    QAbstractSocket *sock  = qobject_cast<QAbstractSocket *>(dev);
-    if (!dev->isOpen() || (sock && sock->state() != QAbstractSocket::ConnectedState)) {
-        qWarning("SignalProxy: Can't call write on a closed device");
-        return;
-    }
-
-    QByteArray block;
-    QDataStream out(&block, QIODevice::WriteOnly);
-    out.setVersion(QDataStream::Qt_4_2);
-    out << (quint32)0;
-
-    if (compressed) {
-        QByteArray rawItem;
-        QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
-
-        itemStream.setVersion(QDataStream::Qt_4_2);
-        itemStream << item;
-
-        rawItem = qCompress(rawItem);
-
-        out << rawItem;
-    }
-    else {
-        out << item;
-    }
-
-    out.device()->seek(0);
-    out << (quint32)(block.size() - sizeof(quint32));
-
-    dev->write(block);
-}
-
-
-bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed)
-{
-    if (!dev)
-        return false;
-
-    QDataStream in(dev);
-    in.setVersion(QDataStream::Qt_4_2);
-
-    if (blockSize == 0) {
-        if (dev->bytesAvailable() < (int)sizeof(quint32)) return false;
-        in >> blockSize;
-    }
-
-    if (blockSize > 1 << 22) {
-        disconnectDevice(dev, tr("Peer tried to send package larger than max package size!"));
-        return false;
-    }
-
-    if (blockSize == 0) {
-        disconnectDevice(dev, tr("Peer tried to send 0 byte package!"));
-        return false;
-    }
-
-    if (dev->bytesAvailable() < blockSize)
-        return false;
-
-    blockSize = 0;
-
-    if (compressed) {
-        QByteArray rawItem;
-        in >> rawItem;
-
-        int nbytes = rawItem.size();
-        if (nbytes <= 4) {
-            const char *data = rawItem.constData();
-            if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) {
-                disconnectDevice(dev, tr("Peer sent corrupted compressed data!"));
-                return false;
-            }
-        }
-
-        rawItem = qUncompress(rawItem);
-
-        QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
-        itemStream.setVersion(QDataStream::Qt_4_2);
-        itemStream >> item;
-    }
-    else {
-        in >> item;
-    }
-
-    if (!item.isValid()) {
-        disconnectDevice(dev, tr("Peer sent corrupt data: unable to load QVariant!"));
-        return false;
-    }
-
-    return true;
-}
-
-
 void SignalProxy::requestInit(SyncableObject *obj)
 {
     if (proxyMode() == Server || obj->isInitialized())
         return;
 
-    QVariantList params;
-    params << obj->syncMetaObject()->className()
-           << obj->objectName();
-    dispatchSignal(InitRequest, params);
+    dispatch(InitRequest(obj->syncMetaObject()->className(), obj->objectName()));
 }
 
 
@@ -1053,79 +691,18 @@ void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties
 }
 
 
-void SignalProxy::sendHeartBeat()
-{
-    QVariantList heartBeatParams;
-    heartBeatParams << QTime::currentTime();
-    QList<IODevicePeer *> toClose;
-
-    PeerHash::iterator peer = _peers.begin();
-    while (peer != _peers.end()) {
-        if ((*peer)->type() == AbstractPeer::IODevicePeer) {
-            IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
-            ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
-            if (ioPeer->sentHeartBeats > 0) {
-                updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
-            }
-            if (maxHeartBeatCount() >= 0 && ioPeer->sentHeartBeats >= maxHeartBeatCount())
-                toClose.append(ioPeer);
-            else
-                ioPeer->sentHeartBeats++;
-        }
-        ++peer;
-    }
-
-    foreach(IODevicePeer *ioPeer, toClose) {
-        qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address()
-                   << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats *_heartBeatTimer.interval() / 1000 << "seconds)";
-        ioPeer->close();
-    }
-}
-
-
-void SignalProxy::receiveHeartBeat(AbstractPeer *peer, const QVariantList &params)
-{
-    peer->dispatchSignal(SignalProxy::HeartBeatReply, params);
-}
-
-
-void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &params)
-{
-    if (peer->type() != AbstractPeer::IODevicePeer) {
-        qWarning() << "SignalProxy::receiveHeartBeatReply: received heart beat from a non IODevicePeer!";
-        return;
-    }
-
-    IODevicePeer *ioPeer = static_cast<IODevicePeer *>(peer);
-    ioPeer->sentHeartBeats = 0;
-
-    if (params.isEmpty()) {
-        qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
-        return;
-    }
-
-    QTime sendTime = params[0].value<QTime>();
-    updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
-}
-
-
 void SignalProxy::customEvent(QEvent *event)
 {
-    switch (+event->type()) {
-    case PeerSignal:
-    {
-        PeerSignalEvent *e = static_cast<PeerSignalEvent *>(event);
-        receivePeerSignal(e->sender, e->requestType, e->params);
-    }
+    switch ((int)event->type()) {
+    case RemovePeerEvent: {
+        ::RemovePeerEvent *e = static_cast< ::RemovePeerEvent *>(event);
+        removePeer(e->peer);
         event->accept();
         break;
-    case RemovePeer:
-    {
-        RemovePeerEvent *e = static_cast<RemovePeerEvent *>(event);
-        removePeer(e->peer);
     }
-        event->accept();
+
     default:
+        qWarning() << Q_FUNC_INFO << "Received unknown custom event:" << event->type();
         return;
     }
 }
@@ -1140,9 +717,6 @@ void SignalProxy::sync_call__(const SyncableObject *obj, SignalProxy::ProxyMode
     ExtendedMetaObject *eMeta = extendedMetaObject(obj);
 
     QVariantList params;
-    params << eMeta->metaObject()->className()
-           << obj->objectName()
-           << QByteArray(funcname);
 
     const QList<int> &argTypes = eMeta->argTypes(eMeta->methodId(QByteArray(funcname)));
 
@@ -1155,7 +729,7 @@ void SignalProxy::sync_call__(const SyncableObject *obj, SignalProxy::ProxyMode
         params << QVariant(argTypes[i], va_arg(ap, void *));
     }
 
-    dispatchSignal(Sync, params);
+    dispatch(SyncMessage(eMeta->metaObject()->className(), obj->objectName(), QByteArray(funcname), params));
 }
 
 
@@ -1170,15 +744,6 @@ void SignalProxy::disconnectDevice(QIODevice *dev, const QString &reason)
 }
 
 
-void SignalProxy::updateLag(IODevicePeer *peer, int lag)
-{
-    peer->lag = lag;
-    if (proxyMode() == Client) {
-        emit lagUpdated(lag);
-    }
-}
-
-
 void SignalProxy::dumpProxyStats()
 {
     QString mode;
@@ -1204,9 +769,8 @@ void SignalProxy::updateSecureState()
     bool wasSecure = _secure;
 
     _secure = !_peers.isEmpty();
-    PeerHash::const_iterator peerIter;
-    for (peerIter = _peers.constBegin(); peerIter != _peers.constEnd(); peerIter++) {
-        _secure &= (*peerIter)->isSecure();
+    foreach (const AbstractPeer *peer,  _peers) {
+        _secure &= peer->isSecure();
     }
 
     if (wasSecure != _secure)
index 69a8a9f..645157e 100644 (file)
 #ifndef SIGNALPROXY_H
 #define SIGNALPROXY_H
 
+#include <QAbstractSocket>
 #include <QEvent>
 #include <QList>
 #include <QHash>
 #include <QVariant>
 #include <QVariantMap>
 #include <QPair>
+#include <QSet>
 #include <QString>
 #include <QByteArray>
 #include <QTimer>
@@ -38,35 +40,27 @@ class SignalProxy : public QObject
 {
     Q_OBJECT
 
-    class AbstractPeer;
-    class IODevicePeer;
-    class SignalProxyPeer;
-
     class SignalRelay;
 
 public:
+    class AbstractPeer;
+
+    class SyncMessage;
+    class RpcCall;
+    class InitRequest;
+    class InitData;
+
     enum ProxyMode {
         Server,
         Client
     };
 
-    enum RequestType {
-        Sync = 1,
-        RpcCall,
-        InitRequest,
-        InitData,
-        HeartBeat,
-        HeartBeatReply
-    };
-
-    enum CustomEvents {
-        PeerSignal = QEvent::User,
-        RemovePeer
+    enum EventType {
+        RemovePeerEvent = QEvent::User
     };
 
     SignalProxy(QObject *parent);
     SignalProxy(ProxyMode mode, QObject *parent);
-    SignalProxy(ProxyMode mode, QIODevice *device, QObject *parent);
     virtual ~SignalProxy();
 
     void setProxyMode(ProxyMode mode);
@@ -77,10 +71,7 @@ public:
     void setMaxHeartBeatCount(int max);
     inline int maxHeartBeatCount() const { return _maxHeartBeatCount; }
 
-    bool addPeer(QIODevice *iodev);
-    bool addPeer(SignalProxy *proxy);
-    void removePeer(QObject *peer);
-    void removeAllPeers();
+    bool addPeer(AbstractPeer *peer);
 
     bool attachSignal(QObject *sender, const char *signal, const QByteArray &sigName = QByteArray());
     bool attachSlot(const QByteArray &sigName, QObject *recv, const char *slot);
@@ -88,19 +79,6 @@ public:
     void synchronize(SyncableObject *obj);
     void stopSynchronize(SyncableObject *obj);
 
-    //! Writes a QVariant to a device.
-    /** The data item is prefixed with the resulting blocksize,
-     *  so the corresponding function readDataFromDevice() can check if enough data is available
-     *  at the device to reread the item.
-     */
-    static void writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed = false);
-
-    //! Reads a data item from a device that has been written by writeDataToDevice().
-    /** If not enough data bytes are available, the function returns false and the QVariant reference
-     *  remains untouched.
-     */
-    static bool readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed = false);
-
     class ExtendedMetaObject;
     ExtendedMetaObject *extendedMetaObject(const QMetaObject *meta) const;
     ExtendedMetaObject *createExtendedMetaObject(const QMetaObject *meta, bool checkConflicts = false);
@@ -109,6 +87,8 @@ public:
 
     bool isSecure() const { return _secure; }
     void dumpProxyStats();
+    void dumpSyncMap(SyncableObject *object);
+    inline int peerCount() const { return _peers.size(); }
 
 public slots:
     void detachObject(QObject *obj);
@@ -121,41 +101,40 @@ protected:
     void renameObject(const SyncableObject *obj, const QString &newname, const QString &oldname);
 
 private slots:
-    void dataAvailable();
     void removePeerBySender();
     void objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname);
-    void sendHeartBeat();
-    void receiveHeartBeat(AbstractPeer *peer, const QVariantList &params);
-    void receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &params);
-
     void updateSecureState();
 
 signals:
-    void peerRemoved(QIODevice *dev);
+    void peerRemoved(SignalProxy::AbstractPeer *peer);
     void connected();
     void disconnected();
     void objectInitialized(SyncableObject *);
+    void heartBeatIntervalChanged(int secs);
+    void maxHeartBeatCountChanged(int max);
     void lagUpdated(int lag);
-    void securityChanged(bool);
     void secureStateChanged(bool);
 
 private:
+    template<class T>
+    class PeerMessageEvent;
+
     void init();
     void initServer();
     void initClient();
 
     static const QMetaObject *metaObject(const QObject *obj);
 
-    void dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList &params);
-    void dispatchSignal(const RequestType &requestType, const QVariantList &params);
+    void removePeer(AbstractPeer *peer);
+    void removeAllPeers();
+
+    template<class T>
+    void dispatch(const T &protoMessage);
 
-    void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc);
-    void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params);
-    void receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params);
-    void handleSync(AbstractPeer *sender, QVariantList params);
-    void handleInitRequest(AbstractPeer *sender, const QVariantList &params);
-    void handleInitData(AbstractPeer *sender, const QVariantList &params);
-    void handleSignal(const QVariantList &data);
+    void handle(AbstractPeer *peer, const SyncMessage &syncMessage);
+    void handle(AbstractPeer *peer, const RpcCall &rpcCall);
+    void handle(AbstractPeer *peer, const InitRequest &initRequest);
+    void handle(AbstractPeer *peer, const InitData &initData);
 
     bool invokeSlot(QObject *receiver, int methodId, const QVariantList &params, QVariant &returnValue);
     bool invokeSlot(QObject *receiver, int methodId, const QVariantList &params = QVariantList());
@@ -164,19 +143,9 @@ private:
     QVariantMap initData(SyncableObject *obj) const;
     void setInitData(SyncableObject *obj, const QVariantMap &properties);
 
-    void updateLag(IODevicePeer *peer, int lag);
-
-public:
-    void dumpSyncMap(SyncableObject *object);
-    inline int peerCount() const { return _peers.size(); }
-
-private:
     static void disconnectDevice(QIODevice *dev, const QString &reason = QString());
 
-    // a Hash of the actual used communication object to it's corresponding peer
-    // currently a communication object can either be an arbitrary QIODevice or another SignalProxy
-    typedef QHash<QObject *, AbstractPeer *> PeerHash;
-    PeerHash _peers;
+    QSet<AbstractPeer *> _peers;
 
     // containg a list of argtypes for fast access
     QHash<const QMetaObject *, ExtendedMetaObject *> _extendedMetaObjects;
@@ -194,7 +163,6 @@ private:
     QHash<QByteArray, ObjectId> _syncSlave;
 
     ProxyMode _proxyMode;
-    QTimer _heartBeatTimer;
     int _heartBeatInterval;
     int _maxHeartBeatCount;
 
@@ -202,6 +170,8 @@ private:
 
     friend class SignalRelay;
     friend class SyncableObject;
+    friend class InternalConnection;
+    friend class RemoteConnection;
 };
 
 
@@ -265,58 +235,111 @@ private:
 
 
 // ==================================================
-//  Peers
+//  AbstractPeer
 // ==================================================
-class SignalProxy::AbstractPeer
+class SignalProxy::AbstractPeer : public QObject
 {
+    Q_OBJECT
+
 public:
-    enum PeerType {
-        NotAPeer = 0,
-        IODevicePeer = 1,
-        SignalProxyPeer = 2
-    };
-    AbstractPeer() : _type(NotAPeer) {}
-    AbstractPeer(PeerType type) : _type(type) {}
-    virtual ~AbstractPeer() {}
-    inline PeerType type() const { return _type; }
-    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params) = 0;
+    AbstractPeer(QObject *parent = 0) : QObject(parent) {}
+
+    virtual QString description() const = 0;
+
+    virtual void setSignalProxy(SignalProxy *proxy) = 0;
+
+    virtual bool isOpen() const = 0;
     virtual bool isSecure() const = 0;
-private:
-    PeerType _type;
+    virtual bool isLocal() const = 0;
+
+    virtual QString errorString() const { return QString(); }
+
+    virtual int lag() const = 0;
+
+public slots:
+    virtual void dispatch(const SyncMessage &msg) = 0;
+    virtual void dispatch(const RpcCall &msg) = 0;
+    virtual void dispatch(const InitRequest &msg) = 0;
+    virtual void dispatch(const InitData &msg) = 0;
+
+    virtual void close(const QString &reason = QString()) = 0;
+
+signals:
+    void disconnected();
+    void error(QAbstractSocket::SocketError);
+    void secureStateChanged(bool secure = true);
+    void lagUpdated(int msecs);
 };
 
 
-class SignalProxy::IODevicePeer : public SignalProxy::AbstractPeer
+// ==================================================
+//  Protocol Messages
+// ==================================================
+class SignalProxy::SyncMessage
 {
 public:
-    IODevicePeer(QIODevice *device, bool compress) : AbstractPeer(AbstractPeer::IODevicePeer), _device(device), byteCount(0), usesCompression(compress), sentHeartBeats(0), lag(0) {}
-    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
-    virtual bool isSecure() const;
-    inline void dispatchPackedFunc(const QVariant &packedFunc) { SignalProxy::writeDataToDevice(_device, packedFunc, usesCompression); }
-    QString address() const;
-    inline bool isOpen() const { return _device->isOpen(); }
-    inline void close() const { _device->close(); }
-    inline bool readData(QVariant &item) { return SignalProxy::readDataFromDevice(_device, byteCount, item, usesCompression); }
+    inline SyncMessage(const QByteArray &className, const QString &objectName, const QByteArray &slotName, const QVariantList &params)
+    : _className(className), _objectName(objectName), _slotName(slotName), _params(params) {}
+
+    inline QByteArray className() const { return _className; }
+    inline QString objectName() const { return _objectName; }
+    inline QByteArray slotName() const { return _slotName; }
+
+    inline QVariantList params() const { return _params; }
+
 private:
-    QIODevice *_device;
-    quint32 byteCount;
-    bool usesCompression;
+    QByteArray _className;
+    QString _objectName;
+    QByteArray _slotName;
+    QVariantList _params;
+};
+
+
+class SignalProxy::RpcCall
+{
 public:
-    int sentHeartBeats;
-    int lag;
+    inline RpcCall(const QByteArray &slotName, const QVariantList &params)
+    : _slotName(slotName), _params(params) {}
+
+    inline QByteArray slotName() const { return _slotName; }
+    inline QVariantList params() const { return _params; }
+
+private:
+    QByteArray _slotName;
+    QVariantList _params;
 };
 
 
-class SignalProxy::SignalProxyPeer : public SignalProxy::AbstractPeer
+class SignalProxy::InitRequest
 {
 public:
-    SignalProxyPeer(SignalProxy *sender, SignalProxy *receiver) : AbstractPeer(AbstractPeer::SignalProxyPeer), sender(sender), receiver(receiver) {}
-    virtual void dispatchSignal(const RequestType &requestType, const QVariantList &params);
-    virtual inline bool isSecure() const { return true; }
+    inline InitRequest(const QByteArray &className, const QString &objectName)
+    : _className(className), _objectName(objectName) {}
+
+    inline QByteArray className() const { return _className; }
+    inline QString objectName() const { return _objectName; }
+
 private:
-    SignalProxy *sender;
-    SignalProxy *receiver;
+    QByteArray _className;
+    QString _objectName;
 };
 
 
+class SignalProxy::InitData
+{
+public:
+    inline InitData(const QByteArray &className, const QString &objectName, const QVariantMap &initData)
+    : _className(className), _objectName(objectName), _initData(initData) {}
+
+    inline QByteArray className() const { return _className; }
+    inline QString objectName() const { return _objectName; }
+
+    inline QVariantMap initData() const { return _initData; }
+
+private:
+    QByteArray _className;
+    QString _objectName;
+    QVariantMap _initData;
+};
+
 #endif
index 46c2893..f5b554f 100644 (file)
@@ -23,6 +23,7 @@
 #include "core.h"
 #include "coresession.h"
 #include "coresettings.h"
+#include "internalconnection.h"
 #include "postgresqlstorage.h"
 #include "quassel.h"
 #include "signalproxy.h"
@@ -32,6 +33,8 @@
 
 #include "util.h"
 
+#include "protocols/legacy/legacyconnection.h"
+
 // migration related
 #include <QFile>
 #ifdef Q_OS_WIN32
@@ -54,8 +57,8 @@ const int Core::AddClientEventId = QEvent::registerEventType();
 class AddClientEvent : public QEvent
 {
 public:
-    AddClientEvent(QTcpSocket *socket, UserId uid) : QEvent(QEvent::Type(Core::AddClientEventId)), socket(socket), userId(uid) {}
-    QTcpSocket *socket;
+    AddClientEvent(RemoteConnection *connection, UserId uid) : QEvent(QEvent::Type(Core::AddClientEventId)), connection(connection), userId(uid) {}
+    RemoteConnection *connection;
     UserId userId;
 };
 
@@ -215,8 +218,8 @@ void Core::init()
 
 Core::~Core()
 {
-    foreach(QTcpSocket *socket, blocksizes.keys()) {
-        socket->disconnectFromHost(); // disconnect non authed clients
+    foreach(RemoteConnection *connection, clientInfo.keys()) {
+        connection->close(); // disconnect non authed clients
     }
     qDeleteAll(sessions);
     qDeleteAll(_storageBackends);
@@ -515,12 +518,13 @@ void Core::incomingConnection()
     Q_ASSERT(server);
     while (server->hasPendingConnections()) {
         QTcpSocket *socket = server->nextPendingConnection();
-        connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));
-        connect(socket, SIGNAL(readyRead()), this, SLOT(clientHasData()));
-        connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
+        RemoteConnection *connection = new LegacyConnection(socket, this);
+
+        connect(connection, SIGNAL(disconnected()), SLOT(clientDisconnected()));
+        connect(connection, SIGNAL(dataReceived(QVariant)), SLOT(processClientMessage(QVariant)));
+        connect(connection, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)));
 
-        QVariantMap clientInfo;
-        blocksizes.insert(socket, (quint32)0);
+        clientInfo.insert(connection, QVariantMap());
         quInfo() << qPrintable(tr("Client connected from"))  << qPrintable(socket->peerAddress().toString());
 
         if (!_configured) {
@@ -530,27 +534,22 @@ void Core::incomingConnection()
 }
 
 
-void Core::clientHasData()
+void Core::processClientMessage(const QVariant &data)
 {
-    QTcpSocket *socket = dynamic_cast<QTcpSocket *>(sender());
-    Q_ASSERT(socket && blocksizes.contains(socket));
-    QVariant item;
-    while (SignalProxy::readDataFromDevice(socket, blocksizes[socket], item)) {
-        QVariantMap msg = item.toMap();
-        processClientMessage(socket, msg);
-        if (!blocksizes.contains(socket)) break;  // this socket is no longer ours to handle!
+    RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+    if (!connection) {
+        qWarning() << Q_FUNC_INFO << "Message not sent by RemoteConnection!";
+        return;
     }
-}
-
 
-void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
-{
+    QVariantMap msg = data.toMap();
     if (!msg.contains("MsgType")) {
         // Client is way too old, does not even use the current init format
         qWarning() << qPrintable(tr("Antique client trying to connect... refusing."));
-        socket->close();
+        connection->close();
         return;
     }
+
     // OK, so we have at least an init message format we can understand
     if (msg["MsgType"] == "ClientInit") {
         QVariantMap reply;
@@ -562,9 +561,10 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
             reply["Error"] = tr("<b>Your Quassel Client is too old!</b><br>"
                                 "This core needs at least client/core protocol version %1.<br>"
                                 "Please consider upgrading your client.").arg(Quassel::buildInfo().coreNeedsProtocol);
-            SignalProxy::writeDataToDevice(socket, reply);
-            qWarning() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("too old, rejecting."));
-            socket->close(); return;
+            connection->writeSocketData(reply);
+            qWarning() << qPrintable(tr("Client")) << connection->description() << qPrintable(tr("too old, rejecting."));
+            connection->close();
+            return;
         }
 
         reply["ProtocolVersion"] = Quassel::buildInfo().protocolVersion;
@@ -590,8 +590,8 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
 
 #ifdef HAVE_SSL
         SslServer *sslServer = qobject_cast<SslServer *>(&_server);
-        QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket);
-        bool supportSsl = (bool)sslServer && (bool)sslSocket && sslServer->isCertValid();
+        QSslSocket *sslSocket = qobject_cast<QSslSocket *>(connection->socket());
+        bool supportSsl = sslServer && sslSocket && sslServer->isCertValid();
 #else
         bool supportSsl = false;
 #endif
@@ -626,36 +626,36 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
         else {
             reply["Configured"] = true;
         }
-        clientInfo[socket] = msg; // store for future reference
+        clientInfo[connection] = msg; // store for future reference
         reply["MsgType"] = "ClientInitAck";
-        SignalProxy::writeDataToDevice(socket, reply);
-        socket->flush(); // ensure that the write cache is flushed before we switch to ssl
+        connection->writeSocketData(reply);
+        connection->socket()->flush(); // ensure that the write cache is flushed before we switch to ssl
 
 #ifdef HAVE_SSL
         // after we told the client that we are ssl capable we switch to ssl mode
         if (supportSsl && msg["UseSsl"].toBool()) {
-            qDebug() << qPrintable(tr("Starting TLS for Client:"))  << qPrintable(socket->peerAddress().toString());
-            connect(sslSocket, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors(const QList<QSslError> &)));
+            qDebug() << qPrintable(tr("Starting TLS for Client:"))  << connection->description();
+            connect(sslSocket, SIGNAL(sslErrors(const QList<QSslError> &)), SLOT(sslErrors(const QList<QSslError> &)));
             sslSocket->startServerEncryption();
         }
 #endif
 
 #ifndef QT_NO_COMPRESS
         if (supportsCompression && msg["UseCompression"].toBool()) {
-            socket->setProperty("UseCompression", true);
-            qDebug() << "Using compression for Client:" << qPrintable(socket->peerAddress().toString());
+            connection->socket()->setProperty("UseCompression", true);
+            qDebug() << "Using compression for Client:" << qPrintable(connection->socket()->peerAddress().toString());
         }
 #endif
     }
     else {
         // for the rest, we need an initialized connection
-        if (!clientInfo.contains(socket)) {
+        if (!clientInfo.contains(connection)) {
             QVariantMap reply;
             reply["MsgType"] = "ClientLoginReject";
             reply["Error"] = tr("<b>Client not initialized!</b><br>You need to send an init message before trying to login.");
-            SignalProxy::writeDataToDevice(socket, reply);
-            qWarning() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("did not send an init message before trying to login, rejecting."));
-            socket->close(); return;
+            connection->writeSocketData(reply);
+            qWarning() << qPrintable(tr("Client")) << qPrintable(connection->socket()->peerAddress().toString()) << qPrintable(tr("did not send an init message before trying to login, rejecting."));
+            connection->close(); return;
         }
         if (msg["MsgType"] == "CoreSetupData") {
             QVariantMap reply;
@@ -667,7 +667,7 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
             else {
                 reply["MsgType"] = "CoreSetupAck";
             }
-            SignalProxy::writeDataToDevice(socket, reply);
+            connection->writeSocketData(reply);
         }
         else if (msg["MsgType"] == "ClientLogin") {
             QVariantMap reply;
@@ -675,13 +675,13 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
             if (uid == 0) {
                 reply["MsgType"] = "ClientLoginReject";
                 reply["Error"] = tr("<b>Invalid username or password!</b><br>The username/password combination you supplied could not be found in the database.");
-                SignalProxy::writeDataToDevice(socket, reply);
+                connection->writeSocketData(reply);
                 return;
             }
             reply["MsgType"] = "ClientLoginAck";
-            SignalProxy::writeDataToDevice(socket, reply);
-            quInfo() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("initialized and authenticated successfully as \"%1\" (UserId: %2).").arg(msg["User"].toString()).arg(uid.toInt()));
-            setupClientSession(socket, uid);
+            connection->writeSocketData(reply);
+            quInfo() << qPrintable(tr("Client")) << qPrintable(connection->socket()->peerAddress().toString()) << qPrintable(tr("initialized and authenticated successfully as \"%1\" (UserId: %2).").arg(msg["User"].toString()).arg(uid.toInt()));
+            setupClientSession(connection, uid);
         }
     }
 }
@@ -690,41 +690,12 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
 // Potentially called during the initialization phase (before handing the connection off to the session)
 void Core::clientDisconnected()
 {
-    QTcpSocket *socket = qobject_cast<QTcpSocket *>(sender());
-    if (socket) {
-        // here it's safe to call methods on socket!
-        quInfo() << qPrintable(tr("Non-authed client disconnected.")) << qPrintable(socket->peerAddress().toString());
-        blocksizes.remove(socket);
-        clientInfo.remove(socket);
-        socket->deleteLater();
-    }
-    else {
-        // we have to crawl through the hashes and see if we find a victim to remove
-        qDebug() << qPrintable(tr("Non-authed client disconnected. (socket allready destroyed)"));
+    RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+    Q_ASSERT(connection);
 
-        // DO NOT CALL ANY METHODS ON socket!!
-        socket = static_cast<QTcpSocket *>(sender());
-
-        QHash<QTcpSocket *, quint32>::iterator blockSizeIter = blocksizes.begin();
-        while (blockSizeIter != blocksizes.end()) {
-            if (blockSizeIter.key() == socket) {
-                blockSizeIter = blocksizes.erase(blockSizeIter);
-            }
-            else {
-                blockSizeIter++;
-            }
-        }
-
-        QHash<QTcpSocket *, QVariantMap>::iterator clientInfoIter = clientInfo.begin();
-        while (clientInfoIter != clientInfo.end()) {
-            if (clientInfoIter.key() == socket) {
-                clientInfoIter = clientInfo.erase(clientInfoIter);
-            }
-            else {
-                clientInfoIter++;
-            }
-        }
-    }
+    quInfo() << qPrintable(tr("Non-authed client disconnected.")) << qPrintable(connection->socket()->peerAddress().toString());
+    clientInfo.remove(connection);
+    connection->deleteLater();
 
     // make server listen again if still not configured
     if (!_configured) {
@@ -736,13 +707,12 @@ void Core::clientDisconnected()
 }
 
 
-void Core::setupClientSession(QTcpSocket *socket, UserId uid)
+void Core::setupClientSession(RemoteConnection *connection, UserId uid)
 {
     // From now on everything is handled by the client session
-    disconnect(socket, 0, this, 0);
-    socket->flush();
-    blocksizes.remove(socket);
-    clientInfo.remove(socket);
+    disconnect(connection, 0, this, 0);
+    connection->socket()->flush();
+    clientInfo.remove(connection);
 
     // Find or create session for validated user
     SessionThread *session;
@@ -752,15 +722,15 @@ void Core::setupClientSession(QTcpSocket *socket, UserId uid)
     else {
         session = createSession(uid);
         if (!session) {
-            qWarning() << qPrintable(tr("Could not initialize session for client:")) << qPrintable(socket->peerAddress().toString());
-            socket->close();
+            qWarning() << qPrintable(tr("Could not initialize session for client:")) << qPrintable(connection->socket()->peerAddress().toString());
+            connection->close();
             return;
         }
     }
 
     // as we are currently handling an event triggered by incoming data on this socket
     // it is unsafe to directly move the socket to the client thread.
-    QCoreApplication::postEvent(this, new AddClientEvent(socket, uid));
+    QCoreApplication::postEvent(this, new AddClientEvent(connection, uid));
 }
 
 
@@ -768,27 +738,27 @@ void Core::customEvent(QEvent *event)
 {
     if (event->type() == AddClientEventId) {
         AddClientEvent *addClientEvent = static_cast<AddClientEvent *>(event);
-        addClientHelper(addClientEvent->socket, addClientEvent->userId);
+        addClientHelper(addClientEvent->connection, addClientEvent->userId);
         return;
     }
 }
 
 
-void Core::addClientHelper(QTcpSocket *socket, UserId uid)
+void Core::addClientHelper(RemoteConnection *connection, UserId uid)
 {
     // Find or create session for validated user
     if (!sessions.contains(uid)) {
-        qWarning() << qPrintable(tr("Could not find a session for client:")) << qPrintable(socket->peerAddress().toString());
-        socket->close();
+        qWarning() << qPrintable(tr("Could not find a session for client:")) << qPrintable(connection->socket()->peerAddress().toString());
+        connection->close();
         return;
     }
 
     SessionThread *session = sessions[uid];
-    session->addClient(socket);
+    session->addClient(connection);
 }
 
 
-void Core::setupInternalClientSession(SignalProxy *proxy)
+void Core::setupInternalClientSession(InternalConnection *clientConnection)
 {
     if (!_configured) {
         stopListening();
@@ -804,13 +774,18 @@ void Core::setupInternalClientSession(SignalProxy *proxy)
         return;
     }
 
+    InternalConnection *coreConnection = new InternalConnection(this);
+    coreConnection->setPeer(clientConnection);
+    clientConnection->setPeer(coreConnection);
+
     // Find or create session for validated user
-    SessionThread *sess;
+    SessionThread *sessionThread;
     if (sessions.contains(uid))
-        sess = sessions[uid];
+        sessionThread = sessions[uid];
     else
-        sess = createSession(uid);
-    sess->addClient(proxy);
+        sessionThread = createSession(uid);
+
+    sessionThread->addClient(coreConnection);
 }
 
 
@@ -841,9 +816,9 @@ void Core::sslErrors(const QList<QSslError> &errors)
 
 void Core::socketError(QAbstractSocket::SocketError err)
 {
-    QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(sender());
-    if (socket && err != QAbstractSocket::RemoteHostClosedError)
-        qWarning() << "Core::socketError()" << socket << err << socket->errorString();
+    RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+    if (connection && err != QAbstractSocket::RemoteHostClosedError)
+        qWarning() << "Core::socketError()" << connection->socket() << err << connection->socket()->errorString();
 }
 
 
index 45a37b6..48c015d 100644 (file)
 #include "types.h"
 
 class CoreSession;
+class RemoteConnection;
+struct NetworkInfo;
 class SessionThread;
 class SignalProxy;
-struct NetworkInfo;
 
 class AbstractSqlMigrationReader;
 class AbstractSqlMigrationWriter;
@@ -484,7 +485,7 @@ public slots:
     /** \note This method is threadsafe.
      */
     void syncStorage();
-    void setupInternalClientSession(SignalProxy *proxy);
+    void setupInternalClientSession(InternalConnection *clientConnection);
 
 signals:
     //! Sent when a BufferInfo is updated in storage.
@@ -500,7 +501,6 @@ private slots:
     bool startListening();
     void stopListening(const QString &msg = QString());
     void incomingConnection();
-    void clientHasData();
     void clientDisconnected();
 
     bool initStorage(const QString &backend, QVariantMap settings, bool setup = false);
@@ -511,6 +511,8 @@ private slots:
 #endif
     void socketError(QAbstractSocket::SocketError);
 
+    void processClientMessage(const QVariant &data);
+
 private:
     Core();
     ~Core();
@@ -518,9 +520,8 @@ private:
     static Core *instanceptr;
 
     SessionThread *createSession(UserId userId, bool restoreState = false);
-    void setupClientSession(QTcpSocket *socket, UserId uid);
-    void addClientHelper(QTcpSocket *socket, UserId uid);
-    void processClientMessage(QTcpSocket *socket, const QVariantMap &msg);
+    void setupClientSession(RemoteConnection *connection, UserId uid);
+    void addClientHelper(RemoteConnection *connection, UserId uid);
     //void processCoreSetup(QTcpSocket *socket, QVariantMap &msg);
     QString setupCoreForInternalUsage();
     QString setupCore(QVariantMap setupData);
@@ -547,8 +548,7 @@ private:
 
     OidentdConfigGenerator *_oidentdConfigGenerator;
 
-    QHash<QTcpSocket *, quint32> blocksizes;
-    QHash<QTcpSocket *, QVariantMap> clientInfo;
+    QHash<RemoteConnection *, QVariantMap> clientInfo;
 
     QHash<QString, Storage *> _storageBackends;
 
index 6abf631..0ac23b7 100644 (file)
 #include "coreusersettings.h"
 #include "ctcpparser.h"
 #include "eventstringifier.h"
+#include "internalconnection.h"
 #include "ircchannel.h"
 #include "ircparser.h"
 #include "ircuser.h"
 #include "logger.h"
 #include "messageevent.h"
-#include "signalproxy.h"
 #include "storage.h"
 #include "util.h"
 
+#include "protocols/legacy/legacyconnection.h"
+
 class ProcessMessagesEvent : public QEvent
 {
 public:
@@ -56,7 +58,7 @@ public:
 CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
     : QObject(parent),
     _user(uid),
-    _signalProxy(new SignalProxy(SignalProxy::Server, 0, this)),
+    _signalProxy(new SignalProxy(SignalProxy::Server, this)),
     _aliasManager(this),
     _bufferSyncer(new CoreBufferSyncer(this)),
     _backlogManager(new CoreBacklogManager(this)),
@@ -77,10 +79,10 @@ CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
     p->setHeartBeatInterval(30);
     p->setMaxHeartBeatCount(60); // 30 mins until we throw a dead socket out
 
-    connect(p, SIGNAL(peerRemoved(QIODevice *)), this, SLOT(removeClient(QIODevice *)));
+    connect(p, SIGNAL(peerRemoved(SignalProxy::AbstractPeer*)), SLOT(removeClient(SignalProxy::AbstractPeer*)));
 
-    connect(p, SIGNAL(connected()), this, SLOT(clientsConnected()));
-    connect(p, SIGNAL(disconnected()), this, SLOT(clientsDisconnected()));
+    connect(p, SIGNAL(connected()), SLOT(clientsConnected()));
+    connect(p, SIGNAL(disconnected()), SLOT(clientsDisconnected()));
 
     p->attachSlot(SIGNAL(sendInput(BufferInfo, QString)), this, SLOT(msgFromClient(BufferInfo, QString)));
     p->attachSignal(this, SIGNAL(displayMsg(Message)));
@@ -204,36 +206,28 @@ void CoreSession::restoreSessionState()
 }
 
 
-void CoreSession::addClient(QIODevice *device)
+void CoreSession::addClient(RemoteConnection *connection)
 {
-    if (!device) {
-        qCritical() << "Invoking CoreSession::addClient with a QObject that is not a QIODevice!";
-    }
-    else {
-        // if the socket is an orphan, the signalProxy adopts it.
-        // -> we don't need to care about it anymore
-        device->setParent(0);
-        signalProxy()->addPeer(device);
-        QVariantMap reply;
-        reply["MsgType"] = "SessionInit";
-        reply["SessionState"] = sessionState();
-        SignalProxy::writeDataToDevice(device, reply);
-    }
+    QVariantMap reply;
+    reply["MsgType"] = "SessionInit";
+    reply["SessionState"] = sessionState();
+    connection->writeSocketData(reply);
+    signalProxy()->addPeer(connection);
 }
 
 
-void CoreSession::addClient(SignalProxy *proxy)
+void CoreSession::addClient(InternalConnection *connection)
 {
-    signalProxy()->addPeer(proxy);
+    signalProxy()->addPeer(connection);
     emit sessionState(sessionState());
 }
 
 
-void CoreSession::removeClient(QIODevice *iodev)
+void CoreSession::removeClient(SignalProxy::AbstractPeer *peer)
 {
-    QTcpSocket *socket = qobject_cast<QTcpSocket *>(iodev);
-    if (socket)
-        quInfo() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("disconnected (UserId: %1).").arg(user().toInt()));
+    RemoteConnection *connection = qobject_cast<RemoteConnection *>(peer);
+    if (connection)
+        quInfo() << qPrintable(tr("Client")) << connection->description() << qPrintable(tr("disconnected (UserId: %1).").arg(user().toInt()));
 }
 
 
index 98d60d4..fffb768 100644 (file)
@@ -28,6 +28,7 @@
 #include "corealiasmanager.h"
 #include "coreignorelistmanager.h"
 #include "message.h"
+#include "signalproxy.h"
 #include "storage.h"
 
 class CoreBacklogManager;
@@ -41,10 +42,11 @@ class CoreSessionEventProcessor;
 class CtcpParser;
 class EventManager;
 class EventStringifier;
+class InternalConnection;
 class IrcParser;
 class MessageEvent;
 class NetworkConnection;
-class SignalProxy;
+class RemoteConnection;
 
 struct NetworkInfo;
 
@@ -87,8 +89,8 @@ public:
     void restoreSessionState();
 
 public slots:
-    void addClient(QIODevice *device);
-    void addClient(SignalProxy *proxy);
+    void addClient(RemoteConnection *connection);
+    void addClient(InternalConnection *connection);
 
     void msgFromClient(BufferInfo, QString message);
 
@@ -155,7 +157,7 @@ protected:
     virtual void customEvent(QEvent *event);
 
 private slots:
-    void removeClient(QIODevice *dev);
+    void removeClient(SignalProxy::AbstractPeer *peer);
 
     void recvStatusMsgFromServer(QString msg);
     void recvMessageFromServer(NetworkId networkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
index a56c42b..49790df 100644 (file)
  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
  ***************************************************************************/
 
-#include <QMutexLocker>
-
+#include "core.h"
+#include "coresession.h"
+#include "internalconnection.h"
+#include "remoteconnection.h"
 #include "sessionthread.h"
 #include "signalproxy.h"
-#include "coresession.h"
-#include "core.h"
 
 SessionThread::SessionThread(UserId uid, bool restoreState, QObject *parent)
     : QThread(parent),
@@ -72,6 +72,7 @@ void SessionThread::setSessionInitialized()
 }
 
 
+// this and the following related methods are executed in the Core thread!
 void SessionThread::addClient(QObject *peer)
 {
     if (isSessionInitialized()) {
@@ -85,42 +86,44 @@ void SessionThread::addClient(QObject *peer)
 
 void SessionThread::addClientToSession(QObject *peer)
 {
-    QIODevice *socket = qobject_cast<QIODevice *>(peer);
-    if (socket) {
-        addRemoteClientToSession(socket);
+    RemoteConnection *connection = qobject_cast<RemoteConnection *>(peer);
+    if (connection) {
+        addRemoteClientToSession(connection);
         return;
     }
 
-    SignalProxy *proxy = qobject_cast<SignalProxy *>(peer);
-    if (proxy) {
-        addInternalClientToSession(proxy);
+    InternalConnection *internal = qobject_cast<InternalConnection *>(peer);
+    if (internal) {
+        addInternalClientToSession(internal);
         return;
     }
 
-    qWarning() << "SessionThread::addClient() received neither QIODevice nor SignalProxy as peer!" << peer;
+    qWarning() << "SessionThread::addClient() received invalid peer!" << peer;
 }
 
 
-void SessionThread::addRemoteClientToSession(QIODevice *socket)
+void SessionThread::addRemoteClientToSession(RemoteConnection *connection)
 {
-    socket->setParent(0);
-    socket->moveToThread(session()->thread());
-    emit addRemoteClient(socket);
+    connection->setParent(0);
+    connection->moveToThread(session()->thread());
+    emit addRemoteClient(connection);
 }
 
 
-void SessionThread::addInternalClientToSession(SignalProxy *proxy)
+void SessionThread::addInternalClientToSession(InternalConnection *connection)
 {
-    emit addInternalClient(proxy);
+    connection->setParent(0);
+    connection->moveToThread(session()->thread());
+    emit addInternalClient(connection);
 }
 
 
 void SessionThread::run()
 {
     _session = new CoreSession(user(), _restoreState);
-    connect(this, SIGNAL(addRemoteClient(QIODevice *)), _session, SLOT(addClient(QIODevice *)));
-    connect(this, SIGNAL(addInternalClient(SignalProxy *)), _session, SLOT(addClient(SignalProxy *)));
-    connect(_session, SIGNAL(sessionState(const QVariant &)), Core::instance(), SIGNAL(sessionState(const QVariant &)));
+    connect(this, SIGNAL(addRemoteClient(RemoteConnection*)), _session, SLOT(addClient(RemoteConnection*)));
+    connect(this, SIGNAL(addInternalClient(InternalConnection*)), _session, SLOT(addClient(InternalConnection*)));
+    connect(_session, SIGNAL(sessionState(QVariant)), Core::instance(), SIGNAL(sessionState(QVariant)));
     emit initialized();
     exec();
     delete _session;
index 7540263..aacefce 100644 (file)
@@ -27,8 +27,9 @@
 #include "types.h"
 
 class CoreSession;
+class InternalConnection;
+class RemoteConnection;
 class QIODevice;
-class SignalProxy;
 
 class SessionThread : public QThread
 {
@@ -53,8 +54,8 @@ signals:
     void initialized();
     void shutdown();
 
-    void addRemoteClient(QIODevice *);
-    void addInternalClient(SignalProxy *);
+    void addRemoteClient(RemoteConnection *);
+    void addInternalClient(InternalConnection *);
 
 private:
     CoreSession *_session;
@@ -65,8 +66,8 @@ private:
 
     bool isSessionInitialized();
     void addClientToSession(QObject *peer);
-    void addRemoteClientToSession(QIODevice *socket);
-    void addInternalClientToSession(SignalProxy *proxy);
+    void addRemoteClientToSession(RemoteConnection *connection);
+    void addInternalClientToSession(InternalConnection *client);
 };
 
 
index 1ad4f03..ab3bf02 100644 (file)
@@ -24,6 +24,8 @@
 #include "core.h"
 #include "qtui.h"
 
+class InternalConnection;
+
 MonolithicApplication::MonolithicApplication(int &argc, char **argv)
     : QtUiApplication(argc, argv),
     _internalInitDone(false)
@@ -43,6 +45,7 @@ bool MonolithicApplication::init()
 
     connect(Client::coreConnection(), SIGNAL(startInternalCore()), SLOT(startInternalCore()));
 
+    // FIXME what's this for?
     if (isOptionSet("port")) {
         startInternalCore();
     }
@@ -67,6 +70,6 @@ void MonolithicApplication::startInternalCore()
     }
     Core *core = Core::instance();
     CoreConnection *connection = Client::coreConnection();
-    connect(connection, SIGNAL(connectToInternalCore(SignalProxy *)), core, SLOT(setupInternalClientSession(SignalProxy *)));
-    connect(core, SIGNAL(sessionState(const QVariant &)), connection, SLOT(internalSessionStateReceived(const QVariant &)));
+    connect(connection, SIGNAL(connectToInternalCore(InternalConnection*)), core, SLOT(setupInternalClientSession(InternalConnection*)));
+    connect(core, SIGNAL(sessionState(QVariant)), connection, SLOT(internalSessionStateReceived(QVariant)));
 }