X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fremotepeer.cpp;h=4cdb4b3741abbcc9789d4348491d019911536222;hp=e8b682d7e060020f5fa5870d24ed3b974ac90ccf;hb=HEAD;hpb=bb584446aa5e60086ec3a7d14069681f3cfb17fa diff --git a/src/common/remotepeer.cpp b/src/common/remotepeer.cpp index e8b682d7..36eb7788 100644 --- a/src/common/remotepeer.cpp +++ b/src/common/remotepeer.cpp @@ -1,5 +1,5 @@ /*************************************************************************** - * Copyright (C) 2005-2014 by the Quassel Project * + * Copyright (C) 2005-2022 by the Quassel Project * * devel@quassel-irc.org * * * * This program is free software; you can redistribute it and/or modify * @@ -18,48 +18,51 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * ***************************************************************************/ +#include + +#include + #include +#include #include -#ifdef HAVE_SSL -# include -#else -# include -#endif - +#include "proxyline.h" #include "remotepeer.h" +#include "util.h" using namespace Protocol; -RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, QObject *parent) - : Peer(authHandler, parent), - _socket(socket), - _signalProxy(0), - _heartBeatTimer(new QTimer(this)), - _heartBeatCount(0), - _lag(0) +const quint32 maxMessageSize = 64 * 1024 + * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk + +RemotePeer::RemotePeer(::AuthHandler* authHandler, QTcpSocket* socket, Compressor::CompressionLevel level, QObject* parent) + : Peer(authHandler, parent) + , _socket(socket) + , _compressor(new Compressor(socket, level, this)) + , _signalProxy(nullptr) + , _proxyLine({}) + , _useProxyLine(false) + , _heartBeatTimer(new QTimer(this)) + , _heartBeatCount(0) + , _lag(0) + , _msgSize(0) { socket->setParent(this); - connect(socket, SIGNAL(readyRead()), SLOT(onSocketDataAvailable())); - connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState))); - connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError))); - connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected())); + connect(socket, &QAbstractSocket::stateChanged, this, &RemotePeer::onSocketStateChanged); + connect(socket, selectOverload(&QAbstractSocket::error), this, &RemotePeer::onSocketError); + connect(socket, &QAbstractSocket::disconnected, this, &Peer::disconnected); -#ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket); - if (sslSocket) - connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged())); -#endif + auto* sslSocket = qobject_cast(socket); + if (sslSocket) { + connect(sslSocket, &QSslSocket::encrypted, this, [this]() { emit secureStateChanged(true); }); + } - connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat())); + connect(_compressor, &Compressor::readyRead, this, &RemotePeer::onReadyRead); + connect(_compressor, &Compressor::error, this, &RemotePeer::onCompressionError); - // It's possible that more data has already arrived during the handshake, so readyRead() wouldn't be triggered. - // However, we can't call a virtual function from the ctor, so let's do it asynchronously. - if (socket->bytesAvailable()) - QTimer::singleShot(0, this, SLOT(onSocketDataAvailable())); + connect(_heartBeatTimer, &QTimer::timeout, this, &RemotePeer::sendHeartBeat); } - void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state) { if (state == QAbstractSocket::ClosingState) { @@ -67,37 +70,70 @@ void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state) } } - void RemotePeer::onSocketError(QAbstractSocket::SocketError error) { emit socketError(error, socket()->errorString()); } +void RemotePeer::onCompressionError(Compressor::Error error) +{ + close(QString("Compression error %1").arg(error)); +} QString RemotePeer::description() const { - if (socket()) - return socket()->peerAddress().toString(); + return address(); +} - return QString(); +QHostAddress RemotePeer::hostAddress() const +{ + if (_useProxyLine) { + return _proxyLine.sourceHost; + } + else if (socket()) { + return socket()->peerAddress(); + } + + return {}; } +QString RemotePeer::address() const +{ + QHostAddress address = hostAddress(); + if (address.isNull()) { + return {}; + } + else { + return address.toString(); + } +} -::SignalProxy *RemotePeer::signalProxy() const +quint16 RemotePeer::port() const { - return _signalProxy; + if (_useProxyLine) { + return _proxyLine.sourcePort; + } + else if (socket()) { + return socket()->peerPort(); + } + + return 0; } +::SignalProxy* RemotePeer::signalProxy() const +{ + return _signalProxy; +} -void RemotePeer::setSignalProxy(::SignalProxy *proxy) +void RemotePeer::setSignalProxy(::SignalProxy* proxy) { if (proxy == _signalProxy) return; if (!proxy) { _heartBeatTimer->stop(); - disconnect(signalProxy(), 0, this, 0); - _signalProxy = 0; + disconnect(signalProxy(), nullptr, this, nullptr); + _signalProxy = nullptr; if (isOpen()) close(); } @@ -107,16 +143,15 @@ void RemotePeer::setSignalProxy(::SignalProxy *proxy) return; } _signalProxy = proxy; - connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int))); + connect(proxy, &SignalProxy::heartBeatIntervalChanged, this, &RemotePeer::changeHeartBeatInterval); _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000); _heartBeatTimer->start(); } } - void RemotePeer::changeHeartBeatInterval(int secs) { - if(secs <= 0) + if (secs <= 0) _heartBeatTimer->stop(); else { _heartBeatTimer->setInterval(secs * 1000); @@ -124,51 +159,40 @@ void RemotePeer::changeHeartBeatInterval(int secs) } } - int RemotePeer::lag() const { return _lag; } - -QTcpSocket *RemotePeer::socket() const +QTcpSocket* RemotePeer::socket() const { return _socket; } - bool RemotePeer::isSecure() const { if (socket()) { if (isLocal()) return true; -#ifdef HAVE_SSL - QSslSocket *sslSocket = qobject_cast(socket()); + auto* sslSocket = qobject_cast(socket()); if (sslSocket && sslSocket->isEncrypted()) return true; -#endif } return false; } - bool RemotePeer::isLocal() const { - if (socket()) { - if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6) - return true; - } - return false; + return hostAddress() == QHostAddress::LocalHost || + hostAddress() == QHostAddress::LocalHostIPv6; } - bool RemotePeer::isOpen() const { return socket() && socket()->state() == QTcpSocket::ConnectedState; } - -void RemotePeer::close(const QString &reason) +void RemotePeer::close(const QString& reason) { if (!reason.isEmpty()) { qWarning() << "Disconnecting:" << reason; @@ -179,29 +203,80 @@ void RemotePeer::close(const QString &reason) } } +void RemotePeer::onReadyRead() +{ + QByteArray msg; + while (readMessage(msg)) { + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(this); + + processMessage(msg); -void RemotePeer::handle(const HeartBeat &heartBeat) + if (SignalProxy::current()) + SignalProxy::current()->setSourcePeer(nullptr); + } +} + +bool RemotePeer::readMessage(QByteArray& msg) { - dispatch(HeartBeatReply(heartBeat.timestamp)); + if (_msgSize == 0) { + if (_compressor->bytesAvailable() < 4) + return false; + _compressor->read((char*) &_msgSize, 4); + _msgSize = qFromBigEndian(_msgSize); + + if (_msgSize > maxMessageSize) { + close("Peer tried to send package larger than max package size!"); + return false; + } + + if (_msgSize == 0) { + close("Peer tried to send an empty message!"); + return false; + } + } + + if (_compressor->bytesAvailable() < _msgSize) { + emit transferProgress(socket()->bytesAvailable(), _msgSize); + return false; + } + + emit transferProgress(_msgSize, _msgSize); + + msg.resize(_msgSize); + qint64 bytesRead = _compressor->read(msg.data(), _msgSize); + if (bytesRead != _msgSize) { + close("Premature end of data stream!"); + return false; + } + + _msgSize = 0; + return true; +} + +void RemotePeer::writeMessage(const QByteArray& msg) +{ + auto size = qToBigEndian(msg.size()); + _compressor->write((const char*)&size, 4, Compressor::NoFlush); + _compressor->write(msg.constData(), msg.size()); } +void RemotePeer::handle(const HeartBeat& heartBeat) +{ + dispatch(HeartBeatReply(heartBeat.timestamp)); +} -void RemotePeer::handle(const HeartBeatReply &heartBeatReply) +void RemotePeer::handle(const HeartBeatReply& heartBeatReply) { _heartBeatCount = 0; -#if QT_VERSION >= 0x040700 emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2); -#else - emit lagUpdated(heartBeatReply.timestamp.time().msecsTo(QDateTime::currentDateTime().toUTC().time()) / 2); -#endif } - void RemotePeer::sendHeartBeat() { if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) { - qWarning() << "Disconnecting peer:" << description() - << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)"; + qWarning() << "Disconnecting peer:" << description() << "(didn't receive a heartbeat for over" + << _heartBeatCount * _heartBeatTimer->interval() / 1000 << "seconds)"; socket()->close(); _heartBeatTimer->stop(); return; @@ -215,3 +290,21 @@ void RemotePeer::sendHeartBeat() dispatch(HeartBeat(QDateTime::currentDateTime().toUTC())); ++_heartBeatCount; } + +void RemotePeer::setProxyLine(ProxyLine proxyLine) +{ + _proxyLine = std::move(proxyLine); + + if (socket()) { + if (_proxyLine.protocol != QAbstractSocket::UnknownNetworkLayerProtocol) { + QList subnets = Quassel::optionValue("proxy-cidr").split(","); + for (const QString& subnet : subnets) { + if (socket()->peerAddress().isInSubnet(QHostAddress::parseSubnet(subnet))) { + _useProxyLine = true; + return; + } + } + } + } + _useProxyLine = false; +}