Hook the Compressor into RemotePeer
authorManuel Nickschas <sputnick@quassel-irc.org>
Sun, 16 Feb 2014 21:28:21 +0000 (22:28 +0100)
committerManuel Nickschas <sputnick@quassel-irc.org>
Sun, 16 Feb 2014 21:28:21 +0000 (22:28 +0100)
Instead of reading from/writing to the socket directly, RemotePeer now
goes through a Compressor instance. At this time, no actual compression
will happen though.

Note that the legacy protocol does not support streaming compression, which
is why it sets a compression level of NoCompression.

src/common/protocols/datastream/datastreampeer.cpp
src/common/protocols/legacy/legacypeer.cpp
src/common/remotepeer.cpp
src/common/remotepeer.h

index 294dc6e..7d9b854 100644 (file)
@@ -29,7 +29,7 @@
 using namespace Protocol;
 
 DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent)
-    : RemotePeer(authHandler, socket, parent)
+    : RemotePeer(authHandler, socket, Compressor::BestCompression, parent)
 {
     Q_UNUSED(features);
 }
index da6c3a7..49c77fd 100644 (file)
@@ -32,7 +32,7 @@ const uint clientNeedsProtocol = protocolVersion;
 using namespace Protocol;
 
 LegacyPeer::LegacyPeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent)
-    : RemotePeer(authHandler, socket, parent),
+    : RemotePeer(authHandler, socket, Compressor::NoCompression, parent),
     _useCompression(false)
 {
 
index 90da083..948bffa 100644 (file)
@@ -35,9 +35,10 @@ using namespace Protocol;
 
 const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk
 
-RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent)
+RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
     : Peer(authHandler, parent),
     _socket(socket),
+    _compressor(new Compressor(socket, level, this)),
     _signalProxy(0),
     _heartBeatTimer(new QTimer(this)),
     _heartBeatCount(0),
@@ -45,7 +46,6 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *
     _msgSize(0)
 {
     socket->setParent(this);
-    connect(socket, SIGNAL(readyRead()), SLOT(onReadyRead()));
     connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
     connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError)));
     connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected()));
@@ -56,12 +56,10 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *
         connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged()));
 #endif
 
-    connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
+    connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead()));
+    connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error)));
 
-    // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
-    // However, we can't call a virtual function from the ctor, so let's do it asynchronously.
-    if (socket->bytesAvailable())
-        QTimer::singleShot(0, this, SLOT(onReadyRead()));
+    connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
 }
 
 
@@ -79,6 +77,12 @@ void RemotePeer::onSocketError(QAbstractSocket::SocketError error)
 }
 
 
+void RemotePeer::onCompressionError(Compressor::Error error)
+{
+    close(QString("Compression error %1").arg(error));
+}
+
+
 QString RemotePeer::description() const
 {
     if (socket())
@@ -187,10 +191,6 @@ void RemotePeer::close(const QString &reason)
 
 void RemotePeer::onReadyRead()
 {
-    // don't try to read more data if we're already closing
-    if (socket()->state() !=  QAbstractSocket::ConnectedState)
-        return;
-
     QByteArray msg;
     while (readMessage(msg))
         processMessage(msg);
@@ -200,9 +200,9 @@ void RemotePeer::onReadyRead()
 bool RemotePeer::readMessage(QByteArray &msg)
 {
     if (_msgSize == 0) {
-        if (socket()->bytesAvailable() < 4)
+        if (_compressor->bytesAvailable() < 4)
             return false;
-        socket()->read((char*)&_msgSize, 4);
+        _compressor->read((char*)&_msgSize, 4);
         _msgSize = qFromBigEndian<quint32>(_msgSize);
 
         if (_msgSize > maxMessageSize) {
@@ -216,7 +216,7 @@ bool RemotePeer::readMessage(QByteArray &msg)
         }
     }
 
-    if (socket()->bytesAvailable() < _msgSize) {
+    if (_compressor->bytesAvailable() < _msgSize) {
         emit transferProgress(socket()->bytesAvailable(), _msgSize);
         return false;
     }
@@ -224,7 +224,7 @@ bool RemotePeer::readMessage(QByteArray &msg)
     emit transferProgress(_msgSize, _msgSize);
 
     msg.resize(_msgSize);
-    qint64 bytesRead = socket()->read(msg.data(), _msgSize);
+    qint64 bytesRead = _compressor->read(msg.data(), _msgSize);
     if (bytesRead != _msgSize) {
         close("Premature end of data stream!");
         return false;
@@ -238,8 +238,8 @@ bool RemotePeer::readMessage(QByteArray &msg)
 void RemotePeer::writeMessage(const QByteArray &msg)
 {
     quint32 size = qToBigEndian<quint32>(msg.size());
-    socket()->write((const char*)&size, 4);
-    socket()->write(msg.constData(), msg.size());
+    _compressor->write((const char*)&size, 4, Compressor::NoFlush);
+    _compressor->write(msg.constData(), msg.size());
 }
 
 
index b1c5bf4..4489829 100644 (file)
 
 #include <QDateTime>
 
+#include "compressor.h"
 #include "peer.h"
 #include "protocol.h"
 #include "signalproxy.h"
 
-class QTcpSocket;
 class QTimer;
 
 class AuthHandler;
@@ -41,7 +41,7 @@ public:
     using Peer::handle;
     using Peer::dispatch;
 
-    RemotePeer(AuthHandler *authHandler, QTcpSocket *socket, QObject *parent = 0);
+    RemotePeer(AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent = 0);
 
     void setSignalProxy(SignalProxy *proxy);
 
@@ -87,6 +87,7 @@ protected slots:
 
 private slots:
     void onReadyRead();
+    void onCompressionError(Compressor::Error error);
 
     void sendHeartBeat();
     void changeHeartBeatInterval(int secs);
@@ -96,6 +97,7 @@ private:
 
 private:
     QTcpSocket *_socket;
+    Compressor *_compressor;
     SignalProxy *_signalProxy;
     QTimer *_heartBeatTimer;
     int _heartBeatCount;