From b2169e5f4cbd3ce724c4808b62ddc2b8941219a5 Mon Sep 17 00:00:00 2001 From: Manuel Nickschas Date: Sun, 16 Feb 2014 17:29:28 +0100 Subject: [PATCH] Move all socket handling into RemotePeer Previously we had the RemotePeer subclasses reading from and writing to the socket themselves, resulting in code duplication and also making it hard to have a single point for implementing compression support later. We assume that any protocol we may come up with will send the size of a datagram followed by the datagram itself, thus there's no need to implement that part in the individual peers. And if we ever come up with something else, we can always make some methods virtual and override them for the special case. We avoid using QDataStream in RemotePeer, as we are only reading/writing QByteArrays anyway. That way, we can also replace the direct calls to the socket by the layer implementing compression later, without it being a proper QIODevice. --- .../protocols/datastream/datastreampeer.cpp | 132 +++++------------- .../protocols/datastream/datastreampeer.h | 15 +- src/common/protocols/legacy/legacypeer.cpp | 93 ++++-------- src/common/protocols/legacy/legacypeer.h | 15 +- src/common/remotepeer.cpp | 69 ++++++++- src/common/remotepeer.h | 10 +- 6 files changed, 147 insertions(+), 187 deletions(-) diff --git a/src/common/protocols/datastream/datastreampeer.cpp b/src/common/protocols/datastream/datastreampeer.cpp index 86fbf53a..294dc6e6 100644 --- a/src/common/protocols/datastream/datastreampeer.cpp +++ b/src/common/protocols/datastream/datastreampeer.cpp @@ -29,13 +29,9 @@ using namespace Protocol; DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent) - : RemotePeer(authHandler, socket, parent), - _blockSize(0) + : RemotePeer(authHandler, socket, parent) { Q_UNUSED(features); - - _stream.setDevice(socket); - _stream.setVersion(QDataStream::Qt_4_2); } @@ -58,93 +54,26 @@ quint16 DataStreamPeer::enabledFeatures() const } -// Note that we're already preparing for readSocketData() moving into RemotePeer, thus the slightly -// cumbersome type and stream handling here. -void DataStreamPeer::onSocketDataAvailable() -{ - // don't try to read more data if we're already closing - if (socket()->state() != QAbstractSocket::ConnectedState) - return; - - QByteArray data; - while (readSocketData(data)) { - // data contains always a serialized QVector - QDataStream stream(data); - stream.setVersion(QDataStream::Qt_4_2); - QVariantList list; - stream >> list; - if (stream.status() != QDataStream::Ok) { - close("Peer sent corrupt data, closing down!"); - return; - } - - // if no sigproxy is set, we're in handshake mode - if (!signalProxy()) - handleHandshakeMessage(list); - else - handlePackedFunc(list); - } -} - - -bool DataStreamPeer::readSocketData(QByteArray &data) +void DataStreamPeer::processMessage(const QByteArray &msg) { - if (_blockSize == 0) { - if (socket()->bytesAvailable() < 4) - return false; - // the block size is part of QByteArray's serialization format, so we don't actually read it now... - socket()->peek((char*)&_blockSize, 4); - _blockSize = qFromBigEndian(_blockSize) + 4; // ... but of course we have to add its size to the total size of the block - } - - if (_blockSize > 1 << 22) { - close("Peer tried to send package larger than max package size!"); - return false; - } - - if (_blockSize == 0) { - close("Peer tried to send 0 byte package!"); - return false; - } - - if (socket()->bytesAvailable() < _blockSize) { - emit transferProgress(socket()->bytesAvailable(), _blockSize); - return false; - } - - emit transferProgress(_blockSize, _blockSize); - - _stream >> data; - _blockSize = 0; - - if (_stream.status() != QDataStream::Ok) { + QDataStream stream(msg); + stream.setVersion(QDataStream::Qt_4_2); + QVariantList list; + stream >> list; + if (stream.status() != QDataStream::Ok) { close("Peer sent corrupt data, closing down!"); - return false; - } - - return true; -} - - -void DataStreamPeer::writeSocketData(const QVariantList &list) -{ - if (!socket()->isOpen()) { - qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!"; return; } - QByteArray data; - QDataStream msgStream(&data, QIODevice::WriteOnly); - msgStream.setVersion(QDataStream::Qt_4_2); - msgStream << list; - - _stream << data; // also writes the block size as part of the serialization format - if (_stream.status() != QDataStream::Ok) - close("Could not serialize data for peer!"); + // if no sigproxy is set, we're in handshake mode + if (!signalProxy()) + handleHandshakeMessage(list); + else + handlePackedFunc(list); } -void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg) +void DataStreamPeer::writeMessage(const QVariantMap &handshakeMsg) { QVariantList list; QVariantMap::const_iterator it = handshakeMsg.begin(); @@ -153,10 +82,20 @@ void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg) ++it; } - writeSocketData(list); + writeMessage(list); } +void DataStreamPeer::writeMessage(const QVariantList &sigProxyMsg) +{ + QByteArray data; + QDataStream msgStream(&data, QIODevice::WriteOnly); + msgStream.setVersion(QDataStream::Qt_4_2); + msgStream << sigProxyMsg; + + writeMessage(data); +} + /*** Handshake messages ***/ @@ -231,7 +170,7 @@ void DataStreamPeer::dispatch(const RegisterClient &msg) { m["ClientVersion"] = msg.clientVersion; m["ClientDate"] = Quassel::buildInfo().buildDate; - writeSocketData(m); + writeMessage(m); } @@ -240,7 +179,7 @@ void DataStreamPeer::dispatch(const ClientDenied &msg) { m["MsgType"] = "ClientInitReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -251,7 +190,7 @@ void DataStreamPeer::dispatch(const ClientRegistered &msg) { m["StorageBackends"] = msg.backendInfo; m["LoginEnabled"] = m["Configured"] = msg.coreConfigured; - writeSocketData(m); + writeMessage(m); } @@ -266,7 +205,8 @@ void DataStreamPeer::dispatch(const SetupData &msg) QVariantMap m; m["MsgType"] = "CoreSetupData"; m["SetupData"] = map; - writeSocketData(m); + + writeMessage(m); } @@ -276,7 +216,7 @@ void DataStreamPeer::dispatch(const SetupFailed &msg) m["MsgType"] = "CoreSetupReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -287,7 +227,7 @@ void DataStreamPeer::dispatch(const SetupDone &msg) QVariantMap m; m["MsgType"] = "CoreSetupAck"; - writeSocketData(m); + writeMessage(m); } @@ -298,7 +238,7 @@ void DataStreamPeer::dispatch(const Login &msg) m["User"] = msg.user; m["Password"] = msg.password; - writeSocketData(m); + writeMessage(m); } @@ -308,7 +248,7 @@ void DataStreamPeer::dispatch(const LoginFailed &msg) m["MsgType"] = "ClientLoginReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -319,7 +259,7 @@ void DataStreamPeer::dispatch(const LoginSuccess &msg) QVariantMap m; m["MsgType"] = "ClientLoginAck"; - writeSocketData(m); + writeMessage(m); } @@ -334,7 +274,7 @@ void DataStreamPeer::dispatch(const SessionState &msg) map["Identities"] = msg.identities; m["SessionState"] = map; - writeSocketData(m); + writeMessage(m); } @@ -462,5 +402,5 @@ void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg) void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc) { - writeSocketData(packedFunc); + writeMessage(packedFunc); } diff --git a/src/common/protocols/datastream/datastreampeer.h b/src/common/protocols/datastream/datastreampeer.h index c733fda2..7d04f4b5 100644 --- a/src/common/protocols/datastream/datastreampeer.h +++ b/src/common/protocols/datastream/datastreampeer.h @@ -21,8 +21,6 @@ #ifndef DATASTREAMPEER_H #define DATASTREAMPEER_H -#include - #include "../../remotepeer.h" class QDataStream; @@ -72,20 +70,15 @@ public: signals: void protocolError(const QString &errorString); -protected slots: - void onSocketDataAvailable(); - private: - bool readSocketData(QByteArray &data); - void writeSocketData(const QVariantList &list); - void writeSocketData(const QVariantMap &handshakeMsg); + using RemotePeer::writeMessage; + void writeMessage(const QVariantMap &handshakeMsg); + void writeMessage(const QVariantList &sigProxyMsg); + void processMessage(const QByteArray &msg); void handleHandshakeMessage(const QVariantList &mapData); void handlePackedFunc(const QVariantList &packedFunc); void dispatchPackedFunc(const QVariantList &packedFunc); - - QDataStream _stream; - quint32 _blockSize; }; #endif diff --git a/src/common/protocols/legacy/legacypeer.cpp b/src/common/protocols/legacy/legacypeer.cpp index b292223e..da6c3a71 100644 --- a/src/common/protocols/legacy/legacypeer.cpp +++ b/src/common/protocols/legacy/legacypeer.cpp @@ -33,11 +33,9 @@ using namespace Protocol; LegacyPeer::LegacyPeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) : RemotePeer(authHandler, socket, parent), - _blockSize(0), _useCompression(false) { - _stream.setDevice(socket); - _stream.setVersion(QDataStream::Qt_4_2); + } @@ -56,56 +54,22 @@ void LegacyPeer::setSignalProxy(::SignalProxy *proxy) } -void LegacyPeer::onSocketDataAvailable() -{ - QVariant item; - while (readSocketData(item)) { - // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere - if (!signalProxy()) - handleHandshakeMessage(item); - else - handlePackedFunc(item); - } -} - - -bool LegacyPeer::readSocketData(QVariant &item) +void LegacyPeer::processMessage(const QByteArray &msg) { - if (_blockSize == 0) { - if (socket()->bytesAvailable() < 4) - return false; - _stream >> _blockSize; - } - - if (_blockSize > 1 << 22) { - close("Peer tried to send package larger than max package size!"); - return false; - } - - if (_blockSize == 0) { - close("Peer tried to send 0 byte package!"); - return false; - } - - if (socket()->bytesAvailable() < _blockSize) { - emit transferProgress(socket()->bytesAvailable(), _blockSize); - return false; - } - - emit transferProgress(_blockSize, _blockSize); - - _blockSize = 0; + QDataStream stream(msg); + stream.setVersion(QDataStream::Qt_4_2); + QVariant item; if (_useCompression) { QByteArray rawItem; - _stream >> rawItem; + stream >> rawItem; int nbytes = rawItem.size(); if (nbytes <= 4) { const char *data = rawItem.constData(); if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) { close("Peer sent corrupted compressed data!"); - return false; + return; } } @@ -116,25 +80,24 @@ bool LegacyPeer::readSocketData(QVariant &item) itemStream >> item; } else { - _stream >> item; + stream >> item; } - if (!item.isValid()) { + if (stream.status() != QDataStream::Ok || !item.isValid()) { close("Peer sent corrupt data: unable to load QVariant!"); - return false; + return; } - return true; + // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere + if (!signalProxy()) + handleHandshakeMessage(item); + else + handlePackedFunc(item); } -void LegacyPeer::writeSocketData(const QVariant &item) +void LegacyPeer::writeMessage(const QVariant &item) { - if (!socket()->isOpen()) { - qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!"; - return; - } - QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_2); @@ -153,7 +116,7 @@ void LegacyPeer::writeSocketData(const QVariant &item) out << item; } - _stream << block; // also writes the length as part of the serialization format + writeMessage(block); } @@ -261,7 +224,7 @@ void LegacyPeer::dispatch(const RegisterClient &msg) { m["UseCompression"] = false; #endif - writeSocketData(m); + writeMessage(m); } @@ -270,7 +233,7 @@ void LegacyPeer::dispatch(const ClientDenied &msg) { m["MsgType"] = "ClientInitReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -298,7 +261,7 @@ void LegacyPeer::dispatch(const ClientRegistered &msg) { m["LoginEnabled"] = m["Configured"] = msg.coreConfigured; - writeSocketData(m); + writeMessage(m); } @@ -313,7 +276,7 @@ void LegacyPeer::dispatch(const SetupData &msg) QVariantMap m; m["MsgType"] = "CoreSetupData"; m["SetupData"] = map; - writeSocketData(m); + writeMessage(m); } @@ -323,7 +286,7 @@ void LegacyPeer::dispatch(const SetupFailed &msg) m["MsgType"] = "CoreSetupReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -334,7 +297,7 @@ void LegacyPeer::dispatch(const SetupDone &msg) QVariantMap m; m["MsgType"] = "CoreSetupAck"; - writeSocketData(m); + writeMessage(m); } @@ -345,7 +308,7 @@ void LegacyPeer::dispatch(const Login &msg) m["User"] = msg.user; m["Password"] = msg.password; - writeSocketData(m); + writeMessage(m); } @@ -355,7 +318,7 @@ void LegacyPeer::dispatch(const LoginFailed &msg) m["MsgType"] = "ClientLoginReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -366,7 +329,7 @@ void LegacyPeer::dispatch(const LoginSuccess &msg) QVariantMap m; m["MsgType"] = "ClientLoginAck"; - writeSocketData(m); + writeMessage(m); } @@ -381,7 +344,7 @@ void LegacyPeer::dispatch(const SessionState &msg) map["Identities"] = msg.identities; m["SessionState"] = map; - writeSocketData(m); + writeMessage(m); } @@ -518,7 +481,7 @@ void LegacyPeer::dispatch(const Protocol::HeartBeatReply &msg) void LegacyPeer::dispatchPackedFunc(const QVariantList &packedFunc) { - writeSocketData(QVariant(packedFunc)); + writeMessage(QVariant(packedFunc)); } diff --git a/src/common/protocols/legacy/legacypeer.h b/src/common/protocols/legacy/legacypeer.h index a4c7a490..a063616c 100644 --- a/src/common/protocols/legacy/legacypeer.h +++ b/src/common/protocols/legacy/legacypeer.h @@ -21,12 +21,8 @@ #ifndef LEGACYPEER_H #define LEGACYPEER_H -#include - #include "../../remotepeer.h" -class QDataStream; - class LegacyPeer : public RemotePeer { Q_OBJECT @@ -73,12 +69,11 @@ signals: // only used in compat mode void protocolVersionMismatch(int actual, int expected); -protected slots: - void onSocketDataAvailable(); - private: - bool readSocketData(QVariant &item); - void writeSocketData(const QVariant &item); + using RemotePeer::writeMessage; + void writeMessage(const QVariant &item); + void processMessage(const QByteArray &msg); + void handleHandshakeMessage(const QVariant &msg); void handlePackedFunc(const QVariant &packedFunc); void dispatchPackedFunc(const QVariantList &packedFunc); @@ -86,8 +81,6 @@ private: void toLegacyIrcUsersAndChannels(QVariantMap &initData); void fromLegacyIrcUsersAndChannels(QVariantMap &initData); - QDataStream _stream; - quint32 _blockSize; bool _useCompression; }; diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index e8b682d7..90da083e 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -18,6 +18,8 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ +#include + #include #include @@ -31,16 +33,19 @@ using namespace Protocol; +const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk + RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) : Peer(authHandler, parent), _socket(socket), _signalProxy(0), _heartBeatTimer(new QTimer(this)), _heartBeatCount(0), - _lag(0) + _lag(0), + _msgSize(0) { socket->setParent(this); - connect(socket, SIGNAL(readyRead()), SLOT(onSocketDataAvailable())); + connect(socket, SIGNAL(readyRead()), SLOT(onReadyRead())); connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); @@ -56,7 +61,7 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject * // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered. // However, we can't call a virtual function from the ctor, so let's do it asynchronously. if (socket->bytesAvailable()) - QTimer::singleShot(0, this, SLOT(onSocketDataAvailable())); + QTimer::singleShot(0, this, SLOT(onReadyRead())); } @@ -180,6 +185,64 @@ void RemotePeer::close(const QString &reason) } +void RemotePeer::onReadyRead() +{ + // don't try to read more data if we're already closing + if (socket()->state() != QAbstractSocket::ConnectedState) + return; + + QByteArray msg; + while (readMessage(msg)) + processMessage(msg); +} + + +bool RemotePeer::readMessage(QByteArray &msg) +{ + if (_msgSize == 0) { + if (socket()->bytesAvailable() < 4) + return false; + socket()->read((char*)&_msgSize, 4); + _msgSize = qFromBigEndian(_msgSize); + + if (_msgSize > maxMessageSize) { + close("Peer tried to send package larger than max package size!"); + return false; + } + + if (_msgSize == 0) { + close("Peer tried to send an empty message!"); + return false; + } + } + + if (socket()->bytesAvailable() < _msgSize) { + emit transferProgress(socket()->bytesAvailable(), _msgSize); + return false; + } + + emit transferProgress(_msgSize, _msgSize); + + msg.resize(_msgSize); + qint64 bytesRead = socket()->read(msg.data(), _msgSize); + if (bytesRead != _msgSize) { + close("Premature end of data stream!"); + return false; + } + + _msgSize = 0; + return true; +} + + +void RemotePeer::writeMessage(const QByteArray &msg) +{ + quint32 size = qToBigEndian(msg.size()); + socket()->write((const char*)&size, 4); + socket()->write(msg.constData(), msg.size()); +} + + void RemotePeer::handle(const HeartBeat &heartBeat) { dispatch(HeartBeatReply(heartBeat.timestamp)); diff --git a/src/common/remotepeer.h b/src/common/remotepeer.h index e58d140b..b1c5bf4e 100644 --- a/src/common/remotepeer.h +++ b/src/common/remotepeer.h @@ -72,6 +72,9 @@ signals: protected: SignalProxy *signalProxy() const; + void writeMessage(const QByteArray &msg); + virtual void processMessage(const QByteArray &msg) = 0; + // These protocol messages get handled internally and won't reach SignalProxy void handle(const Protocol::HeartBeat &heartBeat); void handle(const Protocol::HeartBeatReply &heartBeatReply); @@ -79,20 +82,25 @@ protected: virtual void dispatch(const Protocol::HeartBeatReply &msg) = 0; protected slots: - virtual void onSocketDataAvailable() = 0; virtual void onSocketStateChanged(QAbstractSocket::SocketState state); virtual void onSocketError(QAbstractSocket::SocketError error); private slots: + void onReadyRead(); + void sendHeartBeat(); void changeHeartBeatInterval(int secs); +private: + bool readMessage(QByteArray &msg); + private: QTcpSocket *_socket; SignalProxy *_signalProxy; QTimer *_heartBeatTimer; int _heartBeatCount; int _lag; + quint32 _msgSize; }; #endif -- 2.20.1