Implement UI and serialization logic for sender modes
[quassel.git] / src / common / remotepeer.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-2016 by the Quassel Project                        *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.         *
19  ***************************************************************************/
20
21 #include <QtEndian>
22
23 #include <QHostAddress>
24 #include <QTimer>
25
26 #ifdef HAVE_SSL
27 #  include <QSslSocket>
28 #else
29 #  include <QTcpSocket>
30 #endif
31
32 #include "remotepeer.h"
33
34 using namespace Protocol;
35
36 const quint32 maxMessageSize = 64 * 1024 * 1024; // This is uncompressed size. 64 MB should be enough for any sort of initData or backlog chunk
37
38 RemotePeer::RemotePeer(::AuthHandler *authHandler, QTcpSocket *socket, Compressor::CompressionLevel level, QObject *parent)
39     : Peer(authHandler, parent),
40     _socket(socket),
41     _compressor(new Compressor(socket, level, this)),
42     _signalProxy(0),
43     _heartBeatTimer(new QTimer(this)),
44     _heartBeatCount(0),
45     _lag(0),
46     _msgSize(0)
47 {
48     socket->setParent(this);
49     connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), SLOT(onSocketStateChanged(QAbstractSocket::SocketState)));
50     connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(onSocketError(QAbstractSocket::SocketError)));
51     connect(socket, SIGNAL(disconnected()), SIGNAL(disconnected()));
52
53 #ifdef HAVE_SSL
54     QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket);
55     if (sslSocket)
56         connect(sslSocket, SIGNAL(encrypted()), SIGNAL(secureStateChanged()));
57 #endif
58
59     connect(_compressor, SIGNAL(readyRead()), SLOT(onReadyRead()));
60     connect(_compressor, SIGNAL(error(Compressor::Error)), SLOT(onCompressionError(Compressor::Error)));
61
62     connect(_heartBeatTimer, SIGNAL(timeout()), SLOT(sendHeartBeat()));
63 }
64
65
66 void RemotePeer::onSocketStateChanged(QAbstractSocket::SocketState state)
67 {
68     if (state == QAbstractSocket::ClosingState) {
69         emit statusMessage(tr("Disconnecting..."));
70     }
71 }
72
73
74 void RemotePeer::onSocketError(QAbstractSocket::SocketError error)
75 {
76     emit socketError(error, socket()->errorString());
77 }
78
79
80 void RemotePeer::onCompressionError(Compressor::Error error)
81 {
82     close(QString("Compression error %1").arg(error));
83 }
84
85
86 QString RemotePeer::description() const
87 {
88     if (socket())
89         return socket()->peerAddress().toString();
90
91     return QString();
92 }
93
94 QString RemotePeer::address() const
95 {
96     if (socket())
97         return socket()->peerAddress().toString();
98
99     return QString();
100 }
101
102 quint16 RemotePeer::port() const
103 {
104     if (socket())
105         return socket()->peerPort();
106
107     return 0;
108 }
109
110
111 ::SignalProxy *RemotePeer::signalProxy() const
112 {
113     return _signalProxy;
114 }
115
116
117 void RemotePeer::setSignalProxy(::SignalProxy *proxy)
118 {
119     if (proxy == _signalProxy)
120         return;
121
122     if (!proxy) {
123         _heartBeatTimer->stop();
124         disconnect(signalProxy(), 0, this, 0);
125         _signalProxy = 0;
126         if (isOpen())
127             close();
128     }
129     else {
130         if (signalProxy()) {
131             qWarning() << Q_FUNC_INFO << "Setting another SignalProxy not supported, ignoring!";
132             return;
133         }
134         _signalProxy = proxy;
135         connect(proxy, SIGNAL(heartBeatIntervalChanged(int)), SLOT(changeHeartBeatInterval(int)));
136         _heartBeatTimer->setInterval(proxy->heartBeatInterval() * 1000);
137         _heartBeatTimer->start();
138     }
139 }
140
141
142 void RemotePeer::changeHeartBeatInterval(int secs)
143 {
144     if(secs <= 0)
145         _heartBeatTimer->stop();
146     else {
147         _heartBeatTimer->setInterval(secs * 1000);
148         _heartBeatTimer->start();
149     }
150 }
151
152
153 int RemotePeer::lag() const
154 {
155     return _lag;
156 }
157
158
159 QTcpSocket *RemotePeer::socket() const
160 {
161     return _socket;
162 }
163
164
165 bool RemotePeer::isSecure() const
166 {
167     if (socket()) {
168         if (isLocal())
169             return true;
170 #ifdef HAVE_SSL
171         QSslSocket *sslSocket = qobject_cast<QSslSocket *>(socket());
172         if (sslSocket && sslSocket->isEncrypted())
173             return true;
174 #endif
175     }
176     return false;
177 }
178
179
180 bool RemotePeer::isLocal() const
181 {
182     if (socket()) {
183         if (socket()->peerAddress() == QHostAddress::LocalHost || socket()->peerAddress() == QHostAddress::LocalHostIPv6)
184             return true;
185     }
186     return false;
187 }
188
189
190 bool RemotePeer::isOpen() const
191 {
192     return socket() && socket()->state() == QTcpSocket::ConnectedState;
193 }
194
195
196 void RemotePeer::close(const QString &reason)
197 {
198     if (!reason.isEmpty()) {
199         qWarning() << "Disconnecting:" << reason;
200     }
201
202     if (socket() && socket()->state() != QTcpSocket::UnconnectedState) {
203         socket()->disconnectFromHost();
204     }
205 }
206
207
208 void RemotePeer::onReadyRead()
209 {
210     QByteArray msg;
211     while (readMessage(msg)) {
212         if (SignalProxy::current())
213             SignalProxy::current()->setSourcePeer(this);
214
215         processMessage(msg);
216
217
218         if (SignalProxy::current())
219             SignalProxy::current()->setSourcePeer(nullptr);
220     }
221 }
222
223
224 bool RemotePeer::readMessage(QByteArray &msg)
225 {
226     if (_msgSize == 0) {
227         if (_compressor->bytesAvailable() < 4)
228             return false;
229         _compressor->read((char*)&_msgSize, 4);
230         _msgSize = qFromBigEndian<quint32>(_msgSize);
231
232         if (_msgSize > maxMessageSize) {
233             close("Peer tried to send package larger than max package size!");
234             return false;
235         }
236
237         if (_msgSize == 0) {
238             close("Peer tried to send an empty message!");
239             return false;
240         }
241     }
242
243     if (_compressor->bytesAvailable() < _msgSize) {
244         emit transferProgress(socket()->bytesAvailable(), _msgSize);
245         return false;
246     }
247
248     emit transferProgress(_msgSize, _msgSize);
249
250     msg.resize(_msgSize);
251     qint64 bytesRead = _compressor->read(msg.data(), _msgSize);
252     if (bytesRead != _msgSize) {
253         close("Premature end of data stream!");
254         return false;
255     }
256
257     _msgSize = 0;
258     return true;
259 }
260
261
262 void RemotePeer::writeMessage(const QByteArray &msg)
263 {
264     quint32 size = qToBigEndian<quint32>(msg.size());
265     _compressor->write((const char*)&size, 4, Compressor::NoFlush);
266     _compressor->write(msg.constData(), msg.size());
267 }
268
269
270 void RemotePeer::handle(const HeartBeat &heartBeat)
271 {
272     dispatch(HeartBeatReply(heartBeat.timestamp));
273 }
274
275
276 void RemotePeer::handle(const HeartBeatReply &heartBeatReply)
277 {
278     _heartBeatCount = 0;
279     emit lagUpdated(heartBeatReply.timestamp.msecsTo(QDateTime::currentDateTime().toUTC()) / 2);
280 }
281
282
283 void RemotePeer::sendHeartBeat()
284 {
285     if (signalProxy()->maxHeartBeatCount() > 0 && _heartBeatCount >= signalProxy()->maxHeartBeatCount()) {
286         qWarning() << "Disconnecting peer:" << description()
287                    << "(didn't receive a heartbeat for over" << _heartBeatCount *_heartBeatTimer->interval() / 1000 << "seconds)";
288         socket()->close();
289         _heartBeatTimer->stop();
290         return;
291     }
292
293     if (_heartBeatCount > 0) {
294         _lag = _heartBeatCount * _heartBeatTimer->interval();
295         emit lagUpdated(_lag);
296     }
297
298     dispatch(HeartBeat(QDateTime::currentDateTime().toUTC()));
299     ++_heartBeatCount;
300 }