#include <QMetaProperty>
#include <QRegExp>
#include <QThread>
+#include <QTime>
#include "syncableobject.h"
#include "util.h"
: QObject(parent)
{
setProxyMode(Client);
+ init();
}
SignalProxy::SignalProxy(ProxyMode mode, QObject* parent)
: QObject(parent)
{
setProxyMode(mode);
+ init();
}
SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent)
{
setProxyMode(mode);
addPeer(device);
+ init();
}
SignalProxy::~SignalProxy() {
initClient();
}
-SignalProxy::ProxyMode SignalProxy::proxyMode() const {
- return _proxyMode;
+void SignalProxy::init() {
+ connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
+ _heartBeatTimer.start(30 * 1000);
}
void SignalProxy::initServer() {
- disconnect(&_heartBeatTimer, 0, this, 0);
- _heartBeatTimer.stop();
}
void SignalProxy::initClient() {
attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString)));
- connect(&_heartBeatTimer, SIGNAL(timeout()),
- this, SLOT(sendHeartBeat()));
- _heartBeatTimer.start(60 * 1000); // msecs: one beep per minute
}
bool SignalProxy::addPeer(QIODevice* iodev) {
}
}
-// void SignalProxy::setInitialized(SyncableObject *obj) {
-// obj->setInitialized();
-// emit objectInitialized(obj);
-// }
-
-// bool SignalProxy::isInitialized(SyncableObject *obj) const {
-// return obj->isInitialized();
-// }
-
void SignalProxy::requestInit(SyncableObject *obj) {
if(proxyMode() == Server || obj->isInitialized())
return;
switch(callType) {
case RpcCall:
- if(params.empty()) {
+ if(params.empty())
qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call";
- return;
- } else {
- return handleSignal(params.takeFirst().toByteArray(), params);
- }
+ else
+ handleSignal(params.takeFirst().toByteArray(), params);
+ break;
+
case Sync:
- return handleSync(sender, params);
+ handleSync(sender, params);
+ break;
+
case InitRequest:
- return handleInitRequest(sender, params);
+ handleInitRequest(sender, params);
+ break;
+
case InitData:
- return handleInitData(sender, params);
+ handleInitData(sender, params);
+ break;
+
case HeartBeat:
- return;
+ receiveHeartBeat(sender, params);
+ break;
+
+ case HeartBeatReply:
+ receiveHeartBeatReply(sender, params);
+ break;
+
default:
qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params;
- return;
}
}
if(blockSize > 1 << 22) {
qWarning() << qPrintable(tr("Client tried to send package larger than max package size!"));
- QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(dev);
+ QAbstractSocket *sock = qobject_cast<QAbstractSocket *>(dev);
qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client")));
dev->close();
return false;
}
void SignalProxy::sendHeartBeat() {
- dispatchSignal(SignalProxy::HeartBeat, QVariantList());
+ dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime());
+ QHash<QIODevice *, peerInfo>::iterator peerIter = _peers.begin();
+ QHash<QIODevice *, peerInfo>::iterator peerIterEnd = _peers.end();
+ while(peerIter != peerIterEnd) {
+ if(peerIter->sentHeartBeats > 0) {
+ updateLag(peerIter.key(), _heartBeatTimer.interval());
+ }
+ if(peerIter->sentHeartBeats > 1) {
+ QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(peerIter.key());
+ qWarning() << "SignalProxy: Disconnecting peer:"
+ << (socket ? qPrintable(socket->peerAddress().toString()) : "local client")
+ << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
+ peerIter.key()->close();
+ } else {
+ peerIter->sentHeartBeats++;
+ }
+ peerIter++;
+ }
}
+void SignalProxy::receiveHeartBeat(QIODevice *dev, const QVariantList ¶ms) {
+ if(!_peers.contains(dev)) {
+ qWarning() << "SignalProxy: received heart beat from unknown Device:" << dev;
+ }
+ dispatchSignal(dev, SignalProxy::HeartBeatReply, params);
+}
+
+void SignalProxy::receiveHeartBeatReply(QIODevice *dev, const QVariantList ¶ms) {
+ if(!_peers.contains(dev)) {
+ qWarning() << "SignalProxy: received heart beat reply from unknown Device:" << dev;
+ return;
+ }
+
+ _peers[dev].sentHeartBeats = 0;
+
+ if(params.isEmpty()) {
+ qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << dev;
+ return;
+ }
+
+ QTime sendTime = params[0].value<QTime>();
+ updateLag(dev, sendTime.msecsTo(QTime::currentTime()) / 2);
+}
+
+void SignalProxy::updateLag(QIODevice *dev, int lag) {
+ Q_ASSERT(_peers.contains(dev));
+ _peers[dev].lag = lag;
+ if(proxyMode() == Client) {
+ emit lagUpdated(lag);
+ }
+}
+
+
void SignalProxy::dumpProxyStats() {
QString mode;
if(proxyMode() == Server)