1 /***************************************************************************
2 * Copyright (C) 2005-2019 by the Quassel Project *
3 * devel@quassel-irc.org *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) version 3. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19 ***************************************************************************/
21 #include "compressor.h"
// Upper limit (64 MiB) that readData() checks _readBuffer's size against before pulling more data from the socket.
26 const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
// Chunk size (64 KiB): the amount by which readData() grows _readBuffer per inflate() call, the cap on
// _inputBuffer, and the fixed size of _outputBuffer used by writeData().
27 const int ioBufferSize = 64 * 1024; // chunk size for inflate/deflate; should not be too large as we preallocate that space!
// Sets up a Compressor on top of an already connected socket.
//   socket - its readyRead() signal is wired to readData(); data already pending on it is picked up asynchronously
//   level  - compared against NoCompression below
//   parent - NOTE(review): presumably forwarded to the QObject base class -- confirm against the initializer list
29 Compressor::Compressor(QTcpSocket* socket, Compressor::CompressionLevel level, QObject* parent)
// Process incoming data whenever the socket reports that new bytes are readable.
36 connect(socket, &QIODevice::readyRead, this, &Compressor::readData);
// NOTE(review): presumably the zlib streams are set up (see initStreams()) only if compression was requested -- confirm
39 if (level != NoCompression)
43 // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
// Hence the zero-timeout single shot: error() is emitted from a queued timer callback rather than from within this constructor.
44 QTimer::singleShot(0, this, [this]() { emit error(); });
48 // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
49 // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
50 if (socket->bytesAvailable())
51 QTimer::singleShot(0, this, &Compressor::readData);
// Tears down the zlib state: ends the inflate stream and the deflate stream.
// NOTE(review): both z_stream objects are allocated with `new` in initStreams(); confirm that they are
// also deleted here and that inflateEnd()/deflateEnd() are skipped when the streams were never created.
54 Compressor::~Compressor()
56 // release resources allocated by zlib
58 inflateEnd(_inflater);
62 deflateEnd(_deflater);
// Allocates and initializes the zlib inflate and deflate streams and sizes the I/O buffers.
// A warning is logged when inflateInit() or deflateInit() does not return Z_OK.
// NOTE(review): presumably returns false on such a failure and true otherwise -- confirm.
67 bool Compressor::initStreams()
// Pick the zlib compression level (zlevel) from the configured compression level.
70 switch (compressionLevel()) {
// NOTE(review): presumably the fallback branch of the switch -- let zlib choose its default level.
78 zlevel = Z_DEFAULT_COMPRESSION;
// Decompression side: the stream struct is zeroed before it is handed to inflateInit().
81 _inflater = new z_stream;
82 memset(_inflater, 0, sizeof(z_stream));
83 if (Z_OK != inflateInit(_inflater)) {
84 qWarning() << "Could not initialize the inflate stream!";
// Compression side: same procedure, using the level selected above.
88 _deflater = new z_stream;
89 memset(_deflater, 0, sizeof(z_stream));
90 if (Z_OK != deflateInit(_deflater, zlevel)) {
91 qWarning() << "Could not initialize the deflate stream!";
// _inputBuffer only gets capacity (its size changes as data arrives); _outputBuffer gets its final size right away.
95 _inputBuffer.reserve(ioBufferSize); // pre-allocate space
96 _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
98 qDebug() << "Enabling compression...";
// Returns the number of bytes currently held in _readBuffer, i.e. data that readData() has already taken
// from the socket (and inflated, if compression is enabled) but that has not yet been handed out by read().
103 qint64 Compressor::bytesAvailable() const
105 return _readBuffer.size();
// Copies up to maxSize bytes from the front of _readBuffer into `data` and removes them from the buffer.
//   data    - destination; must have room for the number of bytes copied
//   maxSize - maximum number of bytes wanted
// NOTE(review): presumably returns the number of bytes copied (n) -- confirm.
108 qint64 Compressor::read(char* data, qint64 maxSize)
// NOTE(review): presumably guarded by a check for a non-positive maxSize, meaning "everything buffered" -- confirm
111 maxSize = _readBuffer.size();
// Never copy more than what is actually buffered.
113 qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
114 memcpy(data, _readBuffer.constData(), n);
116 // TODO: don't copy for every read
// Everything was consumed ...
117 if (n == _readBuffer.size())
// ... otherwise keep only the bytes starting at offset n.
120 _readBuffer = _readBuffer.mid(n);
122 // If there's still data left in the socket buffer, make sure to schedule a read
123 if (_socket->bytesAvailable())
124 QTimer::singleShot(0, this, &Compressor::readData);
129 // The usual usage pattern is to write a blocksize first, followed by the actual data.
130 // By setting NoFlush, one can indicate that the write buffer should not immediately be
131 // written, which should make things a bit more efficient.
//   data  - bytes to queue
//   count - number of bytes to take from data
//   flush - any value other than NoFlush triggers the branch below
// NOTE(review): presumably returns count -- confirm.
132 qint64 Compressor::write(const char* data, qint64 count, WriteBufferHint flush)
// Append the new bytes to the end of _writeBuffer: remember the old end, grow, then copy into the new tail.
134 int pos = _writeBuffer.size();
135 _writeBuffer.resize(pos + count);
136 memcpy(_writeBuffer.data() + pos, data, count);
// NOTE(review): presumably calls writeData() to push the buffered bytes out -- confirm
138 if (flush != NoFlush)
// Moves pending data from the socket into _readBuffer, inflating it on the way if compression is enabled.
// Connected to the socket's readyRead() signal and also scheduled through zero-timeout timers by the
// constructor and by read().
144 void Compressor::readData()
146 // don't try to read more data if we're already closing
147 if (_socket->state() != QAbstractSocket::ConnectedState)
// Nothing to do without pending socket data, or once _readBuffer has reached maxBufferSize.
150 if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
// Uncompressed connection: append the raw bytes, reading at most as many as still fit below maxBufferSize.
153 if (compressionLevel() == NoCompression) {
154 _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
159 // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
160 // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
161 // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
162 // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
163 // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
// Loop while the socket has data, another ioBufferSize chunk still fits below maxBufferSize, and
// _inputBuffer is not yet full.
165 while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
// Reserve ioBufferSize bytes of output space at the end of _readBuffer ...
166 _readBuffer.resize(_readBuffer.size() + ioBufferSize);
// ... and top up _inputBuffer to at most ioBufferSize bytes of compressed input.
167 _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
// Point zlib at the whole input buffer and at the freshly reserved tail of _readBuffer.
169 _inflater->next_in = reinterpret_cast<unsigned char*>(_inputBuffer.data());
170 _inflater->avail_in = _inputBuffer.size();
171 _inflater->next_out = reinterpret_cast<unsigned char*>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
172 _inflater->avail_out = ioBufferSize;
174 const unsigned char* orig_out = _inflater->next_out; // so we see if we have actually produced any output
176 int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
178 // adjust input and output buffers
// Cut _readBuffer back so that it ends exactly where zlib stopped writing.
179 _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char*>(_readBuffer.data()));
// Move any input zlib did not consume to the front of _inputBuffer and drop the consumed part.
180 if (_inflater->avail_in > 0)
181 memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
182 _inputBuffer.resize(_inflater->avail_in);
// Output was produced. NOTE(review): presumably readyRead() is emitted here to notify the consumer -- confirm
184 if (_inflater->next_out != orig_out)
// NOTE(review): the lines below presumably belong to a switch over `status` (error codes, Z_BUF_ERROR,
// Z_STREAM_END, default) -- confirm which case each one sits in.
192 qWarning() << "Error while decompressing stream:" << status;
193 emit error(StreamError);
196 // means that we need more input to continue, so this is not an actual error
199 qWarning() << "Reached end of zlib stream!"; // this should really never happen
202 // just try to get more out of the stream
206 // qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
// Sends the contents of _writeBuffer to the socket, deflating them first if compression is enabled,
// and empties _writeBuffer afterwards. Emits error(StreamError) / error(DeviceError) on the failures below.
209 void Compressor::writeData()
// Uncompressed connection: hand the buffer to the socket as is.
211 if (compressionLevel() == NoCompression) {
212 _socket->write(_writeBuffer);
213 _writeBuffer.clear();
// The whole write buffer is the input for this round of compression.
217 _deflater->next_in = reinterpret_cast<unsigned char*>(_writeBuffer.data());
218 _deflater->avail_in = _writeBuffer.size();
// Each pass compresses into _outputBuffer (ioBufferSize bytes) and forwards whatever was produced.
222 _deflater->next_out = reinterpret_cast<unsigned char*>(_outputBuffer.data());
223 _deflater->avail_out = ioBufferSize;
224 status = deflate(_deflater, Z_PARTIAL_FLUSH);
// Z_BUF_ERROR is tolerated here; any other status besides Z_OK is reported as a stream error.
225 if (status != Z_OK && status != Z_BUF_ERROR) {
226 qWarning() << "Error while compressing stream:" << status;
227 emit error(StreamError);
// avail_out still equal to the full buffer size means deflate() produced no output in this pass.
231 if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
232 continue; // nothing to write here
// Write exactly the bytes deflate() produced (buffer size minus the space left over).
// NOTE(review): QIODevice::write() is documented to return -1 on error, and `!` is only true for a
// return value of 0 -- confirm whether the error case is meant to be caught here (e.g. with `< 0`).
234 if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
235 qWarning() << "Error while writing to socket:" << _socket->errorString();
236 emit error(DeviceError);
239 } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
// After the loop all input should have been consumed; leftover input is reported as a stream error.
241 if (_deflater->avail_in > 0) {
242 qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
243 emit error(StreamError);
// Empty the write buffer for the next batch of data.
246 _writeBuffer.resize(0);
248 // qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
// Flush request; only acts when compression is disabled and the socket is in the connected state.
// NOTE(review): presumably forwards to the socket's own flush() in that case -- confirm.
251 void Compressor::flush()
253 if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
256 // FIXME: missing impl for enabled compression; but then we're not using this method yet