Add a flag to enable Qt deprecation warnings on Qt < 5.13
[quassel.git] / src / common / compressor.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-2019 by the Quassel Project                        *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
19  ***************************************************************************/
20
21 #include "compressor.h"
22
23 #include <QTcpSocket>
24 #include <QTimer>
25
// Upper bound for the decompressed read buffer; guards against zip bombs.
constexpr int maxBufferSize{64 * 1024 * 1024};
// Chunk size handed to inflate/deflate. Keep it moderate, since this much space is preallocated!
constexpr int ioBufferSize{64 * 1024};
28
29 Compressor::Compressor(QTcpSocket* socket, Compressor::CompressionLevel level, QObject* parent)
30     : QObject(parent)
31     , _socket(socket)
32     , _level(level)
33     , _inflater(nullptr)
34     , _deflater(nullptr)
35 {
36     connect(socket, &QIODevice::readyRead, this, &Compressor::readData);
37
38     bool ok = true;
39     if (level != NoCompression)
40         ok = initStreams();
41
42     if (!ok) {
43         // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
44         QTimer::singleShot(0, this, [this]() { emit error(); });
45         return;
46     }
47
48     // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
49     // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
50     if (socket->bytesAvailable())
51         QTimer::singleShot(0, this, &Compressor::readData);
52 }
53
54 Compressor::~Compressor()
55 {
56     // release resources allocated by zlib
57     if (_inflater) {
58         inflateEnd(_inflater);
59         delete _inflater;
60     }
61     if (_deflater) {
62         deflateEnd(_deflater);
63         delete _deflater;
64     }
65 }
66
67 bool Compressor::initStreams()
68 {
69     int zlevel;
70     switch (compressionLevel()) {
71     case BestCompression:
72         zlevel = 9;
73         break;
74     case BestSpeed:
75         zlevel = 1;
76         break;
77     default:
78         zlevel = Z_DEFAULT_COMPRESSION;
79     }
80
81     _inflater = new z_stream;
82     memset(_inflater, 0, sizeof(z_stream));
83     if (Z_OK != inflateInit(_inflater)) {
84         qWarning() << "Could not initialize the inflate stream!";
85         return false;
86     }
87
88     _deflater = new z_stream;
89     memset(_deflater, 0, sizeof(z_stream));
90     if (Z_OK != deflateInit(_deflater, zlevel)) {
91         qWarning() << "Could not initialize the deflate stream!";
92         return false;
93     }
94
95     _inputBuffer.reserve(ioBufferSize);  // pre-allocate space
96     _outputBuffer.resize(ioBufferSize);  // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
97
98     qDebug() << "Enabling compression...";
99
100     return true;
101 }
102
103 qint64 Compressor::bytesAvailable() const
104 {
105     return _readBuffer.size();
106 }
107
108 qint64 Compressor::read(char* data, qint64 maxSize)
109 {
110     if (maxSize <= 0)
111         maxSize = _readBuffer.size();
112
113     qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
114     memcpy(data, _readBuffer.constData(), n);
115
116     // TODO: don't copy for every read
117     if (n == _readBuffer.size())
118         _readBuffer.clear();
119     else
120         _readBuffer = _readBuffer.mid(n);
121
122     // If there's still data left in the socket buffer, make sure to schedule a read
123     if (_socket->bytesAvailable())
124         QTimer::singleShot(0, this, &Compressor::readData);
125
126     return n;
127 }
128
129 // The usual usage pattern is to write a blocksize first, followed by the actual data.
130 // By setting NoFlush, one can indicate that the write buffer should not immediately be
131 // written, which should make things a bit more efficient.
132 qint64 Compressor::write(const char* data, qint64 count, WriteBufferHint flush)
133 {
134     int pos = _writeBuffer.size();
135     _writeBuffer.resize(pos + count);
136     memcpy(_writeBuffer.data() + pos, data, count);
137
138     if (flush != NoFlush)
139         writeData();
140
141     return count;
142 }
143
144 void Compressor::readData()
145 {
146     // don't try to read more data if we're already closing
147     if (_socket->state() != QAbstractSocket::ConnectedState)
148         return;
149
150     if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
151         return;
152
153     if (compressionLevel() == NoCompression) {
154         _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
155         emit readyRead();
156         return;
157     }
158
159     // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
160     // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
161     // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
162     // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
163     // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
164
165     while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
166         _readBuffer.resize(_readBuffer.size() + ioBufferSize);
167         _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
168
169         _inflater->next_in = reinterpret_cast<unsigned char*>(_inputBuffer.data());
170         _inflater->avail_in = _inputBuffer.size();
171         _inflater->next_out = reinterpret_cast<unsigned char*>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
172         _inflater->avail_out = ioBufferSize;
173
174         const unsigned char* orig_out = _inflater->next_out;  // so we see if we have actually produced any output
175
176         int status = inflate(_inflater, Z_SYNC_FLUSH);  // get as much data as possible
177
178         // adjust input and output buffers
179         _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char*>(_readBuffer.data()));
180         if (_inflater->avail_in > 0)
181             memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
182         _inputBuffer.resize(_inflater->avail_in);
183
184         if (_inflater->next_out != orig_out)
185             emit readyRead();
186
187         switch (status) {
188         case Z_NEED_DICT:
189         case Z_DATA_ERROR:
190         case Z_MEM_ERROR:
191         case Z_STREAM_ERROR:
192             qWarning() << "Error while decompressing stream:" << status;
193             emit error(StreamError);
194             return;
195         case Z_BUF_ERROR:
196             // means that we need more input to continue, so this is not an actual error
197             return;
198         case Z_STREAM_END:
199             qWarning() << "Reached end of zlib stream!";  // this should really never happen
200             return;
201         default:
202             // just try to get more out of the stream
203             break;
204         }
205     }
206     // qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
207 }
208
209 void Compressor::writeData()
210 {
211     if (compressionLevel() == NoCompression) {
212         _socket->write(_writeBuffer);
213         _writeBuffer.clear();
214         return;
215     }
216
217     _deflater->next_in = reinterpret_cast<unsigned char*>(_writeBuffer.data());
218     _deflater->avail_in = _writeBuffer.size();
219
220     int status;
221     do {
222         _deflater->next_out = reinterpret_cast<unsigned char*>(_outputBuffer.data());
223         _deflater->avail_out = ioBufferSize;
224         status = deflate(_deflater, Z_PARTIAL_FLUSH);
225         if (status != Z_OK && status != Z_BUF_ERROR) {
226             qWarning() << "Error while compressing stream:" << status;
227             emit error(StreamError);
228             return;
229         }
230
231         if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
232             continue;  // nothing to write here
233
234         if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
235             qWarning() << "Error while writing to socket:" << _socket->errorString();
236             emit error(DeviceError);
237             return;
238         }
239     } while (_deflater->avail_out == 0);  // the output buffer being full is the only reason we should have to loop here!
240
241     if (_deflater->avail_in > 0) {
242         qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
243         emit error(StreamError);
244     }
245
246     _writeBuffer.resize(0);
247
248     // qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
249 }
250
251 void Compressor::flush()
252 {
253     if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
254         _socket->flush();
255
256     // FIXME: missing impl for enabled compression; but then we're not using this method yet
257 }