DataStreamPeer: Optimize message serialization format
[quassel.git] / src / common / protocols / datastream / datastreampeer.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-2014 by the Quassel Project                        *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
19  ***************************************************************************/
20
21 #include <QtEndian>
22
23 #include <QHostAddress>
24 #include <QTcpSocket>
25
26 #include "datastreampeer.h"
27 #include "quassel.h"
28
29 using namespace Protocol;
30
31 DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent)
32     : RemotePeer(authHandler, socket, parent),
33     _blockSize(0)
34 {
35     Q_UNUSED(features);
36
37     _stream.setDevice(socket);
38     _stream.setVersion(QDataStream::Qt_4_2);
39 }
40
41
42 quint16 DataStreamPeer::supportedFeatures()
43 {
44     return 0;
45 }
46
47
48 bool DataStreamPeer::acceptsFeatures(quint16 peerFeatures)
49 {
50     Q_UNUSED(peerFeatures);
51     return true;
52 }
53
54
55 quint16 DataStreamPeer::enabledFeatures() const
56 {
57     return 0;
58 }
59
60
61 // Note that we're already preparing for readSocketData() moving into RemotePeer, thus the slightly
62 // cumbersome type and stream handling here.
63 void DataStreamPeer::onSocketDataAvailable()
64 {
65     // don't try to read more data if we're already closing
66     if (socket()->state() !=  QAbstractSocket::ConnectedState)
67         return;
68
69     QByteArray data;
70     while (readSocketData(data)) {
71         // data contains always a serialized QVector<QVariant>
72         QDataStream stream(data);
73         stream.setVersion(QDataStream::Qt_4_2);
74         QVariantList list;
75         stream >> list;
76         if (stream.status() != QDataStream::Ok) {
77             close("Peer sent corrupt data, closing down!");
78             return;
79         }
80
81         // if no sigproxy is set, we're in handshake mode
82         if (!signalProxy())
83             handleHandshakeMessage(list);
84         else
85             handlePackedFunc(list);
86     }
87 }
88
89
90 bool DataStreamPeer::readSocketData(QByteArray &data)
91 {
92     if (_blockSize == 0) {
93         if (socket()->bytesAvailable() < 4)
94             return false;
95         // the block size is part of QByteArray's serialization format, so we don't actually read it now...
96         socket()->peek((char*)&_blockSize, 4);
97         _blockSize = qFromBigEndian<quint32>(_blockSize) + 4; // ... but of course we have to add its size to the total size of the block
98     }
99
100     if (_blockSize > 1 << 22) {
101         close("Peer tried to send package larger than max package size!");
102         return false;
103     }
104
105     if (_blockSize == 0) {
106         close("Peer tried to send 0 byte package!");
107         return false;
108     }
109
110     if (socket()->bytesAvailable() < _blockSize) {
111         emit transferProgress(socket()->bytesAvailable(), _blockSize);
112         return false;
113     }
114
115     emit transferProgress(_blockSize, _blockSize);
116
117     _stream >> data;
118     _blockSize = 0;
119
120     if (_stream.status() != QDataStream::Ok) {
121         close("Peer sent corrupt data, closing down!");
122         return false;
123     }
124
125     return true;
126 }
127
128
129 void DataStreamPeer::writeSocketData(const QVariantList &list)
130 {
131     if (!socket()->isOpen()) {
132         qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
133         return;
134     }
135
136     QByteArray data;
137     QDataStream msgStream(&data, QIODevice::WriteOnly);
138     msgStream.setVersion(QDataStream::Qt_4_2);
139     msgStream << list;
140
141     _stream << data;  // also writes the block size as part of the serialization format
142     if (_stream.status() != QDataStream::Ok)
143         close("Could not serialize data for peer!");
144 }
145
146
147 void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg)
148 {
149     QVariantList list;
150     QVariantMap::const_iterator it = handshakeMsg.begin();
151     while (it != handshakeMsg.end()) {
152         list << it.key().toUtf8() << it.value();
153         ++it;
154     }
155
156     writeSocketData(list);
157 }
158
159
160
161 /*** Handshake messages ***/
162
163 /* These messages are transmitted during handshake phase, which in case of the legacy protocol means they have
164  * a structure different from those being used after the handshake.
165  * Also, the legacy handshake does not fully match the redesigned one, so we'll have to do various mappings here.
166  */
167
168 void DataStreamPeer::handleHandshakeMessage(const QVariantList &mapData)
169 {
170     QVariantMap m;
171     for (int i = 0; i < mapData.count()/2; ++i)
172         m[QString::fromUtf8(mapData[2*i].toByteArray())] = mapData[2*i+1];
173
174     QString msgType = m["MsgType"].toString();
175     if (msgType.isEmpty()) {
176         emit protocolError(tr("Invalid handshake message!"));
177         return;
178     }
179
180     if (msgType == "ClientInit") {
181         handle(RegisterClient(m["ClientVersion"].toString(), false)); // UseSsl obsolete
182     }
183
184     else if (msgType == "ClientInitReject") {
185         handle(ClientDenied(m["Error"].toString()));
186     }
187
188     else if (msgType == "ClientInitAck") {
189         handle(ClientRegistered(m["CoreFeatures"].toUInt(), m["Configured"].toBool(), m["StorageBackends"].toList(), false, QDateTime())); // SupportsSsl and coreStartTime obsolete
190     }
191
192     else if (msgType == "CoreSetupData") {
193         QVariantMap map = m["SetupData"].toMap();
194         handle(SetupData(map["AdminUser"].toString(), map["AdminPasswd"].toString(), map["Backend"].toString(), map["ConnectionProperties"].toMap()));
195     }
196
197     else if (msgType == "CoreSetupReject") {
198         handle(SetupFailed(m["Error"].toString()));
199     }
200
201     else if (msgType == "CoreSetupAck") {
202         handle(SetupDone());
203     }
204
205     else if (msgType == "ClientLogin") {
206         handle(Login(m["User"].toString(), m["Password"].toString()));
207     }
208
209     else if (msgType == "ClientLoginReject") {
210         handle(LoginFailed(m["Error"].toString()));
211     }
212
213     else if (msgType == "ClientLoginAck") {
214         handle(LoginSuccess());
215     }
216
217     else if (msgType == "SessionInit") {
218         QVariantMap map = m["SessionState"].toMap();
219         handle(SessionState(map["Identities"].toList(), map["BufferInfos"].toList(), map["NetworkIds"].toList()));
220     }
221
222     else {
223         emit protocolError(tr("Unknown protocol message of type %1").arg(msgType));
224     }
225 }
226
227
228 void DataStreamPeer::dispatch(const RegisterClient &msg) {
229     QVariantMap m;
230     m["MsgType"] = "ClientInit";
231     m["ClientVersion"] = msg.clientVersion;
232     m["ClientDate"] = Quassel::buildInfo().buildDate;
233
234     writeSocketData(m);
235 }
236
237
238 void DataStreamPeer::dispatch(const ClientDenied &msg) {
239     QVariantMap m;
240     m["MsgType"] = "ClientInitReject";
241     m["Error"] = msg.errorString;
242
243     writeSocketData(m);
244 }
245
246
247 void DataStreamPeer::dispatch(const ClientRegistered &msg) {
248     QVariantMap m;
249     m["MsgType"] = "ClientInitAck";
250     m["CoreFeatures"] = msg.coreFeatures;
251     m["StorageBackends"] = msg.backendInfo;
252     m["LoginEnabled"] = m["Configured"] = msg.coreConfigured;
253
254     writeSocketData(m);
255 }
256
257
258 void DataStreamPeer::dispatch(const SetupData &msg)
259 {
260     QVariantMap map;
261     map["AdminUser"] = msg.adminUser;
262     map["AdminPasswd"] = msg.adminPassword;
263     map["Backend"] = msg.backend;
264     map["ConnectionProperties"] = msg.setupData;
265
266     QVariantMap m;
267     m["MsgType"] = "CoreSetupData";
268     m["SetupData"] = map;
269     writeSocketData(m);
270 }
271
272
273 void DataStreamPeer::dispatch(const SetupFailed &msg)
274 {
275     QVariantMap m;
276     m["MsgType"] = "CoreSetupReject";
277     m["Error"] = msg.errorString;
278
279     writeSocketData(m);
280 }
281
282
283 void DataStreamPeer::dispatch(const SetupDone &msg)
284 {
285     Q_UNUSED(msg)
286
287     QVariantMap m;
288     m["MsgType"] = "CoreSetupAck";
289
290     writeSocketData(m);
291 }
292
293
294 void DataStreamPeer::dispatch(const Login &msg)
295 {
296     QVariantMap m;
297     m["MsgType"] = "ClientLogin";
298     m["User"] = msg.user;
299     m["Password"] = msg.password;
300
301     writeSocketData(m);
302 }
303
304
305 void DataStreamPeer::dispatch(const LoginFailed &msg)
306 {
307     QVariantMap m;
308     m["MsgType"] = "ClientLoginReject";
309     m["Error"] = msg.errorString;
310
311     writeSocketData(m);
312 }
313
314
315 void DataStreamPeer::dispatch(const LoginSuccess &msg)
316 {
317     Q_UNUSED(msg)
318
319     QVariantMap m;
320     m["MsgType"] = "ClientLoginAck";
321
322     writeSocketData(m);
323 }
324
325
326 void DataStreamPeer::dispatch(const SessionState &msg)
327 {
328     QVariantMap m;
329     m["MsgType"] = "SessionInit";
330
331     QVariantMap map;
332     map["BufferInfos"] = msg.bufferInfos;
333     map["NetworkIds"] = msg.networkIds;
334     map["Identities"] = msg.identities;
335     m["SessionState"] = map;
336
337     writeSocketData(m);
338 }
339
340
341 /*** Standard messages ***/
342
343 void DataStreamPeer::handlePackedFunc(const QVariantList &packedFunc)
344 {
345     QVariantList params(packedFunc);
346
347     if (params.isEmpty()) {
348         qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
349         return;
350     }
351
352     // TODO: make sure that this is a valid request type
353     RequestType requestType = (RequestType)params.takeFirst().value<qint16>();
354     switch (requestType) {
355         case Sync: {
356             if (params.count() < 3) {
357                 qWarning() << Q_FUNC_INFO << "Received invalid sync call:" << params;
358                 return;
359             }
360             QByteArray className = params.takeFirst().toByteArray();
361             QString objectName = QString::fromUtf8(params.takeFirst().toByteArray());
362             QByteArray slotName = params.takeFirst().toByteArray();
363             handle(Protocol::SyncMessage(className, objectName, slotName, params));
364             break;
365         }
366         case RpcCall: {
367             if (params.empty()) {
368                 qWarning() << Q_FUNC_INFO << "Received empty RPC call!";
369                 return;
370             }
371             QByteArray slotName = params.takeFirst().toByteArray();
372             handle(Protocol::RpcCall(slotName, params));
373             break;
374         }
375         case InitRequest: {
376             if (params.count() != 2) {
377                 qWarning() << Q_FUNC_INFO << "Received invalid InitRequest:" << params;
378                 return;
379             }
380             QByteArray className = params[0].toByteArray();
381             QString objectName = QString::fromUtf8(params[1].toByteArray());
382             handle(Protocol::InitRequest(className, objectName));
383             break;
384         }
385         case InitData: {
386             if (params.count() != 3) {
387                 qWarning() << Q_FUNC_INFO << "Received invalid InitData:" << params;
388                 return;
389             }
390             QByteArray className = params[0].toByteArray();
391             QString objectName = QString::fromUtf8(params[1].toByteArray());
392             QVariantMap initData = params[2].toMap();
393             handle(Protocol::InitData(className, objectName, initData));
394             break;
395         }
396         case HeartBeat: {
397             if (params.count() != 1) {
398                 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
399                 return;
400             }
401             // Note: QDateTime instead of QTime as in the legacy protocol!
402             handle(Protocol::HeartBeat(params[0].toDateTime()));
403             break;
404         }
405         case HeartBeatReply: {
406             if (params.count() != 1) {
407                 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
408                 return;
409             }
410             // Note: QDateTime instead of QTime as in the legacy protocol!
411             handle(Protocol::HeartBeatReply(params[0].toDateTime()));
412             break;
413         }
414
415     }
416 }
417
418
419 void DataStreamPeer::dispatch(const Protocol::SyncMessage &msg)
420 {
421     dispatchPackedFunc(QVariantList() << (qint16)Sync << msg.className << msg.objectName.toUtf8() << msg.slotName << msg.params);
422 }
423
424
425 void DataStreamPeer::dispatch(const Protocol::RpcCall &msg)
426 {
427     dispatchPackedFunc(QVariantList() << (qint16)RpcCall << msg.slotName << msg.params);
428 }
429
430
431 void DataStreamPeer::dispatch(const Protocol::InitRequest &msg)
432 {
433     dispatchPackedFunc(QVariantList() << (qint16)InitRequest << msg.className << msg.objectName.toUtf8());
434 }
435
436
437 void DataStreamPeer::dispatch(const Protocol::InitData &msg)
438 {
439     dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className << msg.objectName.toUtf8() << msg.initData);
440 }
441
442
443 void DataStreamPeer::dispatch(const Protocol::HeartBeat &msg)
444 {
445     dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp);
446 }
447
448
449 void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg)
450 {
451     dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp);
452 }
453
454
455 void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc)
456 {
457     writeSocketData(packedFunc);
458 }