DataStreamPeer: Optimize message serialization format
authorManuel Nickschas <sputnick@quassel-irc.org>
Tue, 28 Jan 2014 23:08:34 +0000 (00:08 +0100)
committerManuel Nickschas <sputnick@quassel-irc.org>
Sat, 15 Feb 2014 21:01:01 +0000 (22:01 +0100)
The legacy protocol serializes all messages as QVariant-in-a-QByteArray.
Except with compression, then it is QVariant-in-a-compressed-
QByteArray-in-a-QByteArray. While the actual messages either
comes as a QVariantList or a QVariantMap which is encapsulated
in above QVariant-in-a-mess.

In other words, this seems a bit excessive and causes unneeded overhead.

So for the DataStreamPeer, we're simplifying this. As all post-handshake messages
are structured as a QVariantList, we're now making this the on-wire format
as well. Since we need to know the message size for deserialization, we
first send the total message size as a quint32, followed by the number of items
as a quint32, followed by the QVariants. [1]

Since handshake messages are QVariantMaps, and we want to avoid over-nesting things,
we now send those as such a QVariantList as well, where the items are key-value
pairs - the key as UTF-8 string (as a QByteArray), and the value as a QVariant.

[1] This happens to be the same format as a QByteArray containing a QVariantList,
    so the code for that gets rather simple.

src/common/protocols/datastream/datastreampeer.cpp
src/common/protocols/datastream/datastreampeer.h

index a14af64..a1b4e91 100644 (file)
@@ -18,6 +18,8 @@
  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
  ***************************************************************************/
 
+#include <QtEndian>
+
 #include <QHostAddress>
 #include <QTcpSocket>
 
@@ -56,25 +58,43 @@ quint16 DataStreamPeer::enabledFeatures() const
 }
 
 
+// Note that we're already preparing for readSocketData() moving into RemotePeer, thus the slightly
+// cumbersome type and stream handling here.
 void DataStreamPeer::onSocketDataAvailable()
 {
-    QVariant item;
-    while (readSocketData(item)) {
-        // if no sigproxy is set, we're in handshake mode and let the data be handled elsewhere
+    // don't try to read more data if we're already closing
+    if (socket()->state() !=  QAbstractSocket::ConnectedState)
+        return;
+
+    QByteArray data;
+    while (readSocketData(data)) {
+        // data contains always a serialized QVector<QVariant>
+        QDataStream stream(data);
+        stream.setVersion(QDataStream::Qt_4_2);
+        QVariantList list;
+        stream >> list;
+        if (stream.status() != QDataStream::Ok) {
+            close("Peer sent corrupt data, closing down!");
+            return;
+        }
+
+        // if no sigproxy is set, we're in handshake mode
         if (!signalProxy())
-            handleHandshakeMessage(item);
+            handleHandshakeMessage(list);
         else
-            handlePackedFunc(item);
+            handlePackedFunc(list);
     }
 }
 
 
-bool DataStreamPeer::readSocketData(QVariant &item)
+bool DataStreamPeer::readSocketData(QByteArray &data)
 {
     if (_blockSize == 0) {
         if (socket()->bytesAvailable() < 4)
             return false;
-        _stream >> _blockSize;
+        // the block size is part of QByteArray's serialization format, so we don't actually read it now...
+        socket()->peek((char*)&_blockSize, 4);
+        _blockSize = qFromBigEndian<quint32>(_blockSize) + 4; // ... but of course we have to add its size to the total size of the block
     }
 
     if (_blockSize > 1 << 22) {
@@ -94,11 +114,11 @@ bool DataStreamPeer::readSocketData(QVariant &item)
 
     emit transferProgress(_blockSize, _blockSize);
 
-    _stream >> item;
+    _stream >> data;
     _blockSize = 0;
 
-    if (!item.isValid()) {
-        close("Peer sent corrupt data: unable to load QVariant!");
+    if (_stream.status() != QDataStream::Ok) {
+        close("Peer sent corrupt data, closing down!");
         return false;
     }
 
@@ -106,23 +126,38 @@ bool DataStreamPeer::readSocketData(QVariant &item)
 }
 
 
-void DataStreamPeer::writeSocketData(const QVariant &item)
+void DataStreamPeer::writeSocketData(const QVariantList &list)
 {
     if (!socket()->isOpen()) {
         qWarning() << Q_FUNC_INFO << "Can't write to a closed socket!";
         return;
     }
 
-    QByteArray block;
-    QDataStream out(&block, QIODevice::WriteOnly);
-    out.setVersion(QDataStream::Qt_4_2);
+    QByteArray data;
+    QDataStream msgStream(&data, QIODevice::WriteOnly);
+    msgStream.setVersion(QDataStream::Qt_4_2);
+    msgStream << list;
+
+    _stream << data;  // also writes the block size as part of the serialization format
+    if (_stream.status() != QDataStream::Ok)
+        close("Could not serialize data for peer!");
+}
+
 
-    out << item;
+void DataStreamPeer::writeSocketData(const QVariantMap &handshakeMsg)
+{
+    QVariantList list;
+    QVariantMap::const_iterator it = handshakeMsg.begin();
+    while (it != handshakeMsg.end()) {
+        list << it.key().toUtf8() << it.value();
+        ++it;
+    }
 
-    _stream << block;  // also writes the length as part of the serialization format
+    writeSocketData(list);
 }
 
 
+
 /*** Handshake messages ***/
 
 /* These messages are transmitted during handshake phase, which in case of the legacy protocol means they have
@@ -130,9 +165,11 @@ void DataStreamPeer::writeSocketData(const QVariant &item)
  * Also, the legacy handshake does not fully match the redesigned one, so we'll have to do various mappings here.
  */
 
-void DataStreamPeer::handleHandshakeMessage(const QVariant &msg)
+void DataStreamPeer::handleHandshakeMessage(const QVariantList &mapData)
 {
-    QVariantMap m = msg.toMap();
+    QVariantMap m;
+    for (int i = 0; i < mapData.count()/2; ++i)
+        m[QString::fromUtf8(mapData[2*i].toByteArray())] = mapData[2*i+1];
 
     QString msgType = m["MsgType"].toString();
     if (msgType.isEmpty()) {
@@ -303,9 +340,9 @@ void DataStreamPeer::dispatch(const SessionState &msg)
 
 /*** Standard messages ***/
 
-void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
+void DataStreamPeer::handlePackedFunc(const QVariantList &packedFunc)
 {
-    QVariantList params(packedFunc.toList());
+    QVariantList params(packedFunc);
 
     if (params.isEmpty()) {
         qWarning() << Q_FUNC_INFO << "Received incompatible data:" << packedFunc;
@@ -313,7 +350,7 @@ void DataStreamPeer::handlePackedFunc(const QVariant &packedFunc)
     }
 
     // TODO: make sure that this is a valid request type
-    RequestType requestType = (RequestType)params.takeFirst().value<int>();
+    RequestType requestType = (RequestType)params.takeFirst().value<qint16>();
     switch (requestType) {
         case Sync: {
             if (params.count() < 3) {
@@ -417,5 +454,5 @@ void DataStreamPeer::dispatch(const Protocol::HeartBeatReply &msg)
 
 void DataStreamPeer::dispatchPackedFunc(const QVariantList &packedFunc)
 {
-    writeSocketData(QVariant(packedFunc));
+    writeSocketData(packedFunc);
 }
index 687f2ee..6f8a66c 100644 (file)
@@ -75,10 +75,12 @@ protected slots:
     void onSocketDataAvailable();
 
 private:
-    bool readSocketData(QVariant &item);
-    void writeSocketData(const QVariant &item);
-    void handleHandshakeMessage(const QVariant &msg);
-    void handlePackedFunc(const QVariant &packedFunc);
+    bool readSocketData(QByteArray &data);
+    void writeSocketData(const QVariantList &list);
+    void writeSocketData(const QVariantMap &handshakeMsg);
+
+    void handleHandshakeMessage(const QVariantList &mapData);
+    void handlePackedFunc(const QVariantList &packedFunc);
     void dispatchPackedFunc(const QVariantList &packedFunc);
 
     QDataStream _stream;