X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fcompressor.cpp;h=f391a43bac3f1c29e193a92e8b54834101a8aae5;hp=9ee877c2d4310d7b59cd52228305bafe5a98320b;hb=4a82836fd9c03eb930e934e8839d560694145b6c;hpb=9ad83fb2c64caf43b3f565cc79def6d43d30a5c1 diff --git a/src/common/compressor.cpp b/src/common/compressor.cpp index 9ee877c2..f391a43b 100644 --- a/src/common/compressor.cpp +++ b/src/common/compressor.cpp @@ -23,17 +23,35 @@ #include #include -const qint64 maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs +#ifdef HAVE_ZLIB +# include +#else +# define MINIZ_HEADER_FILE_ONLY +# include "../../3rdparty/miniz/miniz.c" +#endif + +const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs +const int ioBufferSize = 64 * 1024; // chunk size for inflate/deflate; should not be too large as we preallocate that space! Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent) : QObject(parent), _socket(socket), - _level(level) + _level(level), + _inflater(0), + _deflater(0) { - _level = NoCompression; // compression not implemented yet - connect(socket, SIGNAL(readyRead()), SLOT(readData())); + bool ok = true; + if (level != NoCompression) + ok = initStreams(); + + if (!ok) { + // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal + QTimer::singleShot(0, this, SIGNAL(error())); + return; + } + // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered. // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously. if (socket->bytesAvailable()) @@ -41,6 +59,58 @@ Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, Q } +Compressor::~Compressor() +{ + // release resources allocated by zlib + if (_inflater) { + inflateEnd(_inflater); + delete _inflater; + } + if (_deflater) { + deflateEnd(_deflater); + delete _deflater; + } +} + + +bool Compressor::initStreams() +{ + int zlevel; + switch(compressionLevel()) { + case BestCompression: + zlevel = 9; + break; + case BestSpeed: + zlevel = 1; + break; + default: + zlevel = Z_DEFAULT_COMPRESSION; + } + + _inflater = new z_stream; + memset(_inflater, 0, sizeof(z_stream)); + if (Z_OK != inflateInit(_inflater)) { + qWarning() << "Could not initialize the inflate stream!"; + return false; + } + + _deflater = new z_stream; + memset(_deflater, 0, sizeof(z_stream)); + if (Z_OK != deflateInit(_deflater, zlevel)) { + qWarning() << "Could not intialize the deflate stream!"; + return false; + } + + _inputBuffer.reserve(ioBufferSize); // pre-allocate space + _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!) + + qDebug() << "Enabling compression..."; + + return true; +} + + + qint64 Compressor::bytesAvailable() const { return _readBuffer.size(); @@ -94,10 +164,60 @@ void Compressor::readData() if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize) return; - if (compressionLevel() == NoCompression) + if (compressionLevel() == NoCompression) { _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size())); + emit readyRead(); + return; + } + + // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize. + // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's + // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing + // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message. + // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)! + + while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) { + _readBuffer.resize(_readBuffer.size() + ioBufferSize); + _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size())); + + _inflater->next_in = reinterpret_cast(_inputBuffer.data()); + _inflater->avail_in = _inputBuffer.size(); + _inflater->next_out = reinterpret_cast(_readBuffer.data() + _readBuffer.size() - ioBufferSize); + _inflater->avail_out = ioBufferSize; + + const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output + + int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible + + // adjust input and output buffers + _readBuffer.resize(_inflater->next_out - reinterpret_cast(_readBuffer.data())); + if (_inflater->avail_in > 0) + memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in); + _inputBuffer.resize(_inflater->avail_in); - emit readyRead(); + if (_inflater->next_out != orig_out) + emit readyRead(); + + switch(status) { + case Z_NEED_DICT: + case Z_DATA_ERROR: + case Z_MEM_ERROR: + case Z_STREAM_ERROR: + qWarning() << "Error while decompressing stream:" << status; + emit error(StreamError); + return; + case Z_BUF_ERROR: + // means that we need more input to continue, so this is not an actual error + return; + case Z_STREAM_END: + qWarning() << "Reached end of zlib stream!"; // this should really never happen + return; + default: + // just try to get more out of the stream + break; + } + } + //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out; } @@ -106,7 +226,41 @@ void Compressor::writeData() if (compressionLevel() == NoCompression) { _socket->write(_writeBuffer); _writeBuffer.clear(); + return; } + + _deflater->next_in = reinterpret_cast(_writeBuffer.data()); + _deflater->avail_in = _writeBuffer.size(); + + int status; + do { + _deflater->next_out = reinterpret_cast(_outputBuffer.data()); + _deflater->avail_out = ioBufferSize; + status = deflate(_deflater, Z_PARTIAL_FLUSH); + if (status != Z_OK && status != Z_BUF_ERROR) { + qWarning() << "Error while compressing stream:" << status; + emit error(StreamError); + return; + } + + if (_deflater->avail_out == static_cast(ioBufferSize)) + continue; // nothing to write here + + if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) { + qWarning() << "Error while writing to socket:" << _socket->errorString(); + emit error(DeviceError); + return; + } + } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here! + + if (_deflater->avail_in > 0) { + qWarning() << "Oops, something weird happened: data still remaining in write buffer!"; + emit error(StreamError); + } + + _writeBuffer.resize(0); + + //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in; } @@ -115,4 +269,5 @@ void Compressor::flush() if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState) _socket->flush(); + // FIXME: missing impl for enabled compression; but then we're not using this method yet }