tests: Convert ExpressionMatchTests into a GTest-based test case
[quassel.git] / src / common / compressor.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-2018 by the Quassel Project                        *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
19  ***************************************************************************/
20
21 #include "compressor.h"
22
23 #include <QTcpSocket>
24 #include <QTimer>
25
26 const int maxBufferSize = 64 * 1024 * 1024; // protect us from zip bombs
27 const int ioBufferSize = 64 * 1024;         // chunk size for inflate/deflate; should not be too large as we preallocate that space!
28
29 Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
30     : QObject(parent),
31     _socket(socket),
32     _level(level),
33     _inflater(nullptr),
34     _deflater(nullptr)
35 {
36     connect(socket, &QIODevice::readyRead, this, &Compressor::readData);
37
38     bool ok = true;
39     if (level != NoCompression)
40         ok = initStreams();
41
42     if (!ok) {
43         // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
44         QTimer::singleShot(0, this, [this]() { emit error(); });
45         return;
46     }
47
48     // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
49     // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
50     if (socket->bytesAvailable())
51         QTimer::singleShot(0, this, &Compressor::readData);
52 }
53
54
55 Compressor::~Compressor()
56 {
57     // release resources allocated by zlib
58     if (_inflater) {
59         inflateEnd(_inflater);
60         delete _inflater;
61     }
62     if (_deflater) {
63         deflateEnd(_deflater);
64         delete _deflater;
65     }
66 }
67
68
69 bool Compressor::initStreams()
70 {
71     int zlevel;
72     switch(compressionLevel()) {
73         case BestCompression:
74             zlevel = 9;
75             break;
76         case BestSpeed:
77             zlevel = 1;
78             break;
79         default:
80             zlevel = Z_DEFAULT_COMPRESSION;
81     }
82
83     _inflater = new z_stream;
84     memset(_inflater, 0, sizeof(z_stream));
85     if (Z_OK != inflateInit(_inflater)) {
86         qWarning() << "Could not initialize the inflate stream!";
87         return false;
88     }
89
90     _deflater = new z_stream;
91     memset(_deflater, 0, sizeof(z_stream));
92     if (Z_OK != deflateInit(_deflater, zlevel)) {
93         qWarning() << "Could not initialize the deflate stream!";
94         return false;
95     }
96
97     _inputBuffer.reserve(ioBufferSize); // pre-allocate space
98     _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
99
100     qDebug() << "Enabling compression...";
101
102     return true;
103 }
104
105
106
107 qint64 Compressor::bytesAvailable() const
108 {
109     return _readBuffer.size();
110 }
111
112
113 qint64 Compressor::read(char *data, qint64 maxSize)
114 {
115     if (maxSize <= 0)
116         maxSize = _readBuffer.size();
117
118     qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
119     memcpy(data, _readBuffer.constData(), n);
120
121     // TODO: don't copy for every read
122     if (n == _readBuffer.size())
123         _readBuffer.clear();
124     else
125         _readBuffer = _readBuffer.mid(n);
126
127     // If there's still data left in the socket buffer, make sure to schedule a read
128     if (_socket->bytesAvailable())
129         QTimer::singleShot(0, this, &Compressor::readData);
130
131     return n;
132 }
133
134
135 // The usual usage pattern is to write a blocksize first, followed by the actual data.
136 // By setting NoFlush, one can indicate that the write buffer should not immediately be
137 // written, which should make things a bit more efficient.
138 qint64 Compressor::write(const char *data, qint64 count, WriteBufferHint flush)
139 {
140     int pos = _writeBuffer.size();
141     _writeBuffer.resize(pos + count);
142     memcpy(_writeBuffer.data() + pos, data, count);
143
144     if (flush != NoFlush)
145         writeData();
146
147     return count;
148 }
149
150
151 void Compressor::readData()
152 {
153     // don't try to read more data if we're already closing
154     if (_socket->state() !=  QAbstractSocket::ConnectedState)
155         return;
156
157     if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
158         return;
159
160     if (compressionLevel() == NoCompression) {
161         _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
162         emit readyRead();
163         return;
164     }
165
166     // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
167     // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
168     // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
169     // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
170     // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
171
172     while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
173         _readBuffer.resize(_readBuffer.size() + ioBufferSize);
174         _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
175
176         _inflater->next_in = reinterpret_cast<unsigned char *>(_inputBuffer.data());
177         _inflater->avail_in = _inputBuffer.size();
178         _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
179         _inflater->avail_out = ioBufferSize;
180
181         const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
182
183         int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
184
185         // adjust input and output buffers
186         _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char *>(_readBuffer.data()));
187         if (_inflater->avail_in > 0)
188             memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
189         _inputBuffer.resize(_inflater->avail_in);
190
191         if (_inflater->next_out != orig_out)
192             emit readyRead();
193
194         switch(status) {
195             case Z_NEED_DICT:
196             case Z_DATA_ERROR:
197             case Z_MEM_ERROR:
198             case Z_STREAM_ERROR:
199                 qWarning() << "Error while decompressing stream:" << status;
200                 emit error(StreamError);
201                 return;
202             case Z_BUF_ERROR:
203                 // means that we need more input to continue, so this is not an actual error
204                 return;
205             case Z_STREAM_END:
206                 qWarning() << "Reached end of zlib stream!"; // this should really never happen
207                 return;
208             default:
209                 // just try to get more out of the stream
210                 break;
211         }
212     }
213     //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
214 }
215
216
217 void Compressor::writeData()
218 {
219     if (compressionLevel() == NoCompression) {
220         _socket->write(_writeBuffer);
221         _writeBuffer.clear();
222         return;
223     }
224
225     _deflater->next_in = reinterpret_cast<unsigned char *>(_writeBuffer.data());
226     _deflater->avail_in = _writeBuffer.size();
227
228     int status;
229     do {
230         _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
231         _deflater->avail_out = ioBufferSize;
232         status = deflate(_deflater, Z_PARTIAL_FLUSH);
233         if (status != Z_OK && status != Z_BUF_ERROR) {
234             qWarning() << "Error while compressing stream:" << status;
235             emit error(StreamError);
236             return;
237         }
238
239         if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
240             continue; // nothing to write here
241
242         if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
243             qWarning() << "Error while writing to socket:" << _socket->errorString();
244             emit error(DeviceError);
245             return;
246         }
247     } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
248
249     if (_deflater->avail_in > 0) {
250         qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
251         emit error(StreamError);
252     }
253
254     _writeBuffer.resize(0);
255
256     //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
257 }
258
259
260 void Compressor::flush()
261 {
262     if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
263         _socket->flush();
264
265     // FIXME: missing impl for enabled compression; but then we're not using this method yet
266 }