Implement streaming compression support
author Manuel Nickschas <sputnick@quassel-irc.org>
Mon, 17 Feb 2014 21:18:39 +0000 (22:18 +0100)
committer Manuel Nickschas <sputnick@quassel-irc.org>
Mon, 17 Feb 2014 21:18:39 +0000 (22:18 +0100)
This leverages miniz in order to DEFLATE the data stream between
client and core. This is the standard format supported by zlib.

Note that streaming compression is currently unconditionally enabled
for the DataStream protocol, and unconditionally disabled for the
legacy protocol. We'll make both protocols honor the handshake
negotiation later.

src/common/compressor.cpp
src/common/compressor.h

index 9ee877c..d2634e4 100644 (file)
 #include <QTcpSocket>
 #include <QTimer>
 
-const qint64 maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
+#define MINIZ_HEADER_FILE_ONLY
+#include "../../3rdparty/miniz/miniz.c"
+
+const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
+const int ioBufferSize = 64 * 1024;         // chunk size for inflate/deflate; should not be too large as we preallocate that space!
 
 Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
     : QObject(parent),
     _socket(socket),
-    _level(level)
+    _level(level),
+    _inflater(0),
+    _deflater(0)
 {
-    _level = NoCompression; // compression not implemented yet
-
     connect(socket, SIGNAL(readyRead()), SLOT(readData()));
 
+    bool ok = true;
+    if (level != NoCompression)
+        ok = initStreams();
+
+    if (!ok) {
+        // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
+        QTimer::singleShot(0, this, SIGNAL(error()));
+        return;
+    }
+
     // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
     // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
     if (socket->bytesAvailable())
@@ -41,6 +55,58 @@ Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, Q
 }
 
 
+Compressor::~Compressor()
+{
+    // Both stream pointers start out as 0 and are only set by initStreams(),
+    // so a null check tells us whether there is anything to tear down.
+    // For each stream, shut down the zlib state first, then free the struct.
+    if (_inflater != 0) {
+        inflateEnd(_inflater);
+        delete _inflater;
+    }
+
+    if (_deflater != 0) {
+        deflateEnd(_deflater);
+        delete _deflater;
+    }
+}
+
+
+bool Compressor::initStreams()
+{
+    int zlevel;
+    switch(compressionLevel()) {
+        case BestCompression:
+            zlevel = 9;
+            break;
+        case BestSpeed:
+            zlevel = 1;
+            break;
+        default:
+            zlevel = Z_DEFAULT_COMPRESSION;
+    }
+
+    _inflater = new z_stream;
+    memset(_inflater, 0, sizeof(z_stream));
+    if (Z_OK != inflateInit(_inflater)) {
+        qWarning() << "Could not initialize the inflate stream!";
+        return false;
+    }
+
+    _deflater = new z_stream;
+    memset(_deflater, 0, sizeof(z_stream));
+    if (Z_OK != deflateInit(_deflater, zlevel)) {
+        qWarning() << "Could not intialize the deflate stream!";
+        return false;
+    }
+
+    _inputBuffer.reserve(ioBufferSize); // pre-allocate space
+    _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
+
+    qDebug() << "Enabling compression...";
+
+    return true;
+}
+
+
+
 qint64 Compressor::bytesAvailable() const
 {
     return _readBuffer.size();
@@ -94,10 +160,60 @@ void Compressor::readData()
     if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
         return;
 
-    if (compressionLevel() == NoCompression)
+    if (compressionLevel() == NoCompression) {
         _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
+        emit readyRead();
+        return;
+    }
+
+    // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
+    // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
+    // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
+    // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
+    // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
+
+    while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
+        _readBuffer.resize(_readBuffer.size() + ioBufferSize);
+        _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
+
+        _inflater->next_in = reinterpret_cast<const unsigned char *>(_inputBuffer.constData());
+        _inflater->avail_in = _inputBuffer.size();
+        _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
+        _inflater->avail_out = ioBufferSize;
+
+        const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
+
+        int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
+
+        // adjust input and output buffers
+        _readBuffer.resize(_inflater->next_out - reinterpret_cast<const unsigned char *>(_readBuffer.constData()));
+        if (_inflater->avail_in > 0)
+            memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
+        _inputBuffer.resize(_inflater->avail_in);
 
-    emit readyRead();
+        if (_inflater->next_out != orig_out)
+            emit readyRead();
+
+        switch(status) {
+            case Z_NEED_DICT:
+            case Z_DATA_ERROR:
+            case Z_MEM_ERROR:
+            case Z_STREAM_ERROR:
+                qWarning() << "Error while decompressing stream:" << status;
+                emit error(StreamError);
+                return;
+            case Z_BUF_ERROR:
+                // means that we need more input to continue, so this is not an actual error
+                return;
+            case Z_STREAM_END:
+                qWarning() << "Reached end of zlib stream!"; // this should really never happen
+                return;
+            default:
+                // just try to get more out of the stream
+                break;
+        }
+    }
+    //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
 }
 
 
@@ -106,7 +222,41 @@ void Compressor::writeData()
     if (compressionLevel() == NoCompression) {
         _socket->write(_writeBuffer);
         _writeBuffer.clear();
+        return;
     }
+
+    _deflater->next_in = reinterpret_cast<const unsigned char *>(_writeBuffer.constData());
+    _deflater->avail_in = _writeBuffer.size();
+
+    int status;
+    do {
+        _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
+        _deflater->avail_out = ioBufferSize;
+        status = deflate(_deflater, Z_PARTIAL_FLUSH);
+        if (status != Z_OK && status != Z_BUF_ERROR) {
+            qWarning() << "Error while compressing stream:" << status;
+            emit error(StreamError);
+            return;
+        }
+
+        if (_deflater->avail_out == ioBufferSize)
+            continue; // nothing to write here
+
+        if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
+            qWarning() << "Error while writing to socket:" << _socket->errorString();
+            emit error(DeviceError);
+            return;
+        }
+    } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
+
+    if (_deflater->avail_in > 0) {
+        qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
+        emit error(StreamError);
+    }
+
+    _writeBuffer.resize(0);
+
+    //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
 }
 
 
@@ -115,4 +265,5 @@ void Compressor::flush()
     if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
         _socket->flush();
 
+    // FIXME: missing impl for enabled compression; but then we're not using this method yet
 }
index b8b9e71..85817f1 100644 (file)
@@ -25,6 +25,8 @@
 
 class QTcpSocket;
 
+typedef struct mz_stream_s *z_streamp;
+
 class Compressor : public QObject
 {
     Q_OBJECT
@@ -49,6 +51,7 @@ public:
     };
 
     Compressor(QTcpSocket *socket, CompressionLevel level, QObject *parent = 0);
+    ~Compressor();
 
     CompressionLevel compressionLevel() const { return _level; }
 
@@ -61,12 +64,13 @@ public:
 
 signals:
     void readyRead();
-    void error(Compressor::Error errorCode);
+    void error(Compressor::Error errorCode = StreamError);
 
 private slots:
     void readData();
 
 private:
+    bool initStreams();
     void writeData();
 
 private:
@@ -75,6 +79,12 @@ private:
 
     QByteArray _readBuffer;
     QByteArray _writeBuffer;
+
+    QByteArray _inputBuffer;
+    QByteArray _outputBuffer;
+
+    z_streamp _inflater;
+    z_streamp _deflater;
 };
 
 #endif