X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fremotepeer.cpp;h=e8fa578d90f6bb0b001acef2954f4bee8288f408;hp=e33f56a9ebd3a69c7a4276b5446bf5b482f15331;hb=4ff76cab24698482ce759cb40a903b9bc26c8fae;hpb=bae8c28f27159ec803daff083da125b313c128d1 diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index e33f56a9..e8fa578d 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -1,5 +1,5 @@ /*************************************************************************** - * Copyright (C) 2005-2013 by the Quassel Project * + * Copyright (C) 2005-2015 by the Quassel Project * * devel@quassel-irc.org * * * * This program is free software; you can redistribute it and/or modify * @@ -18,28 +18,37 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ +#include + #include #include #ifdef HAVE_SSL # include +#else +# include #endif #include "remotepeer.h" using namespace Protocol; -RemotePeer::RemotePeer(QTcpSocket *socket, QObject *parent) - : SignalProxy::AbstractPeer(parent), +const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk + +RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent) + : Peer(authHandler, parent), _socket(socket), + _compressor(new Compressor(socket, level, this)), _signalProxy(0), _heartBeatTimer(new QTimer(this)), _heartBeatCount(0), - _lag(0) + _lag(0), + _msgSize(0) { socket->setParent(this); + connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); + connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); - connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SIGNAL(error(QAbstractSocket::SocketError))); #ifdef HAVE_SSL QSslSocket *sslSocket = qobject_cast(socket); @@ -47,10 +56,33 @@ RemotePeer::RemotePeer(QTcpSocket *socket, QObject *parent) connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); #endif + connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead())); + connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error))); + connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); } +void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state) +{ + if (state == QAbstractSocket::ClosingState) { + emit statusMessage(tr("Disconnecting...")); + } +} + + +void RemotePeer::onSocketError(QAbstractSocket::SocketError error) +{ + emit socketError(error, socket()->errorString()); +} + + +void RemotePeer::onCompressionError(Compressor::Error error) +{ + close(QString("Compression error %1").arg(error)); +} + + QString RemotePeer::description() const { if (socket()) @@ -157,20 +189,70 @@ void RemotePeer::close(const QString &reason) } +void RemotePeer::onReadyRead() +{ + QByteArray msg; + while (readMessage(msg)) + processMessage(msg); +} + + +bool RemotePeer::readMessage(QByteArray &msg) +{ + if (_msgSize == 0) { + if (_compressor->bytesAvailable() < 4) + return false; + _compressor->read((char*)&_msgSize, 4); + _msgSize = qFromBigEndian(_msgSize); + + if (_msgSize > maxMessageSize) { + close("Peer tried to send package larger than max package size!"); + return false; + } + + if (_msgSize == 0) { + close("Peer tried to send an empty message!"); + return false; + } + } + + if (_compressor->bytesAvailable() < _msgSize) { + emit transferProgress(socket()->bytesAvailable(), _msgSize); + return false; + } + + emit transferProgress(_msgSize, _msgSize); + + msg.resize(_msgSize); + qint64 bytesRead = _compressor->read(msg.data(), _msgSize); + if (bytesRead != _msgSize) { + close("Premature end of data stream!"); + return false; + } + + _msgSize = 0; + return true; +} + + +void RemotePeer::writeMessage(const QByteArray &msg) +{ + quint32 size = qToBigEndian(msg.size()); + _compressor->write((const char*)&size, 4, Compressor::NoFlush); + _compressor->write(msg.constData(), msg.size()); +} + + void RemotePeer::handle(const HeartBeat &heartBeat) { - dispatch(HeartBeatReply(heartBeat.timestamp())); + dispatch(HeartBeatReply(heartBeat.timestamp)); } void RemotePeer::handle(const HeartBeatReply &heartBeatReply) { _heartBeatCount = 0; -#if QT_VERSION < 0x040700 - emit lagUpdated(heartBeatReply.timestamp().time().msecsTo(QTime::currentTime()) / 2); -#else - emit lagUpdated(heartBeatReply.timestamp().msecsTo(QDateTime::currentDateTime()) / 2); -#endif + emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2); } @@ -189,8 +271,6 @@ void RemotePeer::sendHeartBeat() emit lagUpdated(_lag); } - dispatch(HeartBeat(QDateTime::currentDateTime())); + dispatch(HeartBeat(QDateTime::currentDateTime().toUTC())); ++_heartBeatCount; } - -