X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fremotepeer.cpp;h=4cdb4b3741abbcc9789d4348491d019911536222;hp=e33f56a9ebd3a69c7a4276b5446bf5b482f15331;hb=HEAD;hpb=9fc57dc2c000e80fb8bd746a090e2e8210e1278e diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index e33f56a9..36eb7788 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -1,5 +1,5 @@ /*************************************************************************** - * Copyright (C) 2005-2013 by the Quassel Project * + * Copyright (C) 2005-2022 by the Quassel Project * * devel@quassel-irc.org * * * * This program is free software; you can redistribute it and/or modify * @@ -18,63 +18,122 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ +#include + +#include + #include +#include #include -#ifdef HAVE_SSL -# include -#endif - +#include "proxyline.h" #include "remotepeer.h" +#include "util.h" using namespace Protocol; -RemotePeer::RemotePeer(QTcpSocket *socket, QObject *parent) - : SignalProxy::AbstractPeer(parent), - _socket(socket), - _signalProxy(0), - _heartBeatTimer(new QTimer(this)), - _heartBeatCount(0), - _lag(0) +const quint32 maxMessageSize = 64 * 1024 + * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk + +RemotePeer::RemotePeer(::AuthHandler* authHandler, QTcpSocket* socket, Compressor::CompressionLevel level, QObject* parent) + : Peer(authHandler, parent) + , _socket(socket) + , _compressor(new Compressor(socket, level, this)) + , _signalProxy(nullptr) + , _proxyLine({}) + , _useProxyLine(false) + , _heartBeatTimer(new QTimer(this)) + , _heartBeatCount(0) + , _lag(0) + , _msgSize(0) { socket->setParent(this); - connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); - connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SIGNAL(error(QAbstractSocket::SocketError))); + connect(socket, &QAbstractSocket::stateChanged, this, &RemotePeer::onSocketStateChanged); + connect(socket, selectOverload(&QAbstractSocket::error), this, &RemotePeer::onSocketError); + connect(socket, &QAbstractSocket::disconnected, this, &Peer::disconnected); + + auto* sslSocket = qobject_cast(socket); + if (sslSocket) { + connect(sslSocket, &QSslSocket::encrypted, this, [this]() { emit secureStateChanged(true); }); + } -#ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket); - if (sslSocket) - connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); -#endif + connect(_compressor, &Compressor::readyRead, this, &RemotePeer::onReadyRead); + connect(_compressor, &Compressor::error, this, &RemotePeer::onCompressionError); - connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); + connect(_heartBeatTimer, &QTimer::timeout, this, &RemotePeer::sendHeartBeat); } +void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state) +{ + if (state == QAbstractSocket::ClosingState) { + emit statusMessage(tr("Disconnecting...")); + } +} + +void RemotePeer::onSocketError(QAbstractSocket::SocketError error) +{ + emit socketError(error, socket()->errorString()); +} + +void RemotePeer::onCompressionError(Compressor::Error error) +{ + close(QString("Compression error %1").arg(error)); +} QString RemotePeer::description() const { - if (socket()) - return socket()->peerAddress().toString(); + return address(); +} + +QHostAddress RemotePeer::hostAddress() const +{ + if (_useProxyLine) { + return _proxyLine.sourceHost; + } + else if (socket()) { + return socket()->peerAddress(); + } - return QString(); + return {}; } +QString RemotePeer::address() const +{ + QHostAddress address = hostAddress(); + if (address.isNull()) { + return {}; + } + else { + return address.toString(); + } +} -::SignalProxy *RemotePeer::signalProxy() const +quint16 RemotePeer::port() const { - return _signalProxy; + if (_useProxyLine) { + return _proxyLine.sourcePort; + } + else if (socket()) { + return socket()->peerPort(); + } + + return 0; } +::SignalProxy* RemotePeer::signalProxy() const +{ + return _signalProxy; +} -void RemotePeer::setSignalProxy(::SignalProxy *proxy) +void RemotePeer::setSignalProxy(::SignalProxy* proxy) { if (proxy == _signalProxy) return; if (!proxy) { _heartBeatTimer->stop(); - disconnect(signalProxy(), 0, this, 0); - _signalProxy = 0; + disconnect(signalProxy(), nullptr, this, nullptr); + _signalProxy = nullptr; if (isOpen()) close(); } @@ -84,16 +143,15 @@ void RemotePeer::setSignalProxy(::SignalProxy *proxy) return; } _signalProxy = proxy; - connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int))); + connect(proxy, &SignalProxy::heartBeatIntervalChanged, this, &RemotePeer::changeHeartBeatInterval); _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000); _heartBeatTimer->start(); } } - void RemotePeer::changeHeartBeatInterval(int secs) { - if(secs <= 0) + if (secs <= 0) _heartBeatTimer->stop(); else { _heartBeatTimer->setInterval(secs * 1000); @@ -101,51 +159,40 @@ void RemotePeer::changeHeartBeatInterval(int secs) } } - int RemotePeer::lag() const { return _lag; } - -QTcpSocket *RemotePeer::socket() const +QTcpSocket* RemotePeer::socket() const { return _socket; } - bool RemotePeer::isSecure() const { if (socket()) { if (isLocal()) return true; -#ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket()); + auto* sslSocket = qobject_cast(socket()); if (sslSocket && sslSocket->isEncrypted()) return true; -#endif } return false; } - bool RemotePeer::isLocal() const { - if (socket()) { - if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6) - return true; - } - return false; + return hostAddress() == QHostAddress::LocalHost || + hostAddress() == QHostAddress::LocalHostIPv6; } - bool RemotePeer::isOpen() const { return socket() && socket()->state() == QTcpSocket::ConnectedState; } - -void RemotePeer::close(const QString &reason) +void RemotePeer::close(const QString& reason) { if (!reason.isEmpty()) { qWarning() << "Disconnecting:" << reason; @@ -156,29 +203,80 @@ void RemotePeer::close(const QString &reason) } } +void RemotePeer::onReadyRead() +{ + QByteArray msg; + while (readMessage(msg)) { + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(this); + + processMessage(msg); + + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(nullptr); + } +} + +bool RemotePeer::readMessage(QByteArray& msg) +{ + if (_msgSize == 0) { + if (_compressor->bytesAvailable() < 4) + return false; + _compressor->read((char*) &_msgSize, 4); + _msgSize = qFromBigEndian(_msgSize); + + if (_msgSize > maxMessageSize) { + close("Peer tried to send package larger than max package size!"); + return false; + } + + if (_msgSize == 0) { + close("Peer tried to send an empty message!"); + return false; + } + } + + if (_compressor->bytesAvailable() < _msgSize) { + emit transferProgress(socket()->bytesAvailable(), _msgSize); + return false; + } + + emit transferProgress(_msgSize, _msgSize); + + msg.resize(_msgSize); + qint64 bytesRead = _compressor->read(msg.data(), _msgSize); + if (bytesRead != _msgSize) { + close("Premature end of data stream!"); + return false; + } + + _msgSize = 0; + return true; +} -void RemotePeer::handle(const HeartBeat &heartBeat) +void RemotePeer::writeMessage(const QByteArray& msg) { - dispatch(HeartBeatReply(heartBeat.timestamp())); + auto size = qToBigEndian(msg.size()); + _compressor->write((const char*)&size, 4, Compressor::NoFlush); + _compressor->write(msg.constData(), msg.size()); } +void RemotePeer::handle(const HeartBeat& heartBeat) +{ + dispatch(HeartBeatReply(heartBeat.timestamp)); +} -void RemotePeer::handle(const HeartBeatReply &heartBeatReply) +void RemotePeer::handle(const HeartBeatReply& heartBeatReply) { _heartBeatCount = 0; -#if QT_VERSION < 0x040700 - emit lagUpdated(heartBeatReply.timestamp().time().msecsTo(QTime::currentTime()) / 2); -#else - emit lagUpdated(heartBeatReply.timestamp().msecsTo(QDateTime::currentDateTime()) / 2); -#endif + emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2); } - void RemotePeer::sendHeartBeat() { if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) { - qWarning() << "Disconnecting peer:" << description() - << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)"; + qWarning() << "Disconnecting peer:" << description() << "(didn't receive a heartbeat for over" + << _heartBeatCount * _heartBeatTimer->interval() / 1000 << "seconds)"; socket()->close(); _heartBeatTimer->stop(); return; @@ -189,8 +287,24 @@ void RemotePeer::sendHeartBeat() emit lagUpdated(_lag); } - dispatch(HeartBeat(QDateTime::currentDateTime())); + dispatch(HeartBeat(QDateTime::currentDateTime().toUTC())); ++_heartBeatCount; } +void RemotePeer::setProxyLine(ProxyLine proxyLine) +{ + _proxyLine = std::move(proxyLine); + if (socket()) { + if (_proxyLine.protocol != QAbstractSocket::UnknownNetworkLayerProtocol) { + QList subnets = Quassel::optionValue("proxy-cidr").split(","); + for (const QString& subnet : subnets) { + if (socket()->peerAddress().isInSubnet(QHostAddress::parseSubnet(subnet))) { + _useProxyLine = true; + return; + } + } + } + } + _useProxyLine = false; +}