Hook the Compressor into RemotePeer
[quassel.git] / src / common / remotepeer.cpp
index 90da083..948bffa 100644 (file)
@@ -35,9 +35,10 @@ using namespace Protocol;
 
 const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk
 
-RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent)
+RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
     : Peer(authHandler, parent),
     _socket(socket),
+    _compressor(new Compressor(socket, level, this)),
     _signalProxy(0),
     _heartBeatTimer(new QTimer(this)),
     _heartBeatCount(0),
@@ -45,7 +46,6 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *
     _msgSize(0)
 {
     socket->setParent(this);
-    connect(socket, SIGNAL(readyRead()), SLOT(onReadyRead()));
     connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
     connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError)));
     connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected()));
@@ -56,12 +56,10 @@ RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *
         connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged()));
 #endif
 
-    connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
+    connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead()));
+    connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error)));
 
-    // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
-    // However, we can't call a virtual function from the ctor, so let's do it asynchronously.
-    if (socket->bytesAvailable())
-        QTimer::singleShot(0, this, SLOT(onReadyRead()));
+    connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
 }
 
 
@@ -79,6 +77,12 @@ void RemotePeer::onSocketError(QAbstractSocket::SocketError error)
 }
 
 
+void RemotePeer::onCompressionError(Compressor::Error error)
+{
+    close(QString("Compression error %1").arg(error));
+}
+
+
 QString RemotePeer::description() const
 {
     if (socket())
@@ -187,10 +191,6 @@ void RemotePeer::close(const QString &reason)
 
 void RemotePeer::onReadyRead()
 {
-    // don't try to read more data if we're already closing
-    if (socket()->state() !=  QAbstractSocket::ConnectedState)
-        return;
-
     QByteArray msg;
     while (readMessage(msg))
         processMessage(msg);
@@ -200,9 +200,9 @@ void RemotePeer::onReadyRead()
 bool RemotePeer::readMessage(QByteArray &msg)
 {
     if (_msgSize == 0) {
-        if (socket()->bytesAvailable() < 4)
+        if (_compressor->bytesAvailable() < 4)
             return false;
-        socket()->read((char*)&_msgSize, 4);
+        _compressor->read((char*)&_msgSize, 4);
         _msgSize = qFromBigEndian<quint32>(_msgSize);
 
         if (_msgSize > maxMessageSize) {
@@ -216,7 +216,7 @@ bool RemotePeer::readMessage(QByteArray &msg)
         }
     }
 
-    if (socket()->bytesAvailable() < _msgSize) {
+    if (_compressor->bytesAvailable() < _msgSize) {
         emit transferProgress(socket()->bytesAvailable(), _msgSize);
         return false;
     }
@@ -224,7 +224,7 @@ bool RemotePeer::readMessage(QByteArray &msg)
     emit transferProgress(_msgSize, _msgSize);
 
     msg.resize(_msgSize);
-    qint64 bytesRead = socket()->read(msg.data(), _msgSize);
+    qint64 bytesRead = _compressor->read(msg.data(), _msgSize);
     if (bytesRead != _msgSize) {
         close("Premature end of data stream!");
         return false;
@@ -238,8 +238,8 @@ bool RemotePeer::readMessage(QByteArray &msg)
 void RemotePeer::writeMessage(const QByteArray &msg)
 {
     quint32 size = qToBigEndian<quint32>(msg.size());
-    socket()->write((const char*)&size, 4);
-    socket()->write(msg.constData(), msg.size());
+    _compressor->write((const char*)&size, 4, Compressor::NoFlush);
+    _compressor->write(msg.constData(), msg.size());
 }