DataStreamPeer: Optimize the InitData message
[quassel.git] / src / common / protocols / datastream / datastreampeer.cpp
index acd0bc4..86fbf53 100644 (file)
@@ -18,6 +18,8 @@
  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
  ***************************************************************************/
 
+#include <QtEndian>
+
 #include <QHostAddress>
 #include <QTcpSocket>
 
@@ -56,25 +58,43 @@ quint16 DataStreamPeer::enabledFeatures() const
 }
 
 
+// Note that we're already preparing for readSocketData() moving into RemotePeer, thus the slightly
+// cumbersome type and stream handling here.
 void DataStreamPeer::onSocketDataAvailable()
 {
-    QVariant item;
-    while (readSocketData(item)) {
-        // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere
+    // don't try to read more data if we're already closing
+    if (socket()->state() !=  QAbstractSocket::ConnectedState)
+        return;
+
+    QByteArray data;
+    while (readSocketData(data)) {
+        // data contains always a serialized QVector<QVariant>
+        QDataStream stream(data);
+        stream.setVersion(QDataStream::Qt_4_2);
+        QVariantList list;
+        stream >> list;
+        if (stream.status() != QDataStream::Ok) {
+            close("Peer sent corrupt data, closing down!");
+            return;
+        }
+
+        // if no sigproxy is set, we're in handshake mode
         if (!signalProxy())
-            handleHandshakeMessage(item);
+            handleHandshakeMessage(list);
         else
-            handlePackedFunc(item);
+            handlePackedFunc(list);
     }
 }
 
 
-bool DataStreamPeer::readSocketData(QVariant &item)
+bool DataStreamPeer::readSocketData(QByteArray &data)
 {
     if (_blockSize == 0) {
         if (socket()->bytesAvailable() < 4)
             return false;
-        _stream >> _blockSize;
+        // the block size is part of QByteArray's serialization format, so we don't actually read it now...
+        socket()->peek((char*)&_blockSize, 4);
+        _blockSize = qFromBigEndian<quint32>(_blockSize) + 4; // ... but of course we have to add its size to the total size of the block
     }
 
     if (_blockSize > 1 << 22) {
@@ -94,11 +114,11 @@ bool DataStreamPeer::readSocketData(QVariant &item)
 
     emit transferProgress(_blockSize, _blockSize);
 
-    _stream >> item;
+    _stream >> data;
     _blockSize = 0;
 
-    if (!item.isValid()) {
-        close("Peer sent corrupt data: unable to load QVariant!");
+    if (_stream.status() != QDataStream::Ok) {
+        close("Peer sent corrupt data, closing down!");
         return false;
     }
 
@@ -106,23 +126,38 @@ bool DataStreamPeer::readSocketData(QVariant &item)
 }
 
 
-void DataStreamPeer::writeSocketData(const QVariant &item)
+void DataStreamPeer::writeSocketData(const QVariantList &list)
 {
     if (!socket()->isOpen()) {
         qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
         return;
     }
 
-    QByteArray block;
-    QDataStream out(&block, QIODevice::WriteOnly);
-    out.setVersion(QDataStream::Qt_4_2);
+    QByteArray data;
+    QDataStream msgStream(&data, QIODevice::WriteOnly);
+    msgStream.setVersion(QDataStream::Qt_4_2);
+    msgStream << list;
+
+    _stream << data;  // also writes the block size as part of the serialization format
+    if (_stream.status() != QDataStream::Ok)
+        close("Could not serialize data for peer!");
+}
 
-    out << item;
 
-    _stream << block;  // also writes the length as part of the serialization format
+void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg)
+{
+    QVariantList list;
+    QVariantMap::const_iterator it = handshakeMsg.begin();
+    while (it != handshakeMsg.end()) {
+        list << it.key().toUtf8() << it.value();
+        ++it;
+    }
+
+    writeSocketData(list);
 }
 
 
+
 /*** Handshake messages ***/
 
 /* These messages are transmitted during handshake phase, which in case of the legacy protocol means they have
@@ -130,9 +165,11 @@ void DataStreamPeer::writeSocketData(const QVariant &item)
  * Also, the legacy handshake does not fully match the redesigned one, so we'll have to do various mappings here.
  */
 
-void DataStreamPeer::handleHandshakeMessage(const QVariant &msg)
+void DataStreamPeer::handleHandshakeMessage(const QVariantList &mapData)
 {
-    QVariantMap m = msg.toMap();
+    QVariantMap m;
+    for (int i = 0; i < mapData.count()/2; ++i)
+        m[QString::fromUtf8(mapData[2*i].toByteArray())] = mapData[2*i+1];
 
     QString msgType = m["MsgType"].toString();
     if (msgType.isEmpty()) {
@@ -303,9 +340,9 @@ void DataStreamPeer::dispatch(const SessionState &msg)
 
 /*** Standard messages ***/
 
-void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
+void DataStreamPeer::handlePackedFunc(const QVariantList &packedFunc)
 {
-    QVariantList params(packedFunc.toList());
+    QVariantList params(packedFunc);
 
     if (params.isEmpty()) {
         qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
@@ -313,7 +350,7 @@ void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
     }
 
     // TODO: make sure that this is a valid request type
-    RequestType requestType = (RequestType)params.takeFirst().value<int>();
+    RequestType requestType = (RequestType)params.takeFirst().value<qint16>();
     switch (requestType) {
         case Sync: {
             if (params.count() < 3) {
@@ -346,13 +383,15 @@ void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
             break;
         }
         case InitData: {
-            if (params.count() != 3) {
+            if (params.count() < 2) {
                 qWarning() << Q_FUNC_INFO << "Received invalid InitData:" << params;
                 return;
             }
-            QByteArray className = params[0].toByteArray();
-            QString objectName = QString::fromUtf8(params[1].toByteArray());
-            QVariantMap initData = params[2].toMap();
+            QByteArray className = params.takeFirst().toByteArray();
+            QString objectName = QString::fromUtf8(params.takeFirst().toByteArray());
+            QVariantMap initData;
+            for (int i = 0; i < params.count()/2; ++i)
+                initData[QString::fromUtf8(params[2*i].toByteArray())] = params[2*i+1];
             handle(Protocol::InitData(className, objectName, initData));
             break;
         }
@@ -361,11 +400,8 @@ void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
                 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
                 return;
             }
-            // The legacy protocol would only send a QTime, no QDateTime
-            // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
-            QDateTime dateTime = QDateTime::currentDateTime().toUTC();
-            dateTime.setTime(params[0].toTime());
-            handle(Protocol::HeartBeat(dateTime));
+            // Note: QDateTime instead of QTime as in the legacy protocol!
+            handle(Protocol::HeartBeat(params[0].toDateTime()));
             break;
         }
         case HeartBeatReply: {
@@ -373,11 +409,8 @@ void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
                 qWarning() << Q_FUNC_INFO << "Received invalid HeartBeat:" << params;
                 return;
             }
-            // The legacy protocol would only send a QTime, no QDateTime
-            // so we assume it's sent today, which works in exactly the same cases as it did in the old implementation
-            QDateTime dateTime = QDateTime::currentDateTime().toUTC();
-            dateTime.setTime(params[0].toTime());
-            handle(Protocol::HeartBeatReply(dateTime));
+            // Note: QDateTime instead of QTime as in the legacy protocol!
+            handle(Protocol::HeartBeatReply(params[0].toDateTime()));
             break;
         }
 
@@ -405,23 +438,29 @@ void DataStreamPeer::dispatch(const Protocol::InitRequest &msg)
 
 void DataStreamPeer::dispatch(const Protocol::InitData &msg)
 {
-    dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className << msg.objectName.toUtf8() << msg.initData);
+    QVariantList initData;
+    QVariantMap::const_iterator it = msg.initData.begin();
+    while (it != msg.initData.end()) {
+        initData << it.key().toUtf8() << it.value();
+        ++it;
+    }
+    dispatchPackedFunc(QVariantList() << (qint16)InitData << msg.className << msg.objectName.toUtf8() << initData);
 }
 
 
 void DataStreamPeer::dispatch(const Protocol::HeartBeat &msg)
 {
-    dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp.time());
+    dispatchPackedFunc(QVariantList() << (qint16)HeartBeat << msg.timestamp);
 }
 
 
 void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg)
 {
-    dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp.time());
+    dispatchPackedFunc(QVariantList() << (qint16)HeartBeatReply << msg.timestamp);
 }
 
 
 void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc)
 {
-    writeSocketData(QVariant(packedFunc));
+    writeSocketData(packedFunc);
 }