1 /***************************************************************************
2 * Copyright (C) 2005-2014 by the Quassel Project *
3 * devel@quassel-irc.org *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) version 3. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the *
17 * Free Software Foundation, Inc., *
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
19 ***************************************************************************/
21 #include <QHostAddress>
24 #include "datastreampeer.h"
27 using namespace Protocol;
// Constructs a legacy DataStream peer on top of the given socket.
// The visible lines forward authHandler, socket and parent to RemotePeer, start with
// compression disabled, and bind _stream to the socket using the Qt_4_2 serialization version.
// NOTE(review): lines are missing from this excerpt (see the gaps in the embedded numbering);
// how `features` is used, and any further member initializers, are not visible here.
29 DataStreamPeer::DataStreamPeer(::AuthHandler *authHandler, QTcpSocket *socket, quint16 features, QObject *parent)
30 : RemotePeer(authHandler, socket, parent),
32 _useCompression(false)
36 _stream.setDevice(socket);
37 _stream.setVersion(QDataStream::Qt_4_2);
// Installs the signal proxy through RemotePeer::setSignalProxy() and then copies the socket's
// dynamic "UseCompression" property into _useCompression.
41 void DataStreamPeer::setSignalProxy(::SignalProxy *proxy)
43 RemotePeer::setSignalProxy(proxy);
45 // FIXME only in compat mode
47 // enable compression now if requested - the initial handshake is uncompressed in the legacy protocol!
48 _useCompression = socket()->property("UseCompression").toBool();
// NOTE(review): the condition guarding this debug output (if any) is not visible in this excerpt.
50 qDebug() << "Using compression for peer:" << qPrintable(socket()->peerAddress().toString());
// Declared to return a quint16 (presumably a feature bitmask - confirm).
// NOTE(review): the body is not present in this excerpt.
56 quint16 DataStreamPeer::supportedFeatures()
// Feature-acceptance check for a remote peer.
// peerFeatures is explicitly marked unused, so the result cannot depend on it.
// NOTE(review): the return statement is not visible in this excerpt.
62 bool DataStreamPeer::acceptsFeatures(quint16 peerFeatures)
64 Q_UNUSED(peerFeatures);
// Const accessor declared to return a quint16 (presumably the enabled feature bitmask - confirm).
// NOTE(review): the body is not present in this excerpt.
69 quint16 DataStreamPeer::enabledFeatures() const
// Drains the socket: as long as readSocketData() yields a complete item, the item is handed
// either to handleHandshakeMessage() or to handlePackedFunc().
75 void DataStreamPeer::onSocketDataAvailable()
78 while (readSocketData(item)) {
79 // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere
// NOTE(review): the if/else selecting between the two handlers is not visible in this excerpt;
// per the comment above it depends on whether a signal proxy has been set.
81 handleHandshakeMessage(item);
83 handlePackedFunc(item);
// Tries to read one length-prefixed item from the socket into `item`.
// The visible steps are: read the block size once 4 bytes are available, reject oversized and
// empty blocks, report transfer progress, optionally decompress, and reject invalid QVariants.
// NOTE(review): the return statements, several declarations and the uncompressed read path are
// not visible in this excerpt (see the gaps in the embedded numbering).
88 bool DataStreamPeer::readSocketData(QVariant &item)
// _blockSize == 0 means no size header has been read yet for the next block.
90 if (_blockSize == 0) {
// The size header is read from _stream only once at least 4 bytes are available.
91 if (socket()->bytesAvailable() < 4)
93 _stream >> _blockSize;
// Upper bound on a single block: 1 << 22 = 4194304 bytes.
96 if (_blockSize > 1 << 22) {
97 close("Peer tried to send package larger than max package size!");
101 if (_blockSize == 0) {
102 close("Peer tried to send 0 byte package!");
// Block not fully received yet: report partial progress.
106 if (socket()->bytesAvailable() < _blockSize) {
107 emit transferProgress(socket()->bytesAvailable(), _blockSize);
// Whole block is available: report completion.
111 emit transferProgress(_blockSize, _blockSize);
115 if (_useCompression) {
119 int nbytes = rawItem.size();
121 const char *data = rawItem.constData();
// Treats the payload as corrupt if it is shorter than 4 bytes or its first four bytes are not all zero.
// NOTE(review): any enclosing condition around this check is not visible in this excerpt.
122 if (nbytes < 4 || (data[0] != 0 || data[1] != 0 || data[2] != 0 || data[3] != 0)) {
123 close("Peer sent corrupted compressed data!");
128 rawItem = qUncompress(rawItem);
// The decompressed bytes are read with the same Qt_4_2 stream version as the socket stream.
130 QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
131 itemStream.setVersion(QDataStream::Qt_4_2);
// An invalid QVariant after deserialization closes the connection.
138 if (!item.isValid()) {
139 close("Peer sent corrupt data: unable to load QVariant!");
// Serializes `item` into a byte block (optionally qCompress()-ed) and writes that block to _stream.
// NOTE(review): the declarations of `block` and `rawItem`, the statements that write into `out`
// and `itemStream`, and the early return after the warning are not visible in this excerpt.
147 void DataStreamPeer::writeSocketData(const QVariant &item)
// Writing to a socket that is not open only logs a warning.
149 if (!socket()->isOpen()) {
150 qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
155 QDataStream out(&block, QIODevice::WriteOnly);
156 out.setVersion(QDataStream::Qt_4_2);
// Compressed path: serialize into a temporary buffer first, then compress that buffer.
158 if (_useCompression) {
160 QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
161 itemStream.setVersion(QDataStream::Qt_4_2);
164 rawItem = qCompress(rawItem);
172 _stream << block; // also writes the length as part of the serialization format
176 /*** Handshake messages ***/
178 /* These messages are transmitted during the handshake phase, which in the case of the legacy protocol means they have
179 * a structure different from the one used after the handshake.
180 * Also, the legacy handshake does not fully match the redesigned one, so we'll have to do various mappings here. */
// Translates a legacy handshake message (a QVariantMap keyed by "MsgType") into the matching
// Protocol:: message object and passes it to handle().
// A missing/empty "MsgType" and an unrecognized type are both reported through protocolError().
// NOTE(review): closing braces, #endif lines and return statements are not visible in this
// excerpt (see the gaps in the embedded numbering).
183 void DataStreamPeer::handleHandshakeMessage(const QVariant &msg)
185 QVariantMap m = msg.toMap();
187 QString msgType = m["MsgType"].toString();
188 if (msgType.isEmpty()) {
189 emit protocolError(tr("Invalid handshake message!"));
// ClientInit -> RegisterClient; also records a compression request on the socket as a dynamic property.
193 if (msgType == "ClientInit") {
194 #ifndef QT_NO_COMPRESS
195 // FIXME only in compat mode
196 if (m["UseCompression"].toBool()) {
197 socket()->setProperty("UseCompression", true);
200 handle(RegisterClient(m["ClientVersion"].toString(), false)); // UseSsl obsolete
// ClientInitReject -> ClientDenied
203 else if (msgType == "ClientInitReject") {
204 handle(ClientDenied(m["Error"].toString()));
// ClientInitAck -> ClientRegistered; "SupportsCompression" sets the socket's "UseCompression" property.
207 else if (msgType == "ClientInitAck") {
208 #ifndef QT_NO_COMPRESS
209 if (m["SupportsCompression"].toBool())
210 socket()->setProperty("UseCompression", true);
212 handle(ClientRegistered(m["CoreFeatures"].toUInt(), m["Configured"].toBool(), m["StorageBackends"].toList(), false, QDateTime())); // SupportsSsl and coreStartTime obsolete
// CoreSetupData -> SetupData, read from the nested "SetupData" map.
215 else if (msgType == "CoreSetupData") {
216 QVariantMap map = m["SetupData"].toMap();
217 handle(SetupData(map["AdminUser"].toString(), map["AdminPasswd"].toString(), map["Backend"].toString(), map["ConnectionProperties"].toMap()));
// CoreSetupReject -> SetupFailed
220 else if (msgType == "CoreSetupReject") {
221 handle(SetupFailed(m["Error"].toString()));
// NOTE(review): the handling of CoreSetupAck is not visible in this excerpt.
224 else if (msgType == "CoreSetupAck") {
// ClientLogin -> Login
228 else if (msgType == "ClientLogin") {
229 handle(Login(m["User"].toString(), m["Password"].toString()));
// ClientLoginReject -> LoginFailed
232 else if (msgType == "ClientLoginReject") {
233 handle(LoginFailed(m["Error"].toString()));
// ClientLoginAck -> LoginSuccess
236 else if (msgType == "ClientLoginAck") {
237 handle(LoginSuccess());
// SessionInit -> SessionState, read from the nested "SessionState" map.
240 else if (msgType == "SessionInit") {
241 QVariantMap map = m["SessionState"].toMap();
242 handle(SessionState(map["Identities"].toList(), map["BufferInfos"].toList(), map["NetworkIds"].toList()));
// Any other message type is reported as a protocol error.
246 emit protocolError(tr("Unknown protocol message of type %1").arg(msgType));
// Builds the legacy "ClientInit" handshake map from a RegisterClient message: the client version
// comes from the message, the client date from Quassel::buildInfo().
// NOTE(review): the declaration of `m`, any further keys, and the statement that sends the map
// are not visible in this excerpt.
251 void DataStreamPeer::dispatch(const RegisterClient &msg) {
253 m["MsgType"] = "ClientInit";
254 m["ClientVersion"] = msg.clientVersion;
255 m["ClientDate"] = Quassel::buildInfo().buildDate;
// Builds the legacy "ClientInitReject" handshake map carrying the message's error string.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
261 void DataStreamPeer::dispatch(const ClientDenied &msg) {
263 m["MsgType"] = "ClientInitReject";
264 m["Error"] = msg.errorString;
// Builds the legacy "ClientInitAck" handshake map from a ClientRegistered message.
// NOTE(review): the declaration of `m`, any further keys, and the statement that sends the map
// are not visible in this excerpt.
270 void DataStreamPeer::dispatch(const ClientRegistered &msg) {
272 m["MsgType"] = "ClientInitAck";
273 m["CoreFeatures"] = msg.coreFeatures;
274 m["StorageBackends"] = msg.backendInfo;
// Both "LoginEnabled" and "Configured" receive the same value, msg.coreConfigured.
275 m["LoginEnabled"] = m["Configured"] = msg.coreConfigured;
// Builds the legacy "CoreSetupData" handshake map; the setup fields are placed in a nested map
// stored under the "SetupData" key.
// NOTE(review): the declarations of `map` and `m` and the statement that sends `m` are not
// visible in this excerpt.
281 void DataStreamPeer::dispatch(const SetupData &msg)
284 map["AdminUser"] = msg.adminUser;
285 map["AdminPasswd"] = msg.adminPassword;
286 map["Backend"] = msg.backend;
287 map["ConnectionProperties"] = msg.setupData;
290 m["MsgType"] = "CoreSetupData";
291 m["SetupData"] = map;
// Builds the legacy "CoreSetupReject" handshake map carrying the message's error string.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
296 void DataStreamPeer::dispatch(const SetupFailed &msg)
299 m["MsgType"] = "CoreSetupReject";
300 m["Error"] = msg.errorString;
// Builds the legacy "CoreSetupAck" handshake map; no field of `msg` is read in the visible lines.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
306 void DataStreamPeer::dispatch(const SetupDone &msg)
311 m["MsgType"] = "CoreSetupAck";
// Builds the legacy "ClientLogin" handshake map with the user name and password from the message.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
317 void DataStreamPeer::dispatch(const Login &msg)
320 m["MsgType"] = "ClientLogin";
321 m["User"] = msg.user;
322 m["Password"] = msg.password;
// Builds the legacy "ClientLoginReject" handshake map carrying the message's error string.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
328 void DataStreamPeer::dispatch(const LoginFailed &msg)
331 m["MsgType"] = "ClientLoginReject";
332 m["Error"] = msg.errorString;
// Builds the legacy "ClientLoginAck" handshake map; no field of `msg` is read in the visible lines.
// NOTE(review): the declaration of `m` and the statement that sends it are not visible in this excerpt.
338 void DataStreamPeer::dispatch(const LoginSuccess &msg)
343 m["MsgType"] = "ClientLoginAck";
// Builds the legacy "SessionInit" handshake map; buffer infos, network ids and identities are
// placed in a nested map stored under the "SessionState" key.
// NOTE(review): the declarations of `m` and `map` and the statement that sends `m` are not
// visible in this excerpt.
349 void DataStreamPeer::dispatch(const SessionState &msg)
352 m["MsgType"] = "SessionInit";
355 map["BufferInfos"] = msg.bufferInfos;
356 map["NetworkIds"] = msg.networkIds;
357 map["Identities"] = msg.identities;
358 m["SessionState"] = map;
364 /*** Standard messages ***/
// Decodes a post-handshake message. The QVariant is expected to hold a list whose first element
// is the request type; the remaining elements are validated per type and forwarded to handle()
// as the corresponding Protocol:: message.
// NOTE(review): most `case` labels, `break`/`return` statements and closing braces are not
// visible in this excerpt; the branch annotations below are inferred from the warning texts and
// the handle() calls.
366 void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
368 QVariantList params(packedFunc.toList());
// An empty list (including a QVariant that is not a list) cannot carry a request type.
370 if (params.isEmpty()) {
371 qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
375 // TODO: make sure that this is a valid request type
// takeFirst() removes the type tag, so `params` holds only the arguments from here on.
376 RequestType requestType = (RequestType)params.takeFirst().value<int>();
377 switch (requestType) {
// Sync call: needs at least class name, object name and slot name; whatever is left after
// taking those three is passed on as the call parameters.
379 if (params.count() < 3) {
380 qWarning() << Q_FUNC_INFO << "Received invalid sync call:" << params;
383 QByteArray className = params.takeFirst().toByteArray();
384 QString objectName = params.takeFirst().toString();
385 QByteArray slotName = params.takeFirst().toByteArray();
386 handle(Protocol::SyncMessage(className, objectName, slotName, params));
// RPC call: needs at least the slot name; the rest are the call parameters.
390 if (params.empty()) {
391 qWarning() << Q_FUNC_INFO << "Received empty RPC call!";
394 QByteArray slotName = params.takeFirst().toByteArray();
395 handle(Protocol::RpcCall(slotName, params));
// InitRequest: exactly class name and object name.
399 if (params.count() != 2) {
400 qWarning() << Q_FUNC_INFO << "Received invalid InitRequest:" << params;
403 QByteArray className = params[0].toByteArray();
404 QString objectName = params[1].toString();
405 handle(Protocol::InitRequest(className, objectName));
// InitData: exactly class name, object name and the init data map.
409 if (params.count() != 3) {
410 qWarning() << Q_FUNC_INFO << "Received invalid InitData:" << params;
413 QByteArray className = params[0].toByteArray();
414 QString objectName = params[1].toString();
415 QVariantMap initData = params[2].toMap();
416 handle(Protocol::InitData(className, objectName, initData));
// HeartBeat: exactly one parameter, a time of day.
420 if (params.count() != 1) {
421 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
424 // The legacy protocol would only send a QTime, no QDateTime
425 // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
426 QDateTime dateTime = QDateTime::currentDateTime().toUTC();
427 dateTime.setTime(params[0].toTime());
428 handle(Protocol::HeartBeat(dateTime));
431 case HeartBeatReply: {
432 if (params.count() != 1) {
// NOTE(review): this is the HeartBeatReply branch, but the warning text says "HeartBeat" -
// looks like a copy-paste from the branch above; confirm before changing the log message.
433 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
436 // The legacy protocol would only send a QTime, no QDateTime
437 // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
438 QDateTime dateTime = QDateTime::currentDateTime().toUTC();
439 dateTime.setTime(params[0].toTime());
440 handle(Protocol::HeartBeatReply(dateTime));
// Sends a sync message as a packed list: the Sync type tag (as qint16), class name, object name,
// slot name, followed by msg.params.
448 void DataStreamPeer::dispatch(const Protocol::SyncMessage &msg)
450 dispatchPackedFunc(QVariantList() << (qint16)Sync << msg.className << msg.objectName << msg.slotName << msg.params);
// Sends an RPC call as a packed list: the RpcCall type tag (as qint16), the slot name, followed
// by msg.params.
454 void DataStreamPeer::dispatch(const Protocol::RpcCall &msg)
456 dispatchPackedFunc(QVariantList() << (qint16)RpcCall << msg.slotName << msg.params);
// Sends an init request as a packed list: the InitRequest type tag (as qint16), class name and
// object name.
460 void DataStreamPeer::dispatch(const Protocol::InitRequest &msg)
462 dispatchPackedFunc(QVariantList() << (qint16)InitRequest << msg.className << msg.objectName);
// Sends init data as a packed list: the InitData type tag (as qint16), class name, object name
// and the init data itself.
466 void DataStreamPeer::dispatch(const Protocol::InitData &msg)
468 dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className << msg.objectName << msg.initData);
// Sends a heartbeat as a packed list: the HeartBeat type tag (as qint16) and only the
// time-of-day part of the timestamp; the date part is not transmitted.
472 void DataStreamPeer::dispatch(const Protocol::HeartBeat &msg)
474 dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp.time());
// Sends a heartbeat reply as a packed list: the HeartBeatReply type tag (as qint16) and only the
// time-of-day part of the timestamp; the date part is not transmitted.
478 void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg)
480 dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp.time());
// Wraps the packed function list in a QVariant and writes it to the socket via writeSocketData().
484 void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc)
486 writeSocketData(QVariant(packedFunc));