X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fprotocols%2Fdatastream%2Fdatastreampeer.cpp;h=7c45b4618bcc64aebbf10b352fa1cfb926742603;hp=86fbf53a42ae728ad45807d1e291bf03a9ca9502;hb=f111b1c5f4056c20fba0ab0bb1570e1cc156fe87;hpb=d1d1e6ef1d2073d629bf54fd8e0d31f647f9cb88 diff --git a/src/common/protocols/datastream/datastreampeer.cpp b/src/common/protocols/datastream/datastreampeer.cpp index 86fbf53a..7c45b461 100644 --- a/src/common/protocols/datastream/datastreampeer.cpp +++ b/src/common/protocols/datastream/datastreampeer.cpp @@ -19,23 +19,18 @@ ***************************************************************************/ #include - +#include #include #include #include "datastreampeer.h" -#include "quassel.h" using namespace Protocol; -DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent) - : RemotePeer(authHandler, socket, parent), - _blockSize(0) +DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, Compressor::CompressionLevel level, QObject *parent) + : RemotePeer(authHandler, socket, level, parent) { Q_UNUSED(features); - - _stream.setDevice(socket); - _stream.setVersion(QDataStream::Qt_4_2); } @@ -58,93 +53,26 @@ quint16 DataStreamPeer::enabledFeatures() const } -// Note that we're already preparing for readSocketData() moving into RemotePeer, thus the slightly -// cumbersome type and stream handling here. -void DataStreamPeer::onSocketDataAvailable() +void DataStreamPeer::processMessage(const QByteArray &msg) { - // don't try to read more data if we're already closing - if (socket()->state() != QAbstractSocket::ConnectedState) - return; - - QByteArray data; - while (readSocketData(data)) { - // data contains always a serialized QVector - QDataStream stream(data); - stream.setVersion(QDataStream::Qt_4_2); - QVariantList list; - stream >> list; - if (stream.status() != QDataStream::Ok) { - close("Peer sent corrupt data, closing down!"); - return; - } - - // if no sigproxy is set, we're in handshake mode - if (!signalProxy()) - handleHandshakeMessage(list); - else - handlePackedFunc(list); - } -} - - -bool DataStreamPeer::readSocketData(QByteArray &data) -{ - if (_blockSize == 0) { - if (socket()->bytesAvailable() < 4) - return false; - // the block size is part of QByteArray's serialization format, so we don't actually read it now... - socket()->peek((char*)&_blockSize, 4); - _blockSize = qFromBigEndian(_blockSize) + 4; // ... but of course we have to add its size to the total size of the block - } - - if (_blockSize > 1 << 22) { - close("Peer tried to send package larger than max package size!"); - return false; - } - - if (_blockSize == 0) { - close("Peer tried to send 0 byte package!"); - return false; - } - - if (socket()->bytesAvailable() < _blockSize) { - emit transferProgress(socket()->bytesAvailable(), _blockSize); - return false; - } - - emit transferProgress(_blockSize, _blockSize); - - _stream >> data; - _blockSize = 0; - - if (_stream.status() != QDataStream::Ok) { + QDataStream stream(msg); + stream.setVersion(QDataStream::Qt_4_2); + QVariantList list; + stream >> list; + if (stream.status() != QDataStream::Ok) { close("Peer sent corrupt data, closing down!"); - return false; - } - - return true; -} - - -void DataStreamPeer::writeSocketData(const QVariantList &list) -{ - if (!socket()->isOpen()) { - qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!"; return; } - QByteArray data; - QDataStream msgStream(&data, QIODevice::WriteOnly); - msgStream.setVersion(QDataStream::Qt_4_2); - msgStream << list; - - _stream << data; // also writes the block size as part of the serialization format - if (_stream.status() != QDataStream::Ok) - close("Could not serialize data for peer!"); + // if no sigproxy is set, we're in handshake mode + if (!signalProxy()) + handleHandshakeMessage(list); + else + handlePackedFunc(list); } -void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg) +void DataStreamPeer::writeMessage(const QVariantMap &handshakeMsg) { QVariantList list; QVariantMap::const_iterator it = handshakeMsg.begin(); @@ -153,10 +81,20 @@ void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg) ++it; } - writeSocketData(list); + writeMessage(list); } +void DataStreamPeer::writeMessage(const QVariantList &sigProxyMsg) +{ + QByteArray data; + QDataStream msgStream(&data, QIODevice::WriteOnly); + msgStream.setVersion(QDataStream::Qt_4_2); + msgStream << sigProxyMsg; + + writeMessage(data); +} + /*** Handshake messages ***/ @@ -178,7 +116,7 @@ void DataStreamPeer::handleHandshakeMessage(const QVariantList &mapData) } if (msgType == "ClientInit") { - handle(RegisterClient(m["ClientVersion"].toString(), false)); // UseSsl obsolete + handle(RegisterClient(m["ClientVersion"].toString(), m["ClientDate"].toString(), false)); // UseSsl obsolete } else if (msgType == "ClientInitReject") { @@ -229,9 +167,9 @@ void DataStreamPeer::dispatch(const RegisterClient &msg) { QVariantMap m; m["MsgType"] = "ClientInit"; m["ClientVersion"] = msg.clientVersion; - m["ClientDate"] = Quassel::buildInfo().buildDate; + m["ClientDate"] = msg.buildDate; - writeSocketData(m); + writeMessage(m); } @@ -240,7 +178,7 @@ void DataStreamPeer::dispatch(const ClientDenied &msg) { m["MsgType"] = "ClientInitReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -251,7 +189,7 @@ void DataStreamPeer::dispatch(const ClientRegistered &msg) { m["StorageBackends"] = msg.backendInfo; m["LoginEnabled"] = m["Configured"] = msg.coreConfigured; - writeSocketData(m); + writeMessage(m); } @@ -266,7 +204,8 @@ void DataStreamPeer::dispatch(const SetupData &msg) QVariantMap m; m["MsgType"] = "CoreSetupData"; m["SetupData"] = map; - writeSocketData(m); + + writeMessage(m); } @@ -276,7 +215,7 @@ void DataStreamPeer::dispatch(const SetupFailed &msg) m["MsgType"] = "CoreSetupReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -287,7 +226,7 @@ void DataStreamPeer::dispatch(const SetupDone &msg) QVariantMap m; m["MsgType"] = "CoreSetupAck"; - writeSocketData(m); + writeMessage(m); } @@ -298,7 +237,7 @@ void DataStreamPeer::dispatch(const Login &msg) m["User"] = msg.user; m["Password"] = msg.password; - writeSocketData(m); + writeMessage(m); } @@ -308,7 +247,7 @@ void DataStreamPeer::dispatch(const LoginFailed &msg) m["MsgType"] = "ClientLoginReject"; m["Error"] = msg.errorString; - writeSocketData(m); + writeMessage(m); } @@ -319,7 +258,7 @@ void DataStreamPeer::dispatch(const LoginSuccess &msg) QVariantMap m; m["MsgType"] = "ClientLoginAck"; - writeSocketData(m); + writeMessage(m); } @@ -334,7 +273,7 @@ void DataStreamPeer::dispatch(const SessionState &msg) map["Identities"] = msg.identities; m["SessionState"] = map; - writeSocketData(m); + writeMessage(m); } @@ -462,5 +401,5 @@ void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg) void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc) { - writeSocketData(packedFunc); + writeMessage(packedFunc); }