/***************************************************************************
- * Copyright (C) 2005-2014 by the Quassel Project *
+ * Copyright (C) 2005-2016 by the Quassel Project *
* devel@quassel-irc.org *
* *
* This program is free software; you can redistribute it and/or modify *
#include <QTcpSocket>
#include <QTimer>
-const qint64 maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
+#ifdef HAVE_ZLIB
+# include <zlib.h>
+#else
+# define MINIZ_HEADER_FILE_ONLY
+# include "../../3rdparty/miniz/miniz.c"
+#endif
+
+const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
+const int ioBufferSize = 64 * 1024; // chunk size for inflate/deflate; should not be too large as we preallocate that space!
Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
: QObject(parent),
_socket(socket),
- _level(level)
+ _level(level),
+ _inflater(0),
+ _deflater(0)
{
- _level = NoCompression; // compression not implemented yet
-
connect(socket, SIGNAL(readyRead()), SLOT(readData()));
+ bool ok = true;
+ if (level != NoCompression)
+ ok = initStreams();
+
+ if (!ok) {
+ // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
+ QTimer::singleShot(0, this, SIGNAL(error()));
+ return;
+ }
+
// It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
// However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
if (socket->bytesAvailable())
}
+Compressor::~Compressor()
+{
+ // release resources allocated by zlib
+ if (_inflater) {
+ inflateEnd(_inflater);
+ delete _inflater;
+ }
+ if (_deflater) {
+ deflateEnd(_deflater);
+ delete _deflater;
+ }
+}
+
+
+bool Compressor::initStreams()
+{
+ int zlevel;
+ switch(compressionLevel()) {
+ case BestCompression:
+ zlevel = 9;
+ break;
+ case BestSpeed:
+ zlevel = 1;
+ break;
+ default:
+ zlevel = Z_DEFAULT_COMPRESSION;
+ }
+
+ _inflater = new z_stream;
+ memset(_inflater, 0, sizeof(z_stream));
+ if (Z_OK != inflateInit(_inflater)) {
+ qWarning() << "Could not initialize the inflate stream!";
+ return false;
+ }
+
+ _deflater = new z_stream;
+ memset(_deflater, 0, sizeof(z_stream));
+ if (Z_OK != deflateInit(_deflater, zlevel)) {
+ qWarning() << "Could not initialize the deflate stream!";
+ return false;
+ }
+
+ _inputBuffer.reserve(ioBufferSize); // pre-allocate space
+ _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
+
+ qDebug() << "Enabling compression...";
+
+ return true;
+}
+
+
+
qint64 Compressor::bytesAvailable() const
{
return _readBuffer.size();
if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
return;
- if (compressionLevel() == NoCompression)
+ if (compressionLevel() == NoCompression) {
_readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
+ emit readyRead();
+ return;
+ }
+
+ // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
+ // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
+ // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
+ // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
+ // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
+
+ while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
+ _readBuffer.resize(_readBuffer.size() + ioBufferSize);
+ _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
+
+ _inflater->next_in = reinterpret_cast<unsigned char *>(_inputBuffer.data());
+ _inflater->avail_in = _inputBuffer.size();
+ _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
+ _inflater->avail_out = ioBufferSize;
+
+ const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
+
+ int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
+
+ // adjust input and output buffers
+ _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char *>(_readBuffer.data()));
+ if (_inflater->avail_in > 0)
+ memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
+ _inputBuffer.resize(_inflater->avail_in);
- emit readyRead();
+ if (_inflater->next_out != orig_out)
+ emit readyRead();
+
+ switch(status) {
+ case Z_NEED_DICT:
+ case Z_DATA_ERROR:
+ case Z_MEM_ERROR:
+ case Z_STREAM_ERROR:
+ qWarning() << "Error while decompressing stream:" << status;
+ emit error(StreamError);
+ return;
+ case Z_BUF_ERROR:
+ // means that we need more input to continue, so this is not an actual error
+ return;
+ case Z_STREAM_END:
+ qWarning() << "Reached end of zlib stream!"; // this should really never happen
+ return;
+ default:
+ // just try to get more out of the stream
+ break;
+ }
+ }
+ //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
}
if (compressionLevel() == NoCompression) {
_socket->write(_writeBuffer);
_writeBuffer.clear();
+ return;
}
+
+ _deflater->next_in = reinterpret_cast<unsigned char *>(_writeBuffer.data());
+ _deflater->avail_in = _writeBuffer.size();
+
+ int status;
+ do {
+ _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
+ _deflater->avail_out = ioBufferSize;
+ status = deflate(_deflater, Z_PARTIAL_FLUSH);
+ if (status != Z_OK && status != Z_BUF_ERROR) {
+ qWarning() << "Error while compressing stream:" << status;
+ emit error(StreamError);
+ return;
+ }
+
+ if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
+ continue; // nothing to write here
+
+ if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
+ qWarning() << "Error while writing to socket:" << _socket->errorString();
+ emit error(DeviceError);
+ return;
+ }
+ } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
+
+ if (_deflater->avail_in > 0) {
+ qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
+ emit error(StreamError);
+ }
+
+ _writeBuffer.resize(0);
+
+ //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
}
if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
_socket->flush();
+ // FIXME: missing impl for enabled compression; but then we're not using this method yet
}