Allow selecting the search result when searching for buffers
[quassel.git] / src / common / compressor.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-2018 by the Quassel Project                        *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
19  ***************************************************************************/
20
21 #include "compressor.h"
22
23 #include <QTcpSocket>
24 #include <QTimer>
25
26 #ifdef HAVE_ZLIB
27 #    include <zlib.h>
28 #else
29 #    define MINIZ_HEADER_FILE_ONLY
30 #    include "../../3rdparty/miniz/miniz.c"
31 #endif
32
// Hard cap used as protection against zip bombs
const int maxBufferSize = 64 * 1024 * 1024;
// Chunk size for inflate/deflate; should not be too large, as that much space is preallocated!
const int ioBufferSize = 64 * 1024;
35
36 Compressor::Compressor(QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
37     : QObject(parent),
38     _socket(socket),
39     _level(level),
40     _inflater(0),
41     _deflater(0)
42 {
43     connect(socket, SIGNAL(readyRead()), SLOT(readData()));
44
45     bool ok = true;
46     if (level != NoCompression)
47         ok = initStreams();
48
49     if (!ok) {
50         // something went wrong during initialization... but we can only emit an error after RemotePeer has connected its signal
51         QTimer::singleShot(0, this, SIGNAL(error()));
52         return;
53     }
54
55     // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered.
56     // However, we want to give RemotePeer a chance to connect to our signals, so trigger this asynchronously.
57     if (socket->bytesAvailable())
58         QTimer::singleShot(0, this, SLOT(readData()));
59 }
60
61
62 Compressor::~Compressor()
63 {
64     // release resources allocated by zlib
65     if (_inflater) {
66         inflateEnd(_inflater);
67         delete _inflater;
68     }
69     if (_deflater) {
70         deflateEnd(_deflater);
71         delete _deflater;
72     }
73 }
74
75
76 bool Compressor::initStreams()
77 {
78     int zlevel;
79     switch(compressionLevel()) {
80         case BestCompression:
81             zlevel = 9;
82             break;
83         case BestSpeed:
84             zlevel = 1;
85             break;
86         default:
87             zlevel = Z_DEFAULT_COMPRESSION;
88     }
89
90     _inflater = new z_stream;
91     memset(_inflater, 0, sizeof(z_stream));
92     if (Z_OK != inflateInit(_inflater)) {
93         qWarning() << "Could not initialize the inflate stream!";
94         return false;
95     }
96
97     _deflater = new z_stream;
98     memset(_deflater, 0, sizeof(z_stream));
99     if (Z_OK != deflateInit(_deflater, zlevel)) {
100         qWarning() << "Could not initialize the deflate stream!";
101         return false;
102     }
103
104     _inputBuffer.reserve(ioBufferSize); // pre-allocate space
105     _outputBuffer.resize(ioBufferSize); // not a typo; we never change the size of this buffer anyway (we *do* for _inputBuffer!)
106
107     qDebug() << "Enabling compression...";
108
109     return true;
110 }
111
112
113
114 qint64 Compressor::bytesAvailable() const
115 {
116     return _readBuffer.size();
117 }
118
119
120 qint64 Compressor::read(char *data, qint64 maxSize)
121 {
122     if (maxSize <= 0)
123         maxSize = _readBuffer.size();
124
125     qint64 n = qMin(maxSize, (qint64)_readBuffer.size());
126     memcpy(data, _readBuffer.constData(), n);
127
128     // TODO: don't copy for every read
129     if (n == _readBuffer.size())
130         _readBuffer.clear();
131     else
132         _readBuffer = _readBuffer.mid(n);
133
134     // If there's still data left in the socket buffer, make sure to schedule a read
135     if (_socket->bytesAvailable())
136         QTimer::singleShot(0, this, SLOT(readData()));
137
138     return n;
139 }
140
141
142 // The usual usage pattern is to write a blocksize first, followed by the actual data.
143 // By setting NoFlush, one can indicate that the write buffer should not immediately be
144 // written, which should make things a bit more efficient.
145 qint64 Compressor::write(const char *data, qint64 count, WriteBufferHint flush)
146 {
147     int pos = _writeBuffer.size();
148     _writeBuffer.resize(pos + count);
149     memcpy(_writeBuffer.data() + pos, data, count);
150
151     if (flush != NoFlush)
152         writeData();
153
154     return count;
155 }
156
157
158 void Compressor::readData()
159 {
160     // don't try to read more data if we're already closing
161     if (_socket->state() !=  QAbstractSocket::ConnectedState)
162         return;
163
164     if (!_socket->bytesAvailable() || _readBuffer.size() >= maxBufferSize)
165         return;
166
167     if (compressionLevel() == NoCompression) {
168         _readBuffer.append(_socket->read(maxBufferSize - _readBuffer.size()));
169         emit readyRead();
170         return;
171     }
172
173     // We let zlib directly append to the readBuffer, which means we pre-allocate extra space for ioBufferSize.
174     // Afterwards, we'll shrink the buffer appropriately. Since shrinking should not reallocate, the readBuffer's
175     // capacity should over time adapt to the largest message sizes we encounter. However, this is not a bad thing
176     // considering that otherwise (using an intermediate buffer) we'd copy around data for every single message.
177     // TODO: Benchmark if it would still make sense to squeeze the buffer from time to time (e.g. after initial sync)!
178
179     while (_socket->bytesAvailable() && _readBuffer.size() + ioBufferSize < maxBufferSize && _inputBuffer.size() < ioBufferSize) {
180         _readBuffer.resize(_readBuffer.size() + ioBufferSize);
181         _inputBuffer.append(_socket->read(ioBufferSize - _inputBuffer.size()));
182
183         _inflater->next_in = reinterpret_cast<unsigned char *>(_inputBuffer.data());
184         _inflater->avail_in = _inputBuffer.size();
185         _inflater->next_out = reinterpret_cast<unsigned char *>(_readBuffer.data() + _readBuffer.size() - ioBufferSize);
186         _inflater->avail_out = ioBufferSize;
187
188         const unsigned char *orig_out = _inflater->next_out; // so we see if we have actually produced any output
189
190         int status = inflate(_inflater, Z_SYNC_FLUSH); // get as much data as possible
191
192         // adjust input and output buffers
193         _readBuffer.resize(_inflater->next_out - reinterpret_cast<unsigned char *>(_readBuffer.data()));
194         if (_inflater->avail_in > 0)
195             memmove(_inputBuffer.data(), _inflater->next_in, _inflater->avail_in);
196         _inputBuffer.resize(_inflater->avail_in);
197
198         if (_inflater->next_out != orig_out)
199             emit readyRead();
200
201         switch(status) {
202             case Z_NEED_DICT:
203             case Z_DATA_ERROR:
204             case Z_MEM_ERROR:
205             case Z_STREAM_ERROR:
206                 qWarning() << "Error while decompressing stream:" << status;
207                 emit error(StreamError);
208                 return;
209             case Z_BUF_ERROR:
210                 // means that we need more input to continue, so this is not an actual error
211                 return;
212             case Z_STREAM_END:
213                 qWarning() << "Reached end of zlib stream!"; // this should really never happen
214                 return;
215             default:
216                 // just try to get more out of the stream
217                 break;
218         }
219     }
220     //qDebug() << "inflate in:" << _inflater->total_in << "out:" << _inflater->total_out << "ratio:" << (double)_inflater->total_in/_inflater->total_out;
221 }
222
223
224 void Compressor::writeData()
225 {
226     if (compressionLevel() == NoCompression) {
227         _socket->write(_writeBuffer);
228         _writeBuffer.clear();
229         return;
230     }
231
232     _deflater->next_in = reinterpret_cast<unsigned char *>(_writeBuffer.data());
233     _deflater->avail_in = _writeBuffer.size();
234
235     int status;
236     do {
237         _deflater->next_out = reinterpret_cast<unsigned char *>(_outputBuffer.data());
238         _deflater->avail_out = ioBufferSize;
239         status = deflate(_deflater, Z_PARTIAL_FLUSH);
240         if (status != Z_OK && status != Z_BUF_ERROR) {
241             qWarning() << "Error while compressing stream:" << status;
242             emit error(StreamError);
243             return;
244         }
245
246         if (_deflater->avail_out == static_cast<unsigned int>(ioBufferSize))
247             continue; // nothing to write here
248
249         if (!_socket->write(_outputBuffer.constData(), ioBufferSize - _deflater->avail_out)) {
250             qWarning() << "Error while writing to socket:" << _socket->errorString();
251             emit error(DeviceError);
252             return;
253         }
254     } while (_deflater->avail_out == 0); // the output buffer being full is the only reason we should have to loop here!
255
256     if (_deflater->avail_in > 0) {
257         qWarning() << "Oops, something weird happened: data still remaining in write buffer!";
258         emit error(StreamError);
259     }
260
261     _writeBuffer.resize(0);
262
263     //qDebug() << "deflate in:" << _deflater->total_in << "out:" << _deflater->total_out << "ratio:" << (double)_deflater->total_out/_deflater->total_in;
264 }
265
266
267 void Compressor::flush()
268 {
269     if (compressionLevel() == NoCompression && _socket->state() == QAbstractSocket::ConnectedState)
270         _socket->flush();
271
272     // FIXME: missing impl for enabled compression; but then we're not using this method yet
273 }