From 249a178a775bce425a83aefd7c7c39a868ebfe98 Mon Sep 17 00:00:00 2001 From: Manuel Nickschas Date: Sun, 16 Feb 2014 22:28:21 +0100 Subject: [PATCH] Hook the Compressor into RemotePeer Instead of reading from/writing to the socket directly, RemotePeer now goes through a Compressor instance. At this time, no actual compression will happen though. Note that the legacy protocol does not support streaming compression, which is why it sets a compression level of NoCompression. --- .../protocols/datastream/datastreampeer.cpp | 2 +- src/common/protocols/legacy/legacypeer.cpp | 2 +- src/common/remotepeer.cpp | 34 +++++++++---------- src/common/remotepeer.h | 6 ++-- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/common/protocols/datastream/datastreampeer.cpp b/src/common/protocols/datastream/datastreampeer.cpp index 294dc6e6..7d9b8547 100644 --- a/src/common/protocols/datastream/datastreampeer.cpp +++ b/src/common/protocols/datastream/datastreampeer.cpp @@ -29,7 +29,7 @@ using namespace Protocol; DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent) - : RemotePeer(authHandler, socket, parent) + : RemotePeer(authHandler, socket, Compressor::BestCompression, parent) { Q_UNUSED(features); } diff --git a/src/common/protocols/legacy/legacypeer.cpp b/src/common/protocols/legacy/legacypeer.cpp index da6c3a71..49c77fdd 100644 --- a/src/common/protocols/legacy/legacypeer.cpp +++ b/src/common/protocols/legacy/legacypeer.cpp @@ -32,7 +32,7 @@ const uint clientNeedsProtocol = protocolVersion; using namespace Protocol; LegacyPeer::LegacyPeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) - : RemotePeer(authHandler, socket, parent), + : RemotePeer(authHandler, socket, Compressor::NoCompression, parent), _useCompression(false) { diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index 90da083e..948bffa7 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -35,9 +35,10 @@ using namespace Protocol; const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk -RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) +RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent) : Peer(authHandler, parent), _socket(socket), + _compressor(new Compressor(socket, level, this)), _signalProxy(0), _heartBeatTimer(new QTimer(this)), _heartBeatCount(0), @@ -45,7 +46,6 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject * _msgSize(0) { socket->setParent(this); - connect(socket, SIGNAL(readyRead()), SLOT(onReadyRead())); connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); @@ -56,12 +56,10 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject * connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); #endif - connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); + connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead())); + connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error))); - // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered. - // However, we can't call a virtual function from the ctor, so let's do it asynchronously. - if (socket->bytesAvailable()) - QTimer::singleShot(0, this, SLOT(onReadyRead())); + connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); } @@ -79,6 +77,12 @@ void RemotePeer::onSocketError(QAbstractSocket::SocketError error) } +void RemotePeer::onCompressionError(Compressor::Error error) +{ + close(QString("Compression error %1").arg(error)); +} + + QString RemotePeer::description() const { if (socket()) @@ -187,10 +191,6 @@ void RemotePeer::close(const QString &reason) void RemotePeer::onReadyRead() { - // don't try to read more data if we're already closing - if (socket()->state() != QAbstractSocket::ConnectedState) - return; - QByteArray msg; while (readMessage(msg)) processMessage(msg); @@ -200,9 +200,9 @@ void RemotePeer::onReadyRead() bool RemotePeer::readMessage(QByteArray &msg) { if (_msgSize == 0) { - if (socket()->bytesAvailable() < 4) + if (_compressor->bytesAvailable() < 4) return false; - socket()->read((char*)&_msgSize, 4); + _compressor->read((char*)&_msgSize, 4); _msgSize = qFromBigEndian(_msgSize); if (_msgSize > maxMessageSize) { @@ -216,7 +216,7 @@ bool RemotePeer::readMessage(QByteArray &msg) } } - if (socket()->bytesAvailable() < _msgSize) { + if (_compressor->bytesAvailable() < _msgSize) { emit transferProgress(socket()->bytesAvailable(), _msgSize); return false; } @@ -224,7 +224,7 @@ bool RemotePeer::readMessage(QByteArray &msg) emit transferProgress(_msgSize, _msgSize); msg.resize(_msgSize); - qint64 bytesRead = socket()->read(msg.data(), _msgSize); + qint64 bytesRead = _compressor->read(msg.data(), _msgSize); if (bytesRead != _msgSize) { close("Premature end of data stream!"); return false; @@ -238,8 +238,8 @@ bool RemotePeer::readMessage(QByteArray &msg) void RemotePeer::writeMessage(const QByteArray &msg) { quint32 size = qToBigEndian(msg.size()); - socket()->write((const char*)&size, 4); - socket()->write(msg.constData(), msg.size()); + _compressor->write((const char*)&size, 4, Compressor::NoFlush); + _compressor->write(msg.constData(), msg.size()); } diff --git a/src/common/remotepeer.h b/src/common/remotepeer.h index b1c5bf4e..44898297 100644 --- a/src/common/remotepeer.h +++ b/src/common/remotepeer.h @@ -23,11 +23,11 @@ #include +#include "compressor.h" #include "peer.h" #include "protocol.h" #include "signalproxy.h" -class QTcpSocket; class QTimer; class AuthHandler; @@ -41,7 +41,7 @@ public: using Peer::handle; using Peer::dispatch; - RemotePeer(AuthHandler *authHandler, QTcpSocket *socket, QObject *parent = 0); + RemotePeer(AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent = 0); void setSignalProxy(SignalProxy *proxy); @@ -87,6 +87,7 @@ protected slots: private slots: void onReadyRead(); + void onCompressionError(Compressor::Error error); void sendHeartBeat(); void changeHeartBeatInterval(int secs); @@ -96,6 +97,7 @@ private: private: QTcpSocket *_socket; + Compressor *_compressor; SignalProxy *_signalProxy; QTimer *_heartBeatTimer; int _heartBeatCount; -- 2.20.1