#include "clientsettings.h"
#include "coreaccountmodel.h"
#include "identity.h"
+#include "internalconnection.h"
#include "network.h"
#include "networkmodel.h"
#include "quassel.h"
#include "signalproxy.h"
#include "util.h"
+#include "protocols/legacy/legacyconnection.h"
+
CoreConnection::CoreConnection(CoreAccountModel *model, QObject *parent)
: QObject(parent),
_model(model),
- _blockSize(0),
_state(Disconnected),
_wantReconnect(false),
_progressMinimum(0),
_progressMaximum(-1),
_progressValue(-1),
_wasReconnect(false),
- _requestedDisconnect(false)
+ _requestedDisconnect(false),
+ _resetting(false)
{
qRegisterMetaType<ConnectionState>("CoreConnection::ConnectionState");
}
void CoreConnection::reconnectTimeout()
{
- if (!_socket) {
+ if (!_connection) {
CoreConnectionSettings s;
if (_wantReconnect && s.autoReconnect()) {
#ifdef HAVE_KDE
bool CoreConnection::isEncrypted() const
{
-#ifndef HAVE_SSL
- return false;
-#else
- QSslSocket *sock = qobject_cast<QSslSocket *>(_socket);
- return isConnected() && sock && sock->isEncrypted();
-#endif
+ return _connection && _connection->isSecure();
}
return false;
if (currentAccount().isInternal())
return true;
- if (_socket->peerAddress().isInSubnet(QHostAddress::LocalHost, 0x00ffffff))
+ if (_connection->isLocal())
return true;
return false;
void CoreConnection::coreSocketError(QAbstractSocket::SocketError)
{
- qDebug() << "coreSocketError" << _socket << _socket->errorString();
disconnectFromCore(_socket->errorString(), true);
}
void CoreConnection::coreSocketDisconnected()
{
- // qDebug() << Q_FUNC_INFO;
_wasReconnect = !_requestedDisconnect;
resetConnection(true);
// FIXME handle disconnects gracefully
}
-void CoreConnection::coreHasData()
+// note: this still expects the legacy protocol
+// noteĀ²: after cleaning this up, we can probably get rid of _socket altogether
+void CoreConnection::coreHasData(const QVariant &item)
{
- QVariant item;
- while (SignalProxy::readDataFromDevice(_socket, _blockSize, item)) {
- QVariantMap msg = item.toMap();
- if (!msg.contains("MsgType")) {
- // This core is way too old and does not even speak our init protocol...
- emit connectionErrorPopup(tr("The Quassel Core you try to connect to is too old! Please consider upgrading."));
- disconnectFromCore(QString(), false);
- return;
- }
- if (msg["MsgType"] == "ClientInitAck") {
- clientInitAck(msg);
- }
- else if (msg["MsgType"] == "ClientInitReject") {
- emit connectionErrorPopup(msg["Error"].toString());
- disconnectFromCore(QString(), false);
- return;
- }
- else if (msg["MsgType"] == "CoreSetupAck") {
- emit coreSetupSuccess();
- }
- else if (msg["MsgType"] == "CoreSetupReject") {
- emit coreSetupFailed(msg["Error"].toString());
- }
- else if (msg["MsgType"] == "ClientLoginReject") {
- loginFailed(msg["Error"].toString());
- }
- else if (msg["MsgType"] == "ClientLoginAck") {
- loginSuccess();
- }
- else if (msg["MsgType"] == "SessionInit") {
- // that's it, let's hand over to the signal proxy
- // if the socket is an orphan, the signalProxy adopts it.
- // -> we don't need to care about it anymore
- _socket->setParent(0);
- Client::signalProxy()->addPeer(_socket);
-
- sessionStateReceived(msg["SessionState"].toMap());
- break; // this is definitively the last message we process here!
- }
- else {
- disconnectFromCore(tr("Invalid data received from core"), false);
- return;
- }
+ QVariantMap msg = item.toMap();
+ if (!msg.contains("MsgType")) {
+ // This core is way too old and does not even speak our init protocol...
+ emit connectionErrorPopup(tr("The Quassel Core you try to connect to is too old! Please consider upgrading."));
+ disconnectFromCore(QString(), false);
+ return;
+ }
+ if (msg["MsgType"] == "ClientInitAck") {
+ clientInitAck(msg);
+ }
+ else if (msg["MsgType"] == "ClientInitReject") {
+ emit connectionErrorPopup(msg["Error"].toString());
+ disconnectFromCore(QString(), false);
+ return;
+ }
+ else if (msg["MsgType"] == "CoreSetupAck") {
+ emit coreSetupSuccess();
+ }
+ else if (msg["MsgType"] == "CoreSetupReject") {
+ emit coreSetupFailed(msg["Error"].toString());
+ }
+ else if (msg["MsgType"] == "ClientLoginReject") {
+ loginFailed(msg["Error"].toString());
}
- if (_blockSize > 0) {
- updateProgress(_socket->bytesAvailable(), _blockSize);
+ else if (msg["MsgType"] == "ClientLoginAck") {
+ loginSuccess();
+ }
+ else if (msg["MsgType"] == "SessionInit") {
+ // that's it, let's hand over to the signal proxy
+ // if the connection is an orphan, the signalProxy adopts it.
+ // -> we don't need to care about it anymore
+
+ disconnect(_connection, 0, this, 0);
+
+ _connection->setParent(0);
+ Client::signalProxy()->addPeer(_connection);
+
+ sessionStateReceived(msg["SessionState"].toMap());
+ }
+ else {
+ disconnectFromCore(tr("Invalid data received from core"), false);
+ return;
}
}
void CoreConnection::disconnectFromCore()
{
- _requestedDisconnect = true;
- disconnectFromCore(QString(), false); // requested disconnect, so don't try to reconnect
+ if (_socket) {
+ _requestedDisconnect = true;
+ disconnectFromCore(QString(), false); // requested disconnect, so don't try to reconnect
+ }
}
_wasReconnect = wantReconnect; // store if disconnect was requested
+ resetConnection(wantReconnect);
+
if (errorString.isEmpty())
emit connectionError(tr("Disconnected"));
else
emit connectionError(errorString);
-
- Client::signalProxy()->removeAllPeers();
- resetConnection(wantReconnect);
}
void CoreConnection::resetConnection(bool wantReconnect)
{
+ if (_resetting)
+ return;
+ _resetting = true;
+
_wantReconnect = wantReconnect;
- if (_socket) {
+ if (_connection) {
+ disconnect(_socket, 0, this, 0);
+ disconnect(_connection, 0, this, 0);
+ _connection->close();
+
+ if (_connection->parent() == this)
+ _connection->deleteLater(); // if it's not us, it belongs to the sigproxy which will delete it
+ _socket = 0; // socket is owned and will be deleted by RemoteConnection
+ _connection = 0;
+ }
+ else if (_socket) {
disconnect(_socket, 0, this, 0);
_socket->deleteLater();
_socket = 0;
}
+
_requestedDisconnect = false;
- _blockSize = 0;
_coreMsgBuffer.clear();
-
_netsToSync.clear();
_numNetsToSync = 0;
if (wantReconnect && s.autoReconnect()) {
_reconnectTimer.start();
}
+
+ _resetting = false;
}
return;
}
emit startInternalCore();
- emit connectToInternalCore(Client::instance()->signalProxy());
+
+ InternalConnection *conn = new InternalConnection();
+ Client::instance()->signalProxy()->addPeer(conn); // sigproxy will take ownership
+ emit connectToInternalCore(conn);
+
return;
}
Q_ASSERT(!_socket);
#ifdef HAVE_SSL
- QSslSocket *sock = new QSslSocket(Client::instance());
+ QSslSocket *sock = new QSslSocket(this);
// make sure the warning is shown if we happen to connect without SSL support later
s.setAccountValue("ShowNoClientSslWarning", true);
#else
s.setAccountValue("ShowNoClientSslWarning", false);
}
}
- QTcpSocket *sock = new QTcpSocket(Client::instance());
+ QTcpSocket *sock = new QTcpSocket(this);
#endif
#ifndef QT_NO_NETWORKPROXY
#endif
_socket = sock;
- connect(sock, SIGNAL(readyRead()), SLOT(coreHasData()));
connect(sock, SIGNAL(connected()), SLOT(coreSocketConnected()));
connect(sock, SIGNAL(disconnected()), SLOT(coreSocketDisconnected()));
connect(sock, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(coreSocketError(QAbstractSocket::SocketError)));
void CoreConnection::coreSocketConnected()
{
+ // Create the connection which will handle the incoming data
+ Q_ASSERT(!_connection);
+ _connection = new LegacyConnection(_socket, this);
+ connect(_connection, SIGNAL(dataReceived(QVariant)), SLOT(coreHasData(QVariant)));
+ connect(_connection, SIGNAL(transferProgress(int,int)), SLOT(updateProgress(int,int)));
+
// Phase One: Send client info and wait for core info
emit connectionMsg(tr("Synchronizing to core..."));
clientInit["UseCompression"] = false;
#endif
- SignalProxy::writeDataToDevice(_socket, clientInit);
+ qobject_cast<RemoteConnection *>(_connection)->writeSocketData(clientInit);
}
clientLogin["MsgType"] = "ClientLogin";
clientLogin["User"] = currentAccount().user();
clientLogin["Password"] = currentAccount().password();
- SignalProxy::writeDataToDevice(_socket, clientLogin);
+ qobject_cast<RemoteConnection*>(_connection)->writeSocketData(clientLogin);
}
{
updateProgress(100, 100);
- // rest of communication happens through SignalProxy...
- disconnect(_socket, SIGNAL(readyRead()), this, 0);
- disconnect(_socket, SIGNAL(connected()), this, 0);
-
syncToCore(state);
}
QVariantMap setup;
setup["MsgType"] = "CoreSetupData";
setup["SetupData"] = setupData;
- SignalProxy::writeDataToDevice(_socket, setup);
+ qobject_cast<RemoteConnection *>(_connection)->writeSocketData(setup);
}
#endif
#include "coreaccount.h"
+#include "remoteconnection.h"
#include "types.h"
class CoreAccountModel;
+class InternalConnection;
class Network;
class SignalProxy;
void coreSetupFailed(const QString &error);
void startInternalCore();
- void connectToInternalCore(SignalProxy *proxy);
+ void connectToInternalCore(InternalConnection *connection);
// These signals MUST be handled synchronously!
void userAuthenticationRequired(CoreAccount *, bool *valid, const QString &errorMessage = QString());
void socketStateChanged(QAbstractSocket::SocketState);
void coreSocketError(QAbstractSocket::SocketError);
- void coreHasData();
+ void coreHasData(const QVariant &item);
void coreSocketConnected();
void coreSocketDisconnected();
CoreAccount _account;
QVariantMap _coreMsgBuffer;
- QPointer<QAbstractSocket> _socket;
- quint32 _blockSize;
+ QPointer<QTcpSocket> _socket;
+ QPointer<SignalProxy::AbstractPeer> _connection;
ConnectionState _state;
QTimer _reconnectTimer;
QString _coreInfoString(const QVariantMap &);
bool _wasReconnect;
bool _requestedDisconnect;
+ bool _resetting;
inline CoreAccountModel *accountModel() const;
eventmanager.cpp
identity.cpp
ignorelistmanager.cpp
+ internalconnection.cpp
ircchannel.cpp
ircevent.cpp
irclisthelper.cpp
networkconfig.cpp
networkevent.cpp
quassel.cpp
+ remoteconnection.cpp
settings.cpp
signalproxy.cpp
syncableobject.cpp
- util.cpp)
+ util.cpp
+
+ protocols/legacy/legacyconnection.cpp
+)
set(MOC_HDRS
aliasmanager.h
eventmanager.h
identity.h
ignorelistmanager.h
+ internalconnection.h
ircchannel.h
irclisthelper.h
ircuser.h
network.h
networkconfig.h
+ remoteconnection.h
settings.h
signalproxy.h
- syncableobject.h)
+ syncableobject.h
+
+ protocols/legacy/legacyconnection.h
+)
set(HEADERS ${MOC_HDRS}
abstractcliparser.h
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#include <QCoreApplication>
+#include <QThread>
+
+#include "internalconnection.h"
+
+template<class T>
+class PeerMessageEvent : public QEvent
+{
+public:
+ PeerMessageEvent(InternalConnection *sender, InternalConnection::EventType eventType, const T &message)
+ : QEvent(QEvent::Type(eventType)), sender(sender), message(message) {}
+ InternalConnection *sender;
+ T message;
+};
+
+
+InternalConnection::InternalConnection(QObject *parent)
+ : SignalProxy::AbstractPeer(parent),
+ _proxy(0),
+ _peer(0),
+ _isOpen(true)
+{
+
+}
+
+
+InternalConnection::~InternalConnection()
+{
+ if (_isOpen)
+ emit disconnected();
+}
+
+
+QString InternalConnection::description() const
+{
+ return tr("internal connection");
+}
+
+
+bool InternalConnection::isOpen() const
+{
+ return true;
+}
+
+
+bool InternalConnection::isSecure() const
+{
+ return true;
+}
+
+
+bool InternalConnection::isLocal() const
+{
+ return true;
+}
+
+
+void InternalConnection::close(const QString &reason)
+{
+ // FIXME
+ Q_UNUSED(reason)
+ qWarning() << "closing not implemented!";
+}
+
+
+int InternalConnection::lag() const
+{
+ return 0;
+}
+
+
+void InternalConnection::setSignalProxy(SignalProxy *proxy)
+{
+ if (!proxy && _proxy) {
+ _proxy = 0;
+ if (_isOpen) {
+ _isOpen = false;
+ emit disconnected();
+ }
+ return;
+ }
+
+ if (proxy && !_proxy) {
+ _proxy = proxy;
+ return;
+ }
+
+ qWarning() << Q_FUNC_INFO << "Changing the SignalProxy is not supported!";
+}
+
+
+void InternalConnection::setPeer(InternalConnection *peer)
+{
+ if (_peer) {
+ qWarning() << Q_FUNC_INFO << "Peer already set, ignoring!";
+ return;
+ }
+ _peer = peer;
+ connect(peer, SIGNAL(disconnected()), SLOT(peerDisconnected()));
+}
+
+
+void InternalConnection::peerDisconnected()
+{
+ disconnect(_peer, 0, this, 0);
+ _peer = 0;
+ if (_isOpen) {
+ _isOpen = false;
+ emit disconnected();
+ }
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::SyncMessage &msg)
+{
+ dispatch(SyncMessageEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::RpcCall &msg)
+{
+ dispatch(RpcCallEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::InitRequest &msg)
+{
+ dispatch(InitRequestEvent, msg);
+}
+
+
+void InternalConnection::dispatch(const SignalProxy::InitData &msg)
+{
+ dispatch(InitDataEvent, msg);
+}
+
+
+template<class T>
+void InternalConnection::dispatch(EventType eventType, const T &msg)
+{
+ if (!_peer) {
+ qWarning() << Q_FUNC_INFO << "Cannot dispatch a message without a peer!";
+ return;
+ }
+
+ if(QThread::currentThread() == _peer->thread())
+ _peer->handle(msg);
+ else
+ QCoreApplication::postEvent(_peer, new PeerMessageEvent<T>(this, eventType, msg));
+}
+
+
+template<class T>
+void InternalConnection::handle(const T &msg)
+{
+ if (!_proxy) {
+ qWarning() << Q_FUNC_INFO << "Cannot handle a message without having a signal proxy set!";
+ return;
+ }
+
+ _proxy->handle(this, msg);
+}
+
+
+void InternalConnection::customEvent(QEvent *event)
+{
+ switch ((int)event->type()) {
+ case SyncMessageEvent: {
+ PeerMessageEvent<SignalProxy::SyncMessage> *e = static_cast<PeerMessageEvent<SignalProxy::SyncMessage> *>(event);
+ handle(e->message);
+ break;
+ }
+ case RpcCallEvent: {
+ PeerMessageEvent<SignalProxy::RpcCall> *e = static_cast<PeerMessageEvent<SignalProxy::RpcCall> *>(event);
+ handle(e->message);
+ break;
+ }
+ case InitRequestEvent: {
+ PeerMessageEvent<SignalProxy::InitRequest> *e = static_cast<PeerMessageEvent<SignalProxy::InitRequest> *>(event);
+ handle(e->message);
+ break;
+ }
+ case InitDataEvent: {
+ PeerMessageEvent<SignalProxy::InitData> *e = static_cast<PeerMessageEvent<SignalProxy::InitData> *>(event);
+ handle(e->message);
+ break;
+ }
+
+ default:
+ qWarning() << Q_FUNC_INFO << "Received unknown custom event:" << event->type();
+ return;
+ }
+
+ event->accept();
+}
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#ifndef INTERNALCONNECTION_H
+#define INTERNALCONNECTION_H
+
+#include <QTcpSocket>
+
+#include "signalproxy.h"
+
+class QEvent;
+
+class InternalConnection : public SignalProxy::AbstractPeer
+{
+ Q_OBJECT
+
+public:
+ enum EventType {
+ SyncMessageEvent = QEvent::User,
+ RpcCallEvent,
+ InitRequestEvent,
+ InitDataEvent
+ };
+
+ InternalConnection(QObject *parent = 0);
+ virtual ~InternalConnection();
+
+ QString description() const;
+
+ SignalProxy *signalProxy() const;
+ void setSignalProxy(SignalProxy *proxy);
+
+ InternalConnection *peer() const;
+ void setPeer(InternalConnection *peer);
+
+ bool isOpen() const;
+ bool isSecure() const;
+ bool isLocal() const;
+
+ int lag() const;
+
+ void dispatch(const SignalProxy::SyncMessage &msg);
+ void dispatch(const SignalProxy::RpcCall &msg);
+ void dispatch(const SignalProxy::InitRequest &msg);
+ void dispatch(const SignalProxy::InitData &msg);
+
+public slots:
+ void close(const QString &reason = QString());
+
+signals:
+
+ void disconnected();
+ void error(QAbstractSocket::SocketError);
+
+protected:
+ void customEvent(QEvent *event);
+
+private slots:
+ void peerDisconnected();
+
+private:
+ template<class T>
+ void dispatch(EventType eventType, const T &msg);
+
+ template<class T>
+ void handle(const T &msg);
+
+private:
+ SignalProxy *_proxy;
+ InternalConnection *_peer;
+ bool _isOpen;
+};
+
+
+#endif
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#include "legacyconnection.h"
+
+LegacyConnection::LegacyConnection(QTcpSocket *socket, QObject *parent)
+ : RemoteConnection(socket, parent),
+ _blockSize(0),
+ _useCompression(false)
+{
+ _stream.setDevice(socket);
+ _stream.setVersion(QDataStream::Qt_4_2);
+
+ connect(socket, SIGNAL(readyRead()), SLOT(socketDataAvailable()));
+}
+
+
+void LegacyConnection::setSignalProxy(SignalProxy *proxy)
+{
+ RemoteConnection::setSignalProxy(proxy);
+
+ if (proxy) {
+ // enable compression now if requested - the initial handshake is uncompressed in the legacy protocol!
+ _useCompression = socket()->property("UseCompression").toBool();
+ }
+
+}
+
+
+void LegacyConnection::socketDataAvailable()
+{
+ QVariant item;
+ while (readSocketData(item)) {
+ // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere
+ if (!signalProxy())
+ emit dataReceived(item);
+ else
+ handlePackedFunc(item);
+ }
+}
+
+
+bool LegacyConnection::readSocketData(QVariant &item)
+{
+ if (_blockSize == 0) {
+ if (socket()->bytesAvailable() < 4)
+ return false;
+ _stream >> _blockSize;
+ }
+
+ if (_blockSize > 1 << 22) {
+ close("Peer tried to send package larger than max package size!");
+ return false;
+ }
+
+ if (_blockSize == 0) {
+ close("Peer tried to send 0 byte package!");
+ return false;
+ }
+
+ if (socket()->bytesAvailable() < _blockSize) {
+ emit transferProgress(socket()->bytesAvailable(), _blockSize);
+ return false;
+ }
+
+ emit transferProgress(_blockSize, _blockSize);
+
+ _blockSize = 0;
+
+ if (_useCompression) {
+ QByteArray rawItem;
+ _stream >> rawItem;
+
+ int nbytes = rawItem.size();
+ if (nbytes <= 4) {
+ const char *data = rawItem.constData();
+ if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) {
+ close("Peer sent corrupted compressed data!");
+ return false;
+ }
+ }
+
+ rawItem = qUncompress(rawItem);
+
+ QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
+ itemStream.setVersion(QDataStream::Qt_4_2);
+ itemStream >> item;
+ }
+ else {
+ _stream >> item;
+ }
+
+ if (!item.isValid()) {
+ close("Peer sent corrupt data: unable to load QVariant!");
+ return false;
+ }
+
+ return true;
+}
+
+
+void LegacyConnection::writeSocketData(const QVariant &item)
+{
+ if (!socket()->isOpen()) {
+ qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
+ return;
+ }
+
+ QByteArray block;
+ QDataStream out(&block, QIODevice::WriteOnly);
+ out.setVersion(QDataStream::Qt_4_2);
+
+ if (_useCompression) {
+ QByteArray rawItem;
+ QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
+ itemStream.setVersion(QDataStream::Qt_4_2);
+ itemStream << item;
+
+ rawItem = qCompress(rawItem);
+
+ out << rawItem;
+ }
+ else {
+ out << item;
+ }
+
+ _stream << block; // also writes the length as part of the serialization format
+}
+
+
+void LegacyConnection::handlePackedFunc(const QVariant &packedFunc)
+{
+ QVariantList params(packedFunc.toList());
+
+ if (params.isEmpty()) {
+ qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
+ return;
+ }
+
+ RequestType requestType = (RequestType)params.takeFirst().value<int>();
+ switch (requestType) {
+ case Sync: {
+ if (params.count() < 3) {
+ qWarning() << Q_FUNC_INFO << "Received invalid sync call:" << params;
+ return;
+ }
+ QByteArray className = params.takeFirst().toByteArray();
+ QString objectName = params.takeFirst().toString();
+ QByteArray slotName = params.takeFirst().toByteArray();
+ handle(SignalProxy::SyncMessage(className, objectName, slotName, params));
+ break;
+ }
+ case RpcCall: {
+ if (params.empty()) {
+ qWarning() << Q_FUNC_INFO << "Received empty RPC call!";
+ return;
+ }
+ QByteArray slotName = params.takeFirst().toByteArray();
+ handle(SignalProxy::RpcCall(slotName, params));
+ break;
+ }
+ case InitRequest: {
+ if (params.count() != 2) {
+ qWarning() << Q_FUNC_INFO << "Received invalid InitRequest:" << params;
+ return;
+ }
+ QByteArray className = params[0].toByteArray();
+ QString objectName = params[1].toString();
+ handle(SignalProxy::InitRequest(className, objectName));
+ break;
+ }
+ case InitData: {
+ if (params.count() != 3) {
+ qWarning() << Q_FUNC_INFO << "Received invalid InitData:" << params;
+ return;
+ }
+ QByteArray className = params[0].toByteArray();
+ QString objectName = params[1].toString();
+ QVariantMap initData = params[2].toMap();
+ handle(SignalProxy::InitData(className, objectName, initData));
+ break;
+ }
+ case HeartBeat: {
+ if (params.count() != 1) {
+ qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
+ return;
+ }
+ // The legacy protocol would only send a QTime, no QDateTime
+ // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
+ QDateTime dateTime = QDateTime::currentDateTimeUtc();
+ dateTime.setTime(params[0].toTime());
+ handle(RemoteConnection::HeartBeat(dateTime));
+ break;
+ }
+ case HeartBeatReply: {
+ if (params.count() != 1) {
+ qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
+ return;
+ }
+ // The legacy protocol would only send a QTime, no QDateTime
+ // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
+ QDateTime dateTime = QDateTime::currentDateTimeUtc();
+ dateTime.setTime(params[0].toTime());
+ handle(RemoteConnection::HeartBeatReply(dateTime));
+ break;
+ }
+
+ }
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::SyncMessage &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)Sync << msg.className() << msg.objectName() << msg.slotName() << msg.params());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::RpcCall &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)RpcCall << msg.slotName() << msg.params());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::InitRequest &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)InitRequest << msg.className() << msg.objectName());
+}
+
+
+void LegacyConnection::dispatch(const SignalProxy::InitData &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className() << msg.objectName() << msg.initData());
+}
+
+
+void LegacyConnection::dispatch(const RemoteConnection::HeartBeat &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp().time());
+}
+
+
+void LegacyConnection::dispatch(const RemoteConnection::HeartBeatReply &msg)
+{
+ dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp().time());
+}
+
+
+void LegacyConnection::dispatchPackedFunc(const QVariantList &packedFunc)
+{
+ writeSocketData(QVariant(packedFunc));
+}
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#ifndef LEGACYCONNECTION_H
+#define LEGACYCONNECTION_H
+
+#include <QDataStream>
+
+#include "../../remoteconnection.h"
+
+class QDataStream;
+
+class LegacyConnection : public RemoteConnection
+{
+ Q_OBJECT
+
+public:
+ enum RequestType {
+ Sync = 1,
+ RpcCall,
+ InitRequest,
+ InitData,
+ HeartBeat,
+ HeartBeatReply
+ };
+
+ LegacyConnection(QTcpSocket *socket, QObject *parent = 0);
+ ~LegacyConnection() {}
+
+ void setSignalProxy(SignalProxy *proxy);
+
+ void dispatch(const SignalProxy::SyncMessage &msg);
+ void dispatch(const SignalProxy::RpcCall &msg);
+ void dispatch(const SignalProxy::InitRequest &msg);
+ void dispatch(const SignalProxy::InitData &msg);
+
+ void dispatch(const RemoteConnection::HeartBeat &msg);
+ void dispatch(const RemoteConnection::HeartBeatReply &msg);
+
+ // FIXME: this is only used for the auth phase and should be replaced by something more generic
+ void writeSocketData(const QVariant &item);
+
+private slots:
+ void socketDataAvailable();
+
+private:
+ bool readSocketData(QVariant &item);
+ void handlePackedFunc(const QVariant &packedFunc);
+ void dispatchPackedFunc(const QVariantList &packedFunc);
+
+ QDataStream _stream;
+ qint32 _blockSize;
+ bool _useCompression;
+};
+
+#endif
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#include <QHostAddress>
+#include <QTimer>
+
+#ifdef HAVE_SSL
+# include <QSslSocket>
+#endif
+
+#include "remoteconnection.h"
+
+RemoteConnection::RemoteConnection(QTcpSocket *socket, QObject *parent)
+ : SignalProxy::AbstractPeer(parent),
+ _socket(socket),
+ _signalProxy(0),
+ _heartBeatTimer(new QTimer(this)),
+ _heartBeatCount(0),
+ _lag(0)
+{
+ socket->setParent(this);
+ connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected()));
+ connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SIGNAL(error(QAbstractSocket::SocketError)));
+
+#ifdef HAVE_SSL
+ QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket);
+ if (sslSocket)
+ connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged()));
+#endif
+
+ connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
+}
+
+
+QString RemoteConnection::description() const
+{
+ if (socket())
+ return socket()->peerAddress().toString();
+
+ return QString();
+}
+
+
+SignalProxy *RemoteConnection::signalProxy() const
+{
+ return _signalProxy;
+}
+
+
+void RemoteConnection::setSignalProxy(SignalProxy *proxy)
+{
+ if (proxy == _signalProxy)
+ return;
+
+ if (!proxy) {
+ _heartBeatTimer->stop();
+ disconnect(signalProxy(), 0, this, 0);
+ _signalProxy = 0;
+ if (isOpen())
+ close();
+ }
+ else {
+ if (signalProxy()) {
+ qWarning() << Q_FUNC_INFO << "Setting another SignalProxy not supported, ignoring!";
+ return;
+ }
+ _signalProxy = proxy;
+ connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int)));
+ _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000);
+ _heartBeatTimer->start();
+ }
+}
+
+
+void RemoteConnection::changeHeartBeatInterval(int secs)
+{
+ if(secs <= 0)
+ _heartBeatTimer->stop();
+ else {
+ _heartBeatTimer->setInterval(secs * 1000);
+ _heartBeatTimer->start();
+ }
+}
+
+
+int RemoteConnection::lag() const
+{
+ return _lag;
+}
+
+
+QTcpSocket *RemoteConnection::socket() const
+{
+ return _socket;
+}
+
+
+bool RemoteConnection::isSecure() const
+{
+ if (socket()) {
+ if (isLocal())
+ return true;
+#ifdef HAVE_SSL
+ QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket());
+ if (sslSocket && sslSocket->isEncrypted())
+ return true;
+#endif
+ }
+ return false;
+}
+
+
+bool RemoteConnection::isLocal() const
+{
+ if (socket()) {
+ if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6)
+ return true;
+ }
+ return false;
+}
+
+
+bool RemoteConnection::isOpen() const
+{
+ return socket() && socket()->state() == QTcpSocket::ConnectedState;
+}
+
+
+void RemoteConnection::close(const QString &reason)
+{
+ if (!reason.isEmpty()) {
+ qWarning() << "Disconnecting:" << reason;
+ }
+
+ if (socket() && socket()->state() != QTcpSocket::UnconnectedState) {
+ socket()->disconnectFromHost();
+ }
+}
+
+
+void RemoteConnection::handle(const HeartBeat &heartBeat)
+{
+ dispatch(HeartBeatReply(heartBeat.timestamp()));
+}
+
+
+void RemoteConnection::handle(const HeartBeatReply &heartBeatReply)
+{
+ _heartBeatCount = 0;
+ emit lagUpdated(heartBeatReply.timestamp().msecsTo(QDateTime::currentDateTimeUtc()));
+}
+
+
+void RemoteConnection::sendHeartBeat()
+{
+ if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) {
+ qWarning() << "Disconnecting peer:" << description()
+ << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)";
+ socket()->close();
+ _heartBeatTimer->stop();
+ return;
+ }
+
+ if (_heartBeatCount > 0) {
+ _lag = _heartBeatCount * _heartBeatTimer->interval();
+ emit lagUpdated(_lag);
+ }
+
+ dispatch(HeartBeat(QDateTime::currentDateTimeUtc()));
+ ++_heartBeatCount;
+}
+
+
--- /dev/null
+/***************************************************************************
+ * Copyright (C) 2005-2012 by the Quassel Project *
+ * devel@quassel-irc.org *
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) version 3. *
+ * *
+ * This program is distributed in the hope that it will be useful, *
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of *
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
+ * GNU General Public License for more details. *
+ * *
+ * You should have received a copy of the GNU General Public License *
+ * along with this program; if not, write to the *
+ * Free Software Foundation, Inc., *
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
+ ***************************************************************************/
+
+#ifndef REMOTECONNECTION_H
+#define REMOTECONNECTION_H
+
+#include <QDateTime>
+#include <QTcpSocket>
+
+#include "signalproxy.h"
+
+class QTimer;
+
+class RemoteConnection : public SignalProxy::AbstractPeer
+{
+ Q_OBJECT
+
+public:
+ RemoteConnection(QTcpSocket *socket, QObject *parent = 0);
+ virtual ~RemoteConnection() {};
+
+ void setSignalProxy(SignalProxy *proxy);
+
+ QString description() const;
+
+ bool isOpen() const;
+ bool isSecure() const;
+ bool isLocal() const;
+
+ int lag() const;
+
+ bool compressionEnabled() const;
+ void setCompressionEnabled(bool enabled);
+
+ QTcpSocket *socket() const;
+
+ // this is only used for the auth phase and should be replaced by something more generic
+ virtual void writeSocketData(const QVariant &item) = 0;
+
+public slots:
+ void close(const QString &reason = QString());
+
+signals:
+ // this is only used for the auth phase and should be replaced by something more generic
+ void dataReceived(const QVariant &item);
+
+ void disconnected();
+ void error(QAbstractSocket::SocketError);
+
+ void transferProgress(int current, int max);
+
+protected:
+ class HeartBeat;
+ class HeartBeatReply;
+
+ SignalProxy *signalProxy() const;
+
+ template<class T>
+ void handle(const T &protoMessage);
+
+ void handle(const HeartBeat &heartBeat);
+ void handle(const HeartBeatReply &heartBeatReply);
+
+ virtual void dispatch(const HeartBeat &msg) = 0;
+ virtual void dispatch(const HeartBeatReply &msg) = 0;
+
+private slots:
+ void sendHeartBeat();
+ void changeHeartBeatInterval(int secs);
+
+private:
+ QTcpSocket *_socket;
+ SignalProxy *_signalProxy;
+ QTimer *_heartBeatTimer;
+ int _heartBeatCount;
+ int _lag;
+};
+
+
+// These protocol messages get handled internally and won't reach SignalProxy
+
+class RemoteConnection::HeartBeat
+{
+public:
+ inline HeartBeat(const QDateTime ×tamp) : _timestamp(timestamp) {}
+
+ inline QDateTime timestamp() const { return _timestamp; }
+
+private:
+ QDateTime _timestamp;
+};
+
+
+class RemoteConnection::HeartBeatReply
+{
+public:
+ inline HeartBeatReply(const QDateTime ×tamp) : _timestamp(timestamp) {}
+
+ inline QDateTime timestamp() const { return _timestamp; }
+
+private:
+ QDateTime _timestamp;
+};
+
+
+// Template methods we need in the header
+template<class T> inline
+void RemoteConnection::handle(const T &protoMessage)
+{
+ if (!signalProxy()) {
+ qWarning() << Q_FUNC_INFO << "Cannot handle messages without a SignalProxy!";
+ return;
+ }
+
+ // _heartBeatCount = 0;
+
+ signalProxy()->handle(this, protoMessage);
+}
+
+
+#endif
#include "syncableobject.h"
#include "util.h"
-// ==================================================
-// PeerSignalEvent
-// ==================================================
-class PeerSignalEvent : public QEvent
-{
-public:
- PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::Type(SignalProxy::PeerSignal)), sender(sender), requestType(requestType), params(params) {}
- SignalProxy *sender;
- SignalProxy::RequestType requestType;
- QVariantList params;
-};
-
-
class RemovePeerEvent : public QEvent
{
public:
- RemovePeerEvent(QObject *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeer)), peer(peer) {}
- QObject *peer;
+ RemovePeerEvent(SignalProxy::AbstractPeer *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeerEvent)), peer(peer) {}
+ SignalProxy::AbstractPeer *peer;
};
const Signal &signal = _slots[_id];
QVariantList params;
- params << signal.signature;
const QList<int> &argTypes = eMeta->argTypes(signal.signalId);
for (int i = 0; i < argTypes.size(); i++) {
params << QVariant(argTypes[i], _a[i+1]);
}
- proxy()->dispatchSignal(SignalProxy::RpcCall, params);
+ proxy()->dispatch(SignalProxy::RpcCall(signal.signature, params));
}
_id -= _slots.count();
}
}
-// ==================================================
-// Peers
-// ==================================================
-void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms)
-{
- QVariantList packedFunc;
- packedFunc << (qint16)requestType
- << params;
- dispatchPackedFunc(QVariant(packedFunc));
-}
-
-
-bool SignalProxy::IODevicePeer::isSecure() const
-{
-#ifdef HAVE_SSL
- QSslSocket *sslSocket = qobject_cast<QSslSocket *>(_device);
- if (sslSocket)
- return sslSocket->isEncrypted() || sslSocket->localAddress() == QHostAddress::LocalHost || sslSocket->localAddress() == QHostAddress::LocalHostIPv6;
-#endif
-
- QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
- if (socket)
- return socket->localAddress() == QHostAddress::LocalHost || socket->localAddress() == QHostAddress::LocalHostIPv6;
-
- return false;
-}
-
-
-QString SignalProxy::IODevicePeer::address() const
-{
- QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
- if (socket)
- return socket->peerAddress().toString();
- else
- return QString();
-}
-
-
-void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms)
-{
- Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
- ? Qt::DirectConnection
- : Qt::QueuedConnection;
-
- if (type == Qt::DirectConnection) {
- receiver->receivePeerSignal(sender, requestType, params);
- }
- else {
- QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
- }
-}
-
-
// ==================================================
// SignalProxy
// ==================================================
}
-SignalProxy::SignalProxy(ProxyMode mode, QIODevice *device, QObject *parent)
- : QObject(parent)
-{
- setProxyMode(mode);
- addPeer(device);
- init();
-}
-
-
SignalProxy::~SignalProxy()
{
QHash<QByteArray, ObjectId>::iterator classIter = _syncSlave.begin();
void SignalProxy::setProxyMode(ProxyMode mode)
{
- PeerHash::iterator peer = _peers.begin();
- while (peer != _peers.end()) {
- if ((*peer)->type() != AbstractPeer::IODevicePeer) {
- IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
- if (ioPeer->isOpen()) {
- qWarning() << "SignalProxy: Cannot change proxy mode while connected";
- return;
- }
- }
- if ((*peer)->type() != AbstractPeer::SignalProxyPeer) {
- qWarning() << "SignalProxy: Cannot change proxy mode while connected to another internal SignalProxy";
- return;
- }
- peer++;
+ if (_peers.count()) {
+ qWarning() << Q_FUNC_INFO << "Cannot change proxy mode while connected";
+ return;
}
_proxyMode = mode;
_heartBeatInterval = 0;
_maxHeartBeatCount = 0;
_signalRelay = new SignalRelay(this);
- connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
setHeartBeatInterval(30);
setMaxHeartBeatCount(2);
- _heartBeatTimer.start();
_secure = false;
updateSecureState();
}
void SignalProxy::initClient()
{
- attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString)));
-}
-
-
-bool SignalProxy::addPeer(QIODevice *iodev)
-{
- if (!iodev)
- return false;
-
- if (_peers.contains(iodev))
- return true;
-
- if (proxyMode() == Client && !_peers.isEmpty()) {
- qWarning("SignalProxy: only one peer allowed in client mode!");
- return false;
- }
-
- if (!iodev->isOpen()) {
- qWarning("SignalProxy::addPeer(QIODevice *iodev): iodev needs to be open!");
- return false;
- }
-
- connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
- connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
-
-#ifdef HAVE_SSL
- QSslSocket *sslSocket = qobject_cast<QSslSocket *>(iodev);
- if (sslSocket) {
- connect(iodev, SIGNAL(encrypted()), this, SLOT(updateSecureState()));
- }
-#endif
-
- if (!iodev->parent())
- iodev->setParent(this);
-
- _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
-
- if (_peers.count() == 1)
- emit connected();
-
- updateSecureState();
- return true;
+ attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray,QString,QString)));
}
void SignalProxy::setHeartBeatInterval(int secs)
{
- if (secs != _heartBeatInterval) {
+ if (_heartBeatInterval != secs) {
_heartBeatInterval = secs;
- _heartBeatTimer.setInterval(secs * 1000);
+ emit heartBeatIntervalChanged(secs);
}
}
void SignalProxy::setMaxHeartBeatCount(int max)
{
- _maxHeartBeatCount = max;
+ if (_maxHeartBeatCount != max) {
+ _maxHeartBeatCount = max;
+ emit maxHeartBeatCountChanged(max);
+ }
}
-bool SignalProxy::addPeer(SignalProxy *proxy)
+bool SignalProxy::addPeer(AbstractPeer *peer)
{
- if (!proxy)
+ if (!peer)
return false;
- if (proxyMode() == proxy->proxyMode()) {
- qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+ if (_peers.contains(peer))
+ return true;
+
+ if (!peer->isOpen()) {
+ qWarning("SignalProxy: peer needs to be open!");
return false;
}
- if (_peers.contains(proxy)) {
- return true;
+ if (proxyMode() == Client) {
+ if (!_peers.isEmpty()) {
+ qWarning("SignalProxy: only one peer allowed in client mode!");
+ return false;
+ }
+ connect(peer, SIGNAL(lagUpdated(int)), SIGNAL(lagUpdated(int)));
}
- if (proxyMode() == Client && !_peers.isEmpty()) {
- qWarning("SignalProxy: only one peer allowed in client mode!");
- return false;
- }
+ connect(peer, SIGNAL(disconnected()), SLOT(removePeerBySender()));
+ connect(peer, SIGNAL(secureStateChanged(bool)), SLOT(updateSecureState()));
+
+ if (!peer->parent())
+ peer->setParent(this);
- _peers[proxy] = new SignalProxyPeer(this, proxy);
- proxy->addPeer(this);
+ _peers.insert(peer);
+
+ peer->setSignalProxy(this);
if (_peers.count() == 1)
emit connected();
{
Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
// wee need to copy that list since we modify it in the loop
- QList<QObject *> peers = _peers.keys();
- foreach(QObject *peer, peers) {
+ QSet<AbstractPeer *> peers = _peers;
+ foreach(AbstractPeer *peer, peers) {
removePeer(peer);
}
}
-void SignalProxy::removePeer(QObject *dev)
+void SignalProxy::removePeer(AbstractPeer *peer)
{
+ if (!peer) {
+ qWarning() << Q_FUNC_INFO << "Trying to remove a null peer!";
+ return;
+ }
+
if (_peers.isEmpty()) {
qWarning() << "SignalProxy::removePeer(): No peers in use!";
return;
}
- Q_ASSERT(dev);
- if (!_peers.contains(dev)) {
- qWarning() << "SignalProxy: unknown Peer" << dev;
+ if (!_peers.contains(peer)) {
+ qWarning() << "SignalProxy: unknown Peer" << peer;
return;
}
- AbstractPeer *peer = _peers[dev];
- _peers.remove(dev);
-
- disconnect(dev, 0, this, 0);
- if (peer->type() == AbstractPeer::IODevicePeer)
- emit peerRemoved(static_cast<QIODevice *>(dev));
-
- if (peer->type() == AbstractPeer::SignalProxyPeer) {
- SignalProxy *proxy = static_cast<SignalProxy *>(dev);
- if (proxy->_peers.contains(this))
- proxy->removePeer(this);
- }
+ disconnect(peer, 0, this, 0);
+ peer->setSignalProxy(0);
- if (dev->parent() == this)
- dev->deleteLater();
+ _peers.remove(peer);
+ emit peerRemoved(peer);
- delete peer;
+ if (peer->parent() == this)
+ peer->deleteLater();
updateSecureState();
void SignalProxy::removePeerBySender()
{
- removePeer(sender());
+ removePeer(qobject_cast<SignalProxy::AbstractPeer *>(sender()));
}
const QByteArray className(meta->className());
objectRenamed(className, newname, oldname);
- QVariantList params;
- params << "__objectRenamed__" << className << newname << oldname;
- dispatchSignal(RpcCall, params);
+ dispatch(RpcCall("__objectRenamed__", QVariantList() << className << newname << oldname));
}
}
-void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms)
+template<class T>
+void SignalProxy::dispatch(const T &protoMessage)
{
- QVariant packedFunc(QVariantList() << (qint16)requestType << params);
- PeerHash::iterator peer = _peers.begin();
- while (peer != _peers.end()) {
- switch ((*peer)->type()) {
- case AbstractPeer::IODevicePeer:
- {
- IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
- if (ioPeer->isOpen())
- ioPeer->dispatchPackedFunc(packedFunc);
- else
- QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
- }
- break;
- case AbstractPeer::SignalProxyPeer:
- (*peer)->dispatchSignal(requestType, params);
- break;
- default:
- Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type
- }
- peer++;
- }
-}
-
-
-void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc)
-{
- QVariantList params(packedFunc.toList());
-
- if (params.isEmpty()) {
- qWarning() << "SignalProxy::receivePeerSignal(): received incompatible Data:" << packedFunc;
- return;
- }
-
- RequestType requestType = (RequestType)params.takeFirst().value<int>();
- receivePeerSignal(sender, requestType, params);
-}
-
-
-void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms)
-{
- switch (requestType) {
- // list all RequestTypes that shouldnot trigger a heartbeat counter reset here
- case HeartBeatReply:
- break;
- default:
- if (sender->type() == AbstractPeer::IODevicePeer) {
- IODevicePeer *ioPeer = static_cast<IODevicePeer *>(sender);
- ioPeer->sentHeartBeats = 0;
- }
- }
-
- // qDebug() << "SignalProxy::receivePeerSignal)" << requestType << params;
- switch (requestType) {
- case RpcCall:
- if (params.empty())
- qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call";
+ foreach (AbstractPeer *peer, _peers) {
+ if (peer->isOpen())
+ peer->dispatch(protoMessage);
else
- handleSignal(params);
- //handleSignal(params.takeFirst().toByteArray(), params);
- break;
-
- case Sync:
- handleSync(sender, params);
- break;
-
- case InitRequest:
- handleInitRequest(sender, params);
- break;
-
- case InitData:
- handleInitData(sender, params);
- break;
-
- case HeartBeat:
- receiveHeartBeat(sender, params);
- break;
-
- case HeartBeatReply:
- receiveHeartBeatReply(sender, params);
- break;
-
- default:
- qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << requestType << params;
- }
-}
-
-
-void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms)
-{
- if (!_peers.contains(sender)) {
- // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
- qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
- return;
+ QCoreApplication::postEvent(this, new ::RemovePeerEvent(peer));
}
- receivePeerSignal(_peers[sender], requestType, params);
}
-void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::SyncMessage &syncMessage)
{
- if (params.count() < 3) {
- qWarning() << "received invalid Sync call" << params;
- return;
- }
-
- QByteArray className = params.takeFirst().toByteArray();
- QString objectName = params.takeFirst().toString();
- QByteArray slot = params.takeFirst().toByteArray();
-
- if (!_syncSlave.contains(className) || !_syncSlave[className].contains(objectName)) {
- qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
- << params;
+ if (!_syncSlave.contains(syncMessage.className()) || !_syncSlave[syncMessage.className()].contains(syncMessage.objectName())) {
+ qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(syncMessage.className(), syncMessage.slotName(), syncMessage.objectName())
+ << syncMessage.params();
return;
}
- SyncableObject *receiver = _syncSlave[className][objectName];
+ SyncableObject *receiver = _syncSlave[syncMessage.className()][syncMessage.objectName()];
ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
- if (!eMeta->slotMap().contains(slot)) {
- qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(slot)).arg(objectName)
- << params;
+ if (!eMeta->slotMap().contains(syncMessage.slotName())) {
+ qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(syncMessage.className(), syncMessage.slotName(), syncMessage.objectName())
+ << syncMessage.params();
return;
}
- int slotId = eMeta->slotMap()[slot];
+ int slotId = eMeta->slotMap()[syncMessage.slotName()];
if (proxyMode() != eMeta->receiverMode(slotId)) {
qWarning("SignalProxy::handleSync(): invokeMethod for \"%s\" failed. Wrong ProxyMode!", eMeta->methodName(slotId).constData());
return;
}
QVariant returnValue((QVariant::Type)eMeta->returnType(slotId));
- if (!invokeSlot(receiver, slotId, params, returnValue)) {
+ if (!invokeSlot(receiver, slotId, syncMessage.params(), returnValue)) {
qWarning("SignalProxy::handleSync(): invokeMethod for \"%s\" failed ", eMeta->methodName(slotId).constData());
return;
}
if (returnValue.type() != QVariant::Invalid && eMeta->receiveMap().contains(slotId)) {
int receiverId = eMeta->receiveMap()[slotId];
QVariantList returnParams;
- returnParams << className
- << objectName
- << eMeta->methodName(receiverId);
- //QByteArray(receiver->metaObject()->method(receiverId).signature());
if (eMeta->argTypes(receiverId).count() > 1)
- returnParams << params;
+ returnParams << syncMessage.params();
returnParams << returnValue;
- sender->dispatchSignal(Sync, returnParams);
+ peer->dispatch(SyncMessage(syncMessage.className(), syncMessage.objectName(), eMeta->methodName(receiverId), returnParams));
}
// send emit update signal
}
-void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::InitRequest &initRequest)
{
- if (params.count() != 2) {
- qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:"
- << params;
- return;
- }
-
- QByteArray className(params[0].toByteArray());
- QString objectName(params[1].toString());
-
- if (!_syncSlave.contains(className)) {
+ if (!_syncSlave.contains(initRequest.className())) {
qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
- << className;
+ << initRequest.className();
return;
}
- if (!_syncSlave[className].contains(objectName)) {
+ if (!_syncSlave[initRequest.className()].contains(initRequest.objectName())) {
qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Object:"
- << className << objectName;
+ << initRequest.className() << initRequest.objectName();
return;
}
- SyncableObject *obj = _syncSlave[className][objectName];
-
- QVariantList params_;
- params_ << className
- << objectName
- << initData(obj);
-
- sender->dispatchSignal(InitData, params_);
+ SyncableObject *obj = _syncSlave[initRequest.className()][initRequest.objectName()];
+ peer->dispatch(InitData(initRequest.className(), initRequest.objectName(), initData(obj)));
}
-void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶ms)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::InitData &initData)
{
- Q_UNUSED(sender)
- if (params.count() != 3) {
- qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:"
- << params;
- return;
- }
-
- QByteArray className(params[0].toByteArray());
- QString objectName(params[1].toString());
- QVariantMap propertyMap(params[2].toMap());
+ Q_UNUSED(peer)
- if (!_syncSlave.contains(className)) {
+ if (!_syncSlave.contains(initData.className())) {
qWarning() << "SignalProxy::handleInitData() received initData for unregistered Class:"
- << className;
+ << initData.className();
return;
}
- if (!_syncSlave[className].contains(objectName)) {
+ if (!_syncSlave[initData.className()].contains(initData.objectName())) {
qWarning() << "SignalProxy::handleInitData() received initData for unregistered Object:"
- << className << objectName;
+ << initData.className() << initData.objectName();
return;
}
- SyncableObject *obj = _syncSlave[className][objectName];
- setInitData(obj, propertyMap);
+ SyncableObject *obj = _syncSlave[initData.className()][initData.objectName()];
+ setInitData(obj, initData.initData());
}
-//void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList ¶ms) {
-void SignalProxy::handleSignal(const QVariantList &data)
+void SignalProxy::handle(SignalProxy::AbstractPeer *peer, const SignalProxy::RpcCall &rpcCall)
{
- QVariantList params = data;
- QByteArray funcName = params.takeFirst().toByteArray();
+ Q_UNUSED(peer)
QObject *receiver;
int methodId;
- SlotHash::const_iterator slot = _attachedSlots.constFind(funcName);
- while (slot != _attachedSlots.constEnd() && slot.key() == funcName) {
+ SlotHash::const_iterator slot = _attachedSlots.constFind(rpcCall.slotName());
+ while (slot != _attachedSlots.constEnd() && slot.key() == rpcCall.slotName()) {
receiver = (*slot).first;
methodId = (*slot).second;
- if (!invokeSlot(receiver, methodId, params)) {
+ if (!invokeSlot(receiver, methodId, rpcCall.params())) {
ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
qWarning("SignalProxy::handleSignal(): invokeMethod for \"%s\" failed ", eMeta->methodName(methodId).constData());
}
- slot++;
+ ++slot;
}
}
}
-void SignalProxy::dataAvailable()
-{
- // yet again. it's a private slot. no need for checks.
- QIODevice *ioDev = qobject_cast<QIODevice *>(sender());
- Q_ASSERT(_peers.contains(ioDev) && _peers[ioDev]->type() == AbstractPeer::IODevicePeer);
- IODevicePeer *peer = static_cast<IODevicePeer *>(_peers[ioDev]);
- QVariant var;
- while (peer->readData(var))
- receivePackedFunc(peer, var);
-}
-
-
-void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed)
-{
- QAbstractSocket *sock = qobject_cast<QAbstractSocket *>(dev);
- if (!dev->isOpen() || (sock && sock->state() != QAbstractSocket::ConnectedState)) {
- qWarning("SignalProxy: Can't call write on a closed device");
- return;
- }
-
- QByteArray block;
- QDataStream out(&block, QIODevice::WriteOnly);
- out.setVersion(QDataStream::Qt_4_2);
- out << (quint32)0;
-
- if (compressed) {
- QByteArray rawItem;
- QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
-
- itemStream.setVersion(QDataStream::Qt_4_2);
- itemStream << item;
-
- rawItem = qCompress(rawItem);
-
- out << rawItem;
- }
- else {
- out << item;
- }
-
- out.device()->seek(0);
- out << (quint32)(block.size() - sizeof(quint32));
-
- dev->write(block);
-}
-
-
-bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed)
-{
- if (!dev)
- return false;
-
- QDataStream in(dev);
- in.setVersion(QDataStream::Qt_4_2);
-
- if (blockSize == 0) {
- if (dev->bytesAvailable() < (int)sizeof(quint32)) return false;
- in >> blockSize;
- }
-
- if (blockSize > 1 << 22) {
- disconnectDevice(dev, tr("Peer tried to send package larger than max package size!"));
- return false;
- }
-
- if (blockSize == 0) {
- disconnectDevice(dev, tr("Peer tried to send 0 byte package!"));
- return false;
- }
-
- if (dev->bytesAvailable() < blockSize)
- return false;
-
- blockSize = 0;
-
- if (compressed) {
- QByteArray rawItem;
- in >> rawItem;
-
- int nbytes = rawItem.size();
- if (nbytes <= 4) {
- const char *data = rawItem.constData();
- if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) {
- disconnectDevice(dev, tr("Peer sent corrupted compressed data!"));
- return false;
- }
- }
-
- rawItem = qUncompress(rawItem);
-
- QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
- itemStream.setVersion(QDataStream::Qt_4_2);
- itemStream >> item;
- }
- else {
- in >> item;
- }
-
- if (!item.isValid()) {
- disconnectDevice(dev, tr("Peer sent corrupt data: unable to load QVariant!"));
- return false;
- }
-
- return true;
-}
-
-
void SignalProxy::requestInit(SyncableObject *obj)
{
if (proxyMode() == Server || obj->isInitialized())
return;
- QVariantList params;
- params << obj->syncMetaObject()->className()
- << obj->objectName();
- dispatchSignal(InitRequest, params);
+ dispatch(InitRequest(obj->syncMetaObject()->className(), obj->objectName()));
}
}
-void SignalProxy::sendHeartBeat()
-{
- QVariantList heartBeatParams;
- heartBeatParams << QTime::currentTime();
- QList<IODevicePeer *> toClose;
-
- PeerHash::iterator peer = _peers.begin();
- while (peer != _peers.end()) {
- if ((*peer)->type() == AbstractPeer::IODevicePeer) {
- IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
- ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
- if (ioPeer->sentHeartBeats > 0) {
- updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
- }
- if (maxHeartBeatCount() >= 0 && ioPeer->sentHeartBeats >= maxHeartBeatCount())
- toClose.append(ioPeer);
- else
- ioPeer->sentHeartBeats++;
- }
- ++peer;
- }
-
- foreach(IODevicePeer *ioPeer, toClose) {
- qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address()
- << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats *_heartBeatTimer.interval() / 1000 << "seconds)";
- ioPeer->close();
- }
-}
-
-
-void SignalProxy::receiveHeartBeat(AbstractPeer *peer, const QVariantList ¶ms)
-{
- peer->dispatchSignal(SignalProxy::HeartBeatReply, params);
-}
-
-
-void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList ¶ms)
-{
- if (peer->type() != AbstractPeer::IODevicePeer) {
- qWarning() << "SignalProxy::receiveHeartBeatReply: received heart beat from a non IODevicePeer!";
- return;
- }
-
- IODevicePeer *ioPeer = static_cast<IODevicePeer *>(peer);
- ioPeer->sentHeartBeats = 0;
-
- if (params.isEmpty()) {
- qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
- return;
- }
-
- QTime sendTime = params[0].value<QTime>();
- updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
-}
-
-
void SignalProxy::customEvent(QEvent *event)
{
- switch (+event->type()) {
- case PeerSignal:
- {
- PeerSignalEvent *e = static_cast<PeerSignalEvent *>(event);
- receivePeerSignal(e->sender, e->requestType, e->params);
- }
+ switch ((int)event->type()) {
+ case RemovePeerEvent: {
+ ::RemovePeerEvent *e = static_cast< ::RemovePeerEvent *>(event);
+ removePeer(e->peer);
event->accept();
break;
- case RemovePeer:
- {
- RemovePeerEvent *e = static_cast<RemovePeerEvent *>(event);
- removePeer(e->peer);
}
- event->accept();
+
default:
+ qWarning() << Q_FUNC_INFO << "Received unknown custom event:" << event->type();
return;
}
}
ExtendedMetaObject *eMeta = extendedMetaObject(obj);
QVariantList params;
- params << eMeta->metaObject()->className()
- << obj->objectName()
- << QByteArray(funcname);
const QList<int> &argTypes = eMeta->argTypes(eMeta->methodId(QByteArray(funcname)));
params << QVariant(argTypes[i], va_arg(ap, void *));
}
- dispatchSignal(Sync, params);
+ dispatch(SyncMessage(eMeta->metaObject()->className(), obj->objectName(), QByteArray(funcname), params));
}
}
-void SignalProxy::updateLag(IODevicePeer *peer, int lag)
-{
- peer->lag = lag;
- if (proxyMode() == Client) {
- emit lagUpdated(lag);
- }
-}
-
-
void SignalProxy::dumpProxyStats()
{
QString mode;
bool wasSecure = _secure;
_secure = !_peers.isEmpty();
- PeerHash::const_iterator peerIter;
- for (peerIter = _peers.constBegin(); peerIter != _peers.constEnd(); peerIter++) {
- _secure &= (*peerIter)->isSecure();
+ foreach (const AbstractPeer *peer, _peers) {
+ _secure &= peer->isSecure();
}
if (wasSecure != _secure)
#ifndef SIGNALPROXY_H
#define SIGNALPROXY_H
+#include <QAbstractSocket>
#include <QEvent>
#include <QList>
#include <QHash>
#include <QVariant>
#include <QVariantMap>
#include <QPair>
+#include <QSet>
#include <QString>
#include <QByteArray>
#include <QTimer>
{
Q_OBJECT
- class AbstractPeer;
- class IODevicePeer;
- class SignalProxyPeer;
-
class SignalRelay;
public:
+ class AbstractPeer;
+
+ class SyncMessage;
+ class RpcCall;
+ class InitRequest;
+ class InitData;
+
enum ProxyMode {
Server,
Client
};
- enum RequestType {
- Sync = 1,
- RpcCall,
- InitRequest,
- InitData,
- HeartBeat,
- HeartBeatReply
- };
-
- enum CustomEvents {
- PeerSignal = QEvent::User,
- RemovePeer
+ enum EventType {
+ RemovePeerEvent = QEvent::User
};
SignalProxy(QObject *parent);
SignalProxy(ProxyMode mode, QObject *parent);
- SignalProxy(ProxyMode mode, QIODevice *device, QObject *parent);
virtual ~SignalProxy();
void setProxyMode(ProxyMode mode);
void setMaxHeartBeatCount(int max);
inline int maxHeartBeatCount() const { return _maxHeartBeatCount; }
- bool addPeer(QIODevice *iodev);
- bool addPeer(SignalProxy *proxy);
- void removePeer(QObject *peer);
- void removeAllPeers();
+ bool addPeer(AbstractPeer *peer);
bool attachSignal(QObject *sender, const char *signal, const QByteArray &sigName = QByteArray());
bool attachSlot(const QByteArray &sigName, QObject *recv, const char *slot);
void synchronize(SyncableObject *obj);
void stopSynchronize(SyncableObject *obj);
- //! Writes a QVariant to a device.
- /** The data item is prefixed with the resulting blocksize,
- * so the corresponding function readDataFromDevice() can check if enough data is available
- * at the device to reread the item.
- */
- static void writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed = false);
-
- //! Reads a data item from a device that has been written by writeDataToDevice().
- /** If not enough data bytes are available, the function returns false and the QVariant reference
- * remains untouched.
- */
- static bool readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed = false);
-
class ExtendedMetaObject;
ExtendedMetaObject *extendedMetaObject(const QMetaObject *meta) const;
ExtendedMetaObject *createExtendedMetaObject(const QMetaObject *meta, bool checkConflicts = false);
bool isSecure() const { return _secure; }
void dumpProxyStats();
+ void dumpSyncMap(SyncableObject *object);
+ inline int peerCount() const { return _peers.size(); }
public slots:
void detachObject(QObject *obj);
void renameObject(const SyncableObject *obj, const QString &newname, const QString &oldname);
private slots:
- void dataAvailable();
void removePeerBySender();
void objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname);
- void sendHeartBeat();
- void receiveHeartBeat(AbstractPeer *peer, const QVariantList ¶ms);
- void receiveHeartBeatReply(AbstractPeer *peer, const QVariantList ¶ms);
-
void updateSecureState();
signals:
- void peerRemoved(QIODevice *dev);
+ void peerRemoved(SignalProxy::AbstractPeer *peer);
void connected();
void disconnected();
void objectInitialized(SyncableObject *);
+ void heartBeatIntervalChanged(int secs);
+ void maxHeartBeatCountChanged(int max);
void lagUpdated(int lag);
- void securityChanged(bool);
void secureStateChanged(bool);
private:
+ template<class T>
+ class PeerMessageEvent;
+
void init();
void initServer();
void initClient();
static const QMetaObject *metaObject(const QObject *obj);
- void dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList ¶ms);
- void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms);
+ void removePeer(AbstractPeer *peer);
+ void removeAllPeers();
+
+ template<class T>
+ void dispatch(const T &protoMessage);
- void receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc);
- void receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms);
- void receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms);
- void handleSync(AbstractPeer *sender, QVariantList params);
- void handleInitRequest(AbstractPeer *sender, const QVariantList ¶ms);
- void handleInitData(AbstractPeer *sender, const QVariantList ¶ms);
- void handleSignal(const QVariantList &data);
+ void handle(AbstractPeer *peer, const SyncMessage &syncMessage);
+ void handle(AbstractPeer *peer, const RpcCall &rpcCall);
+ void handle(AbstractPeer *peer, const InitRequest &initRequest);
+ void handle(AbstractPeer *peer, const InitData &initData);
bool invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms, QVariant &returnValue);
bool invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms = QVariantList());
QVariantMap initData(SyncableObject *obj) const;
void setInitData(SyncableObject *obj, const QVariantMap &properties);
- void updateLag(IODevicePeer *peer, int lag);
-
-public:
- void dumpSyncMap(SyncableObject *object);
- inline int peerCount() const { return _peers.size(); }
-
-private:
static void disconnectDevice(QIODevice *dev, const QString &reason = QString());
- // a Hash of the actual used communication object to it's corresponding peer
- // currently a communication object can either be an arbitrary QIODevice or another SignalProxy
- typedef QHash<QObject *, AbstractPeer *> PeerHash;
- PeerHash _peers;
+ QSet<AbstractPeer *> _peers;
// containg a list of argtypes for fast access
QHash<const QMetaObject *, ExtendedMetaObject *> _extendedMetaObjects;
QHash<QByteArray, ObjectId> _syncSlave;
ProxyMode _proxyMode;
- QTimer _heartBeatTimer;
int _heartBeatInterval;
int _maxHeartBeatCount;
friend class SignalRelay;
friend class SyncableObject;
+ friend class InternalConnection;
+ friend class RemoteConnection;
};
// ==================================================
-// Peers
+// AbstractPeer
// ==================================================
-class SignalProxy::AbstractPeer
+class SignalProxy::AbstractPeer : public QObject
{
+ Q_OBJECT
+
public:
- enum PeerType {
- NotAPeer = 0,
- IODevicePeer = 1,
- SignalProxyPeer = 2
- };
- AbstractPeer() : _type(NotAPeer) {}
- AbstractPeer(PeerType type) : _type(type) {}
- virtual ~AbstractPeer() {}
- inline PeerType type() const { return _type; }
- virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) = 0;
+ AbstractPeer(QObject *parent = 0) : QObject(parent) {}
+
+ virtual QString description() const = 0;
+
+ virtual void setSignalProxy(SignalProxy *proxy) = 0;
+
+ virtual bool isOpen() const = 0;
virtual bool isSecure() const = 0;
-private:
- PeerType _type;
+ virtual bool isLocal() const = 0;
+
+ virtual QString errorString() const { return QString(); }
+
+ virtual int lag() const = 0;
+
+public slots:
+ virtual void dispatch(const SyncMessage &msg) = 0;
+ virtual void dispatch(const RpcCall &msg) = 0;
+ virtual void dispatch(const InitRequest &msg) = 0;
+ virtual void dispatch(const InitData &msg) = 0;
+
+ virtual void close(const QString &reason = QString()) = 0;
+
+signals:
+ void disconnected();
+ void error(QAbstractSocket::SocketError);
+ void secureStateChanged(bool secure = true);
+ void lagUpdated(int msecs);
};
-class SignalProxy::IODevicePeer : public SignalProxy::AbstractPeer
+// ==================================================
+// Protocol Messages
+// ==================================================
+class SignalProxy::SyncMessage
{
public:
- IODevicePeer(QIODevice *device, bool compress) : AbstractPeer(AbstractPeer::IODevicePeer), _device(device), byteCount(0), usesCompression(compress), sentHeartBeats(0), lag(0) {}
- virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms);
- virtual bool isSecure() const;
- inline void dispatchPackedFunc(const QVariant &packedFunc) { SignalProxy::writeDataToDevice(_device, packedFunc, usesCompression); }
- QString address() const;
- inline bool isOpen() const { return _device->isOpen(); }
- inline void close() const { _device->close(); }
- inline bool readData(QVariant &item) { return SignalProxy::readDataFromDevice(_device, byteCount, item, usesCompression); }
+ inline SyncMessage(const QByteArray &className, const QString &objectName, const QByteArray &slotName, const QVariantList ¶ms)
+ : _className(className), _objectName(objectName), _slotName(slotName), _params(params) {}
+
+ inline QByteArray className() const { return _className; }
+ inline QString objectName() const { return _objectName; }
+ inline QByteArray slotName() const { return _slotName; }
+
+ inline QVariantList params() const { return _params; }
+
private:
- QIODevice *_device;
- quint32 byteCount;
- bool usesCompression;
+ QByteArray _className;
+ QString _objectName;
+ QByteArray _slotName;
+ QVariantList _params;
+};
+
+
+class SignalProxy::RpcCall
+{
public:
- int sentHeartBeats;
- int lag;
+ inline RpcCall(const QByteArray &slotName, const QVariantList ¶ms)
+ : _slotName(slotName), _params(params) {}
+
+ inline QByteArray slotName() const { return _slotName; }
+ inline QVariantList params() const { return _params; }
+
+private:
+ QByteArray _slotName;
+ QVariantList _params;
};
-class SignalProxy::SignalProxyPeer : public SignalProxy::AbstractPeer
+class SignalProxy::InitRequest
{
public:
- SignalProxyPeer(SignalProxy *sender, SignalProxy *receiver) : AbstractPeer(AbstractPeer::SignalProxyPeer), sender(sender), receiver(receiver) {}
- virtual void dispatchSignal(const RequestType &requestType, const QVariantList ¶ms);
- virtual inline bool isSecure() const { return true; }
+ inline InitRequest(const QByteArray &className, const QString &objectName)
+ : _className(className), _objectName(objectName) {}
+
+ inline QByteArray className() const { return _className; }
+ inline QString objectName() const { return _objectName; }
+
private:
- SignalProxy *sender;
- SignalProxy *receiver;
+ QByteArray _className;
+ QString _objectName;
};
+class SignalProxy::InitData
+{
+public:
+ inline InitData(const QByteArray &className, const QString &objectName, const QVariantMap &initData)
+ : _className(className), _objectName(objectName), _initData(initData) {}
+
+ inline QByteArray className() const { return _className; }
+ inline QString objectName() const { return _objectName; }
+
+ inline QVariantMap initData() const { return _initData; }
+
+private:
+ QByteArray _className;
+ QString _objectName;
+ QVariantMap _initData;
+};
+
#endif
#include "core.h"
#include "coresession.h"
#include "coresettings.h"
+#include "internalconnection.h"
#include "postgresqlstorage.h"
#include "quassel.h"
#include "signalproxy.h"
#include "util.h"
+#include "protocols/legacy/legacyconnection.h"
+
// migration related
#include <QFile>
#ifdef Q_OS_WIN32
class AddClientEvent : public QEvent
{
public:
- AddClientEvent(QTcpSocket *socket, UserId uid) : QEvent(QEvent::Type(Core::AddClientEventId)), socket(socket), userId(uid) {}
- QTcpSocket *socket;
+ AddClientEvent(RemoteConnection *connection, UserId uid) : QEvent(QEvent::Type(Core::AddClientEventId)), connection(connection), userId(uid) {}
+ RemoteConnection *connection;
UserId userId;
};
Core::~Core()
{
- foreach(QTcpSocket *socket, blocksizes.keys()) {
- socket->disconnectFromHost(); // disconnect non authed clients
+ foreach(RemoteConnection *connection, clientInfo.keys()) {
+ connection->close(); // disconnect non authed clients
}
qDeleteAll(sessions);
qDeleteAll(_storageBackends);
Q_ASSERT(server);
while (server->hasPendingConnections()) {
QTcpSocket *socket = server->nextPendingConnection();
- connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));
- connect(socket, SIGNAL(readyRead()), this, SLOT(clientHasData()));
- connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(socketError(QAbstractSocket::SocketError)));
+ RemoteConnection *connection = new LegacyConnection(socket, this);
+
+ connect(connection, SIGNAL(disconnected()), SLOT(clientDisconnected()));
+ connect(connection, SIGNAL(dataReceived(QVariant)), SLOT(processClientMessage(QVariant)));
+ connect(connection, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)));
- QVariantMap clientInfo;
- blocksizes.insert(socket, (quint32)0);
+ clientInfo.insert(connection, QVariantMap());
quInfo() << qPrintable(tr("Client connected from")) << qPrintable(socket->peerAddress().toString());
if (!_configured) {
}
-void Core::clientHasData()
+void Core::processClientMessage(const QVariant &data)
{
- QTcpSocket *socket = dynamic_cast<QTcpSocket *>(sender());
- Q_ASSERT(socket && blocksizes.contains(socket));
- QVariant item;
- while (SignalProxy::readDataFromDevice(socket, blocksizes[socket], item)) {
- QVariantMap msg = item.toMap();
- processClientMessage(socket, msg);
- if (!blocksizes.contains(socket)) break; // this socket is no longer ours to handle!
+ RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+ if (!connection) {
+ qWarning() << Q_FUNC_INFO << "Message not sent by RemoteConnection!";
+ return;
}
-}
-
-void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg)
-{
+ QVariantMap msg = data.toMap();
if (!msg.contains("MsgType")) {
// Client is way too old, does not even use the current init format
qWarning() << qPrintable(tr("Antique client trying to connect... refusing."));
- socket->close();
+ connection->close();
return;
}
+
// OK, so we have at least an init message format we can understand
if (msg["MsgType"] == "ClientInit") {
QVariantMap reply;
reply["Error"] = tr("<b>Your Quassel Client is too old!</b><br>"
"This core needs at least client/core protocol version %1.<br>"
"Please consider upgrading your client.").arg(Quassel::buildInfo().coreNeedsProtocol);
- SignalProxy::writeDataToDevice(socket, reply);
- qWarning() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("too old, rejecting."));
- socket->close(); return;
+ connection->writeSocketData(reply);
+ qWarning() << qPrintable(tr("Client")) << connection->description() << qPrintable(tr("too old, rejecting."));
+ connection->close();
+ return;
}
reply["ProtocolVersion"] = Quassel::buildInfo().protocolVersion;
#ifdef HAVE_SSL
SslServer *sslServer = qobject_cast<SslServer *>(&_server);
- QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket);
- bool supportSsl = (bool)sslServer && (bool)sslSocket && sslServer->isCertValid();
+ QSslSocket *sslSocket = qobject_cast<QSslSocket *>(connection->socket());
+ bool supportSsl = sslServer && sslSocket && sslServer->isCertValid();
#else
bool supportSsl = false;
#endif
else {
reply["Configured"] = true;
}
- clientInfo[socket] = msg; // store for future reference
+ clientInfo[connection] = msg; // store for future reference
reply["MsgType"] = "ClientInitAck";
- SignalProxy::writeDataToDevice(socket, reply);
- socket->flush(); // ensure that the write cache is flushed before we switch to ssl
+ connection->writeSocketData(reply);
+ connection->socket()->flush(); // ensure that the write cache is flushed before we switch to ssl
#ifdef HAVE_SSL
// after we told the client that we are ssl capable we switch to ssl mode
if (supportSsl && msg["UseSsl"].toBool()) {
- qDebug() << qPrintable(tr("Starting TLS for Client:")) << qPrintable(socket->peerAddress().toString());
- connect(sslSocket, SIGNAL(sslErrors(const QList<QSslError> &)), this, SLOT(sslErrors(const QList<QSslError> &)));
+ qDebug() << qPrintable(tr("Starting TLS for Client:")) << connection->description();
+ connect(sslSocket, SIGNAL(sslErrors(const QList<QSslError> &)), SLOT(sslErrors(const QList<QSslError> &)));
sslSocket->startServerEncryption();
}
#endif
#ifndef QT_NO_COMPRESS
if (supportsCompression && msg["UseCompression"].toBool()) {
- socket->setProperty("UseCompression", true);
- qDebug() << "Using compression for Client:" << qPrintable(socket->peerAddress().toString());
+ connection->socket()->setProperty("UseCompression", true);
+ qDebug() << "Using compression for Client:" << qPrintable(connection->socket()->peerAddress().toString());
}
#endif
}
else {
// for the rest, we need an initialized connection
- if (!clientInfo.contains(socket)) {
+ if (!clientInfo.contains(connection)) {
QVariantMap reply;
reply["MsgType"] = "ClientLoginReject";
reply["Error"] = tr("<b>Client not initialized!</b><br>You need to send an init message before trying to login.");
- SignalProxy::writeDataToDevice(socket, reply);
- qWarning() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("did not send an init message before trying to login, rejecting."));
- socket->close(); return;
+ connection->writeSocketData(reply);
+ qWarning() << qPrintable(tr("Client")) << qPrintable(connection->socket()->peerAddress().toString()) << qPrintable(tr("did not send an init message before trying to login, rejecting."));
+ connection->close(); return;
}
if (msg["MsgType"] == "CoreSetupData") {
QVariantMap reply;
else {
reply["MsgType"] = "CoreSetupAck";
}
- SignalProxy::writeDataToDevice(socket, reply);
+ connection->writeSocketData(reply);
}
else if (msg["MsgType"] == "ClientLogin") {
QVariantMap reply;
if (uid == 0) {
reply["MsgType"] = "ClientLoginReject";
reply["Error"] = tr("<b>Invalid username or password!</b><br>The username/password combination you supplied could not be found in the database.");
- SignalProxy::writeDataToDevice(socket, reply);
+ connection->writeSocketData(reply);
return;
}
reply["MsgType"] = "ClientLoginAck";
- SignalProxy::writeDataToDevice(socket, reply);
- quInfo() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("initialized and authenticated successfully as \"%1\" (UserId: %2).").arg(msg["User"].toString()).arg(uid.toInt()));
- setupClientSession(socket, uid);
+ connection->writeSocketData(reply);
+ quInfo() << qPrintable(tr("Client")) << qPrintable(connection->socket()->peerAddress().toString()) << qPrintable(tr("initialized and authenticated successfully as \"%1\" (UserId: %2).").arg(msg["User"].toString()).arg(uid.toInt()));
+ setupClientSession(connection, uid);
}
}
}
// Potentially called during the initialization phase (before handing the connection off to the session)
void Core::clientDisconnected()
{
- QTcpSocket *socket = qobject_cast<QTcpSocket *>(sender());
- if (socket) {
- // here it's safe to call methods on socket!
- quInfo() << qPrintable(tr("Non-authed client disconnected.")) << qPrintable(socket->peerAddress().toString());
- blocksizes.remove(socket);
- clientInfo.remove(socket);
- socket->deleteLater();
- }
- else {
- // we have to crawl through the hashes and see if we find a victim to remove
- qDebug() << qPrintable(tr("Non-authed client disconnected. (socket allready destroyed)"));
+ RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+ Q_ASSERT(connection);
- // DO NOT CALL ANY METHODS ON socket!!
- socket = static_cast<QTcpSocket *>(sender());
-
- QHash<QTcpSocket *, quint32>::iterator blockSizeIter = blocksizes.begin();
- while (blockSizeIter != blocksizes.end()) {
- if (blockSizeIter.key() == socket) {
- blockSizeIter = blocksizes.erase(blockSizeIter);
- }
- else {
- blockSizeIter++;
- }
- }
-
- QHash<QTcpSocket *, QVariantMap>::iterator clientInfoIter = clientInfo.begin();
- while (clientInfoIter != clientInfo.end()) {
- if (clientInfoIter.key() == socket) {
- clientInfoIter = clientInfo.erase(clientInfoIter);
- }
- else {
- clientInfoIter++;
- }
- }
- }
+ quInfo() << qPrintable(tr("Non-authed client disconnected.")) << qPrintable(connection->socket()->peerAddress().toString());
+ clientInfo.remove(connection);
+ connection->deleteLater();
// make server listen again if still not configured
if (!_configured) {
}
-void Core::setupClientSession(QTcpSocket *socket, UserId uid)
+void Core::setupClientSession(RemoteConnection *connection, UserId uid)
{
// From now on everything is handled by the client session
- disconnect(socket, 0, this, 0);
- socket->flush();
- blocksizes.remove(socket);
- clientInfo.remove(socket);
+ disconnect(connection, 0, this, 0);
+ connection->socket()->flush();
+ clientInfo.remove(connection);
// Find or create session for validated user
SessionThread *session;
else {
session = createSession(uid);
if (!session) {
- qWarning() << qPrintable(tr("Could not initialize session for client:")) << qPrintable(socket->peerAddress().toString());
- socket->close();
+ qWarning() << qPrintable(tr("Could not initialize session for client:")) << qPrintable(connection->socket()->peerAddress().toString());
+ connection->close();
return;
}
}
// as we are currently handling an event triggered by incoming data on this socket
// it is unsafe to directly move the socket to the client thread.
- QCoreApplication::postEvent(this, new AddClientEvent(socket, uid));
+ QCoreApplication::postEvent(this, new AddClientEvent(connection, uid));
}
{
if (event->type() == AddClientEventId) {
AddClientEvent *addClientEvent = static_cast<AddClientEvent *>(event);
- addClientHelper(addClientEvent->socket, addClientEvent->userId);
+ addClientHelper(addClientEvent->connection, addClientEvent->userId);
return;
}
}
-void Core::addClientHelper(QTcpSocket *socket, UserId uid)
+void Core::addClientHelper(RemoteConnection *connection, UserId uid)
{
// Find or create session for validated user
if (!sessions.contains(uid)) {
- qWarning() << qPrintable(tr("Could not find a session for client:")) << qPrintable(socket->peerAddress().toString());
- socket->close();
+ qWarning() << qPrintable(tr("Could not find a session for client:")) << qPrintable(connection->socket()->peerAddress().toString());
+ connection->close();
return;
}
SessionThread *session = sessions[uid];
- session->addClient(socket);
+ session->addClient(connection);
}
-void Core::setupInternalClientSession(SignalProxy *proxy)
+void Core::setupInternalClientSession(InternalConnection *clientConnection)
{
if (!_configured) {
stopListening();
return;
}
+ InternalConnection *coreConnection = new InternalConnection(this);
+ coreConnection->setPeer(clientConnection);
+ clientConnection->setPeer(coreConnection);
+
// Find or create session for validated user
- SessionThread *sess;
+ SessionThread *sessionThread;
if (sessions.contains(uid))
- sess = sessions[uid];
+ sessionThread = sessions[uid];
else
- sess = createSession(uid);
- sess->addClient(proxy);
+ sessionThread = createSession(uid);
+
+ sessionThread->addClient(coreConnection);
}
void Core::socketError(QAbstractSocket::SocketError err)
{
- QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(sender());
- if (socket && err != QAbstractSocket::RemoteHostClosedError)
- qWarning() << "Core::socketError()" << socket << err << socket->errorString();
+ RemoteConnection *connection = qobject_cast<RemoteConnection *>(sender());
+ if (connection && err != QAbstractSocket::RemoteHostClosedError)
+ qWarning() << "Core::socketError()" << connection->socket() << err << connection->socket()->errorString();
}
#include "types.h"
class CoreSession;
+class RemoteConnection;
+struct NetworkInfo;
class SessionThread;
class SignalProxy;
-struct NetworkInfo;
class AbstractSqlMigrationReader;
class AbstractSqlMigrationWriter;
/** \note This method is threadsafe.
*/
void syncStorage();
- void setupInternalClientSession(SignalProxy *proxy);
+ void setupInternalClientSession(InternalConnection *clientConnection);
signals:
//! Sent when a BufferInfo is updated in storage.
bool startListening();
void stopListening(const QString &msg = QString());
void incomingConnection();
- void clientHasData();
void clientDisconnected();
bool initStorage(const QString &backend, QVariantMap settings, bool setup = false);
#endif
void socketError(QAbstractSocket::SocketError);
+ void processClientMessage(const QVariant &data);
+
private:
Core();
~Core();
static Core *instanceptr;
SessionThread *createSession(UserId userId, bool restoreState = false);
- void setupClientSession(QTcpSocket *socket, UserId uid);
- void addClientHelper(QTcpSocket *socket, UserId uid);
- void processClientMessage(QTcpSocket *socket, const QVariantMap &msg);
+ void setupClientSession(RemoteConnection *connection, UserId uid);
+ void addClientHelper(RemoteConnection *connection, UserId uid);
//void processCoreSetup(QTcpSocket *socket, QVariantMap &msg);
QString setupCoreForInternalUsage();
QString setupCore(QVariantMap setupData);
OidentdConfigGenerator *_oidentdConfigGenerator;
- QHash<QTcpSocket *, quint32> blocksizes;
- QHash<QTcpSocket *, QVariantMap> clientInfo;
+ QHash<RemoteConnection *, QVariantMap> clientInfo;
QHash<QString, Storage *> _storageBackends;
#include "coreusersettings.h"
#include "ctcpparser.h"
#include "eventstringifier.h"
+#include "internalconnection.h"
#include "ircchannel.h"
#include "ircparser.h"
#include "ircuser.h"
#include "logger.h"
#include "messageevent.h"
-#include "signalproxy.h"
#include "storage.h"
#include "util.h"
+#include "protocols/legacy/legacyconnection.h"
+
class ProcessMessagesEvent : public QEvent
{
public:
CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
: QObject(parent),
_user(uid),
- _signalProxy(new SignalProxy(SignalProxy::Server, 0, this)),
+ _signalProxy(new SignalProxy(SignalProxy::Server, this)),
_aliasManager(this),
_bufferSyncer(new CoreBufferSyncer(this)),
_backlogManager(new CoreBacklogManager(this)),
p->setHeartBeatInterval(30);
p->setMaxHeartBeatCount(60); // 30 mins until we throw a dead socket out
- connect(p, SIGNAL(peerRemoved(QIODevice *)), this, SLOT(removeClient(QIODevice *)));
+ connect(p, SIGNAL(peerRemoved(SignalProxy::AbstractPeer*)), SLOT(removeClient(SignalProxy::AbstractPeer*)));
- connect(p, SIGNAL(connected()), this, SLOT(clientsConnected()));
- connect(p, SIGNAL(disconnected()), this, SLOT(clientsDisconnected()));
+ connect(p, SIGNAL(connected()), SLOT(clientsConnected()));
+ connect(p, SIGNAL(disconnected()), SLOT(clientsDisconnected()));
p->attachSlot(SIGNAL(sendInput(BufferInfo, QString)), this, SLOT(msgFromClient(BufferInfo, QString)));
p->attachSignal(this, SIGNAL(displayMsg(Message)));
}
-void CoreSession::addClient(QIODevice *device)
+void CoreSession::addClient(RemoteConnection *connection)
{
- if (!device) {
- qCritical() << "Invoking CoreSession::addClient with a QObject that is not a QIODevice!";
- }
- else {
- // if the socket is an orphan, the signalProxy adopts it.
- // -> we don't need to care about it anymore
- device->setParent(0);
- signalProxy()->addPeer(device);
- QVariantMap reply;
- reply["MsgType"] = "SessionInit";
- reply["SessionState"] = sessionState();
- SignalProxy::writeDataToDevice(device, reply);
- }
+ QVariantMap reply;
+ reply["MsgType"] = "SessionInit";
+ reply["SessionState"] = sessionState();
+ connection->writeSocketData(reply);
+ signalProxy()->addPeer(connection);
}
-void CoreSession::addClient(SignalProxy *proxy)
+void CoreSession::addClient(InternalConnection *connection)
{
- signalProxy()->addPeer(proxy);
+ signalProxy()->addPeer(connection);
emit sessionState(sessionState());
}
-void CoreSession::removeClient(QIODevice *iodev)
+void CoreSession::removeClient(SignalProxy::AbstractPeer *peer)
{
- QTcpSocket *socket = qobject_cast<QTcpSocket *>(iodev);
- if (socket)
- quInfo() << qPrintable(tr("Client")) << qPrintable(socket->peerAddress().toString()) << qPrintable(tr("disconnected (UserId: %1).").arg(user().toInt()));
+ RemoteConnection *connection = qobject_cast<RemoteConnection *>(peer);
+ if (connection)
+ quInfo() << qPrintable(tr("Client")) << connection->description() << qPrintable(tr("disconnected (UserId: %1).").arg(user().toInt()));
}
#include "corealiasmanager.h"
#include "coreignorelistmanager.h"
#include "message.h"
+#include "signalproxy.h"
#include "storage.h"
class CoreBacklogManager;
class CtcpParser;
class EventManager;
class EventStringifier;
+class InternalConnection;
class IrcParser;
class MessageEvent;
class NetworkConnection;
-class SignalProxy;
+class RemoteConnection;
struct NetworkInfo;
void restoreSessionState();
public slots:
- void addClient(QIODevice *device);
- void addClient(SignalProxy *proxy);
+ void addClient(RemoteConnection *connection);
+ void addClient(InternalConnection *connection);
void msgFromClient(BufferInfo, QString message);
virtual void customEvent(QEvent *event);
private slots:
- void removeClient(QIODevice *dev);
+ void removeClient(SignalProxy::AbstractPeer *peer);
void recvStatusMsgFromServer(QString msg);
void recvMessageFromServer(NetworkId networkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
***************************************************************************/
-#include <QMutexLocker>
-
+#include "core.h"
+#include "coresession.h"
+#include "internalconnection.h"
+#include "remoteconnection.h"
#include "sessionthread.h"
#include "signalproxy.h"
-#include "coresession.h"
-#include "core.h"
SessionThread::SessionThread(UserId uid, bool restoreState, QObject *parent)
: QThread(parent),
}
+// this and the following related methods are executed in the Core thread!
void SessionThread::addClient(QObject *peer)
{
if (isSessionInitialized()) {
void SessionThread::addClientToSession(QObject *peer)
{
- QIODevice *socket = qobject_cast<QIODevice *>(peer);
- if (socket) {
- addRemoteClientToSession(socket);
+ RemoteConnection *connection = qobject_cast<RemoteConnection *>(peer);
+ if (connection) {
+ addRemoteClientToSession(connection);
return;
}
- SignalProxy *proxy = qobject_cast<SignalProxy *>(peer);
- if (proxy) {
- addInternalClientToSession(proxy);
+ InternalConnection *internal = qobject_cast<InternalConnection *>(peer);
+ if (internal) {
+ addInternalClientToSession(internal);
return;
}
- qWarning() << "SessionThread::addClient() received neither QIODevice nor SignalProxy as peer!" << peer;
+ qWarning() << "SessionThread::addClient() received invalid peer!" << peer;
}
-void SessionThread::addRemoteClientToSession(QIODevice *socket)
+void SessionThread::addRemoteClientToSession(RemoteConnection *connection)
{
- socket->setParent(0);
- socket->moveToThread(session()->thread());
- emit addRemoteClient(socket);
+ connection->setParent(0);
+ connection->moveToThread(session()->thread());
+ emit addRemoteClient(connection);
}
-void SessionThread::addInternalClientToSession(SignalProxy *proxy)
+void SessionThread::addInternalClientToSession(InternalConnection *connection)
{
- emit addInternalClient(proxy);
+ connection->setParent(0);
+ connection->moveToThread(session()->thread());
+ emit addInternalClient(connection);
}
void SessionThread::run()
{
_session = new CoreSession(user(), _restoreState);
- connect(this, SIGNAL(addRemoteClient(QIODevice *)), _session, SLOT(addClient(QIODevice *)));
- connect(this, SIGNAL(addInternalClient(SignalProxy *)), _session, SLOT(addClient(SignalProxy *)));
- connect(_session, SIGNAL(sessionState(const QVariant &)), Core::instance(), SIGNAL(sessionState(const QVariant &)));
+ connect(this, SIGNAL(addRemoteClient(RemoteConnection*)), _session, SLOT(addClient(RemoteConnection*)));
+ connect(this, SIGNAL(addInternalClient(InternalConnection*)), _session, SLOT(addClient(InternalConnection*)));
+ connect(_session, SIGNAL(sessionState(QVariant)), Core::instance(), SIGNAL(sessionState(QVariant)));
emit initialized();
exec();
delete _session;
#include "types.h"
class CoreSession;
+class InternalConnection;
+class RemoteConnection;
class QIODevice;
-class SignalProxy;
class SessionThread : public QThread
{
void initialized();
void shutdown();
- void addRemoteClient(QIODevice *);
- void addInternalClient(SignalProxy *);
+ void addRemoteClient(RemoteConnection *);
+ void addInternalClient(InternalConnection *);
private:
CoreSession *_session;
bool isSessionInitialized();
void addClientToSession(QObject *peer);
- void addRemoteClientToSession(QIODevice *socket);
- void addInternalClientToSession(SignalProxy *proxy);
+ void addRemoteClientToSession(RemoteConnection *connection);
+ void addInternalClientToSession(InternalConnection *client);
};
#include "core.h"
#include "qtui.h"
+class InternalConnection;
+
MonolithicApplication::MonolithicApplication(int &argc, char **argv)
: QtUiApplication(argc, argv),
_internalInitDone(false)
connect(Client::coreConnection(), SIGNAL(startInternalCore()), SLOT(startInternalCore()));
+ // FIXME what's this for?
if (isOptionSet("port")) {
startInternalCore();
}
}
Core *core = Core::instance();
CoreConnection *connection = Client::coreConnection();
- connect(connection, SIGNAL(connectToInternalCore(SignalProxy *)), core, SLOT(setupInternalClientSession(SignalProxy *)));
- connect(core, SIGNAL(sessionState(const QVariant &)), connection, SLOT(internalSessionStateReceived(const QVariant &)));
+ connect(connection, SIGNAL(connectToInternalCore(InternalConnection*)), core, SLOT(setupInternalClientSession(InternalConnection*)));
+ connect(core, SIGNAL(sessionState(QVariant)), connection, SLOT(internalSessionStateReceived(QVariant)));
}