X-Git-Url: https://git.quassel-irc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fcommon%2Fremotepeer.cpp;h=d621632fb736c8cf6f6e958edd5d3366dd5c64d8;hb=e2188dc438be6f3eb0d9cdf47d28821aefe9835e;hp=90da083e99b985ded05adf98c420c654986dfd8f;hpb=b2169e5f4cbd3ce724c4808b62ddc2b8941219a5;p=quassel.git diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index 90da083e..d621632f 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -1,5 +1,5 @@ /*************************************************************************** - * Copyright (C) 2005-2014 by the Quassel Project * + * Copyright (C) 2005-2018 by the Quassel Project * * devel@quassel-irc.org * * * * This program is free software; you can redistribute it and/or modify * @@ -35,33 +35,31 @@ using namespace Protocol; const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk -RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) +RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent) : Peer(authHandler, parent), _socket(socket), - _signalProxy(0), + _compressor(new Compressor(socket, level, this)), + _signalProxy(nullptr), _heartBeatTimer(new QTimer(this)), _heartBeatCount(0), _lag(0), _msgSize(0) { socket->setParent(this); - connect(socket, SIGNAL(readyRead()), SLOT(onReadyRead())); connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); #ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket); + auto *sslSocket = qobject_cast(socket); if (sslSocket) connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); #endif - connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); + connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead())); + connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error))); - // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered. - // However, we can't call a virtual function from the ctor, so let's do it asynchronously. - if (socket->bytesAvailable()) - QTimer::singleShot(0, this, SLOT(onReadyRead())); + connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); } @@ -79,6 +77,12 @@ void RemotePeer::onSocketError(QAbstractSocket::SocketError error) } +void RemotePeer::onCompressionError(Compressor::Error error) +{ + close(QString("Compression error %1").arg(error)); +} + + QString RemotePeer::description() const { if (socket()) @@ -87,6 +91,22 @@ QString RemotePeer::description() const return QString(); } +QString RemotePeer::address() const +{ + if (socket()) + return socket()->peerAddress().toString(); + + return QString(); +} + +quint16 RemotePeer::port() const +{ + if (socket()) + return socket()->peerPort(); + + return 0; +} + ::SignalProxy *RemotePeer::signalProxy() const { @@ -101,8 +121,8 @@ void RemotePeer::setSignalProxy(::SignalProxy *proxy) if (!proxy) { _heartBeatTimer->stop(); - disconnect(signalProxy(), 0, this, 0); - _signalProxy = 0; + disconnect(signalProxy(), nullptr, this, nullptr); + _signalProxy = nullptr; if (isOpen()) close(); } @@ -148,7 +168,7 @@ bool RemotePeer::isSecure() const if (isLocal()) return true; #ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket()); + auto *sslSocket = qobject_cast(socket()); if (sslSocket && sslSocket->isEncrypted()) return true; #endif @@ -187,22 +207,25 @@ void RemotePeer::close(const QString &reason) void RemotePeer::onReadyRead() { - // don't try to read more data if we're already closing - if (socket()->state() != QAbstractSocket::ConnectedState) - return; - QByteArray msg; - while (readMessage(msg)) + while (readMessage(msg)) { + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(this); + processMessage(msg); + + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(nullptr); + } } bool RemotePeer::readMessage(QByteArray &msg) { if (_msgSize == 0) { - if (socket()->bytesAvailable() < 4) + if (_compressor->bytesAvailable() < 4) return false; - socket()->read((char*)&_msgSize, 4); + _compressor->read((char*)&_msgSize, 4); _msgSize = qFromBigEndian(_msgSize); if (_msgSize > maxMessageSize) { @@ -216,7 +239,7 @@ bool RemotePeer::readMessage(QByteArray &msg) } } - if (socket()->bytesAvailable() < _msgSize) { + if (_compressor->bytesAvailable() < _msgSize) { emit transferProgress(socket()->bytesAvailable(), _msgSize); return false; } @@ -224,7 +247,7 @@ bool RemotePeer::readMessage(QByteArray &msg) emit transferProgress(_msgSize, _msgSize); msg.resize(_msgSize); - qint64 bytesRead = socket()->read(msg.data(), _msgSize); + qint64 bytesRead = _compressor->read(msg.data(), _msgSize); if (bytesRead != _msgSize) { close("Premature end of data stream!"); return false; @@ -237,9 +260,9 @@ bool RemotePeer::readMessage(QByteArray &msg) void RemotePeer::writeMessage(const QByteArray &msg) { - quint32 size = qToBigEndian(msg.size()); - socket()->write((const char*)&size, 4); - socket()->write(msg.constData(), msg.size()); + auto size = qToBigEndian(msg.size()); + _compressor->write((const char*)&size, 4, Compressor::NoFlush); + _compressor->write(msg.constData(), msg.size()); } @@ -252,11 +275,7 @@ void RemotePeer::handle(const HeartBeat &heartBeat) void RemotePeer::handle(const HeartBeatReply &heartBeatReply) { _heartBeatCount = 0; -#if QT_VERSION >= 0x040700 emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2); -#else - emit lagUpdated(heartBeatReply.timestamp.time().msecsTo(QDateTime::currentDateTime().toUTC().time()) / 2); -#endif }