#include <QObject>
#include <QIODevice>
#include <QAbstractSocket>
+#include <QHostAddress>
#include <QHash>
#include <QMultiHash>
#include <QList>
#include <QMetaProperty>
#include <QRegExp>
#include <QThread>
+#include <QTime>
#include "syncableobject.h"
#include "util.h"
sigNames.insert(methodId, fn);
}
// ====================
-// END SIGNALRELAY
+// /SIGNALRELAY
// ====================
: QObject(parent)
{
setProxyMode(Client);
+ init();
}
SignalProxy::SignalProxy(ProxyMode mode, QObject* parent)
: QObject(parent)
{
setProxyMode(mode);
+ init();
}
SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent)
{
setProxyMode(mode);
addPeer(device);
+ init();
}
SignalProxy::~SignalProxy() {
detachObject(sender);
// close peer connections
- foreach(QIODevice *device, _peerByteCount.keys()) {
+ foreach(QIODevice *device, _peers.keys()) {
device->close();
delete device;
}
}
void SignalProxy::setProxyMode(ProxyMode mode) {
- foreach(QIODevice* peer, _peerByteCount.keys()) {
+ foreach(QIODevice* peer, _peers.keys()) {
if(peer->isOpen()) {
qWarning() << "SignalProxy: Cannot change proxy mode while connected";
return;
initClient();
}
-SignalProxy::ProxyMode SignalProxy::proxyMode() const {
- return _proxyMode;
+void SignalProxy::init() {
+ connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
+ _heartBeatTimer.start(30 * 1000);
}
void SignalProxy::initServer() {
- disconnect(&_heartBeatTimer, 0, this, 0);
- _heartBeatTimer.stop();
}
void SignalProxy::initClient() {
attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString)));
- connect(&_heartBeatTimer, SIGNAL(timeout()),
- this, SLOT(sendHeartBeat()));
- _heartBeatTimer.start(60 * 1000); // msecs: one beep per minute
}
bool SignalProxy::addPeer(QIODevice* iodev) {
if(!iodev)
return false;
- if(_peerByteCount.contains(iodev))
+ if(_peers.contains(iodev))
return true;
- if(proxyMode() == Client && !_peerByteCount.isEmpty()) {
+ if(proxyMode() == Client && !_peers.isEmpty()) {
qWarning("SignalProxy: only one peer allowed in client mode!");
return false;
}
connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
}
- _peerByteCount[iodev] = 0;
+ _peers[iodev] = peerInfo();
+ if(iodev->property("UseCompression").toBool())
+ _peers[iodev].usesCompression = true;
- if(_peerByteCount.count() == 1)
+ if(_peers.count() == 1)
emit connected();
return true;
}
void SignalProxy::removePeer(QIODevice* iodev) {
- if(_peerByteCount.isEmpty()) {
+ if(_peers.isEmpty()) {
qWarning() << "SignalProxy::removePeer(): No peers in use!";
return;
}
if(proxyMode() == Server && !iodev) {
// disconnect all
- QList<QIODevice *> peers = _peerByteCount.keys();
+ QList<QIODevice *> peers = _peers.keys();
foreach(QIODevice *peer, peers)
removePeer(peer);
}
if(proxyMode() != Server && !iodev)
- iodev = _peerByteCount.keys().first();
+ iodev = _peers.keys().first();
Q_ASSERT(iodev);
- if(!_peerByteCount.contains(iodev)) {
+ if(!_peers.contains(iodev)) {
qWarning() << "SignalProxy: unknown QIODevice" << iodev;
return;
}
- _peerByteCount.remove(iodev);
+ _peers.remove(iodev);
disconnect(iodev, 0, this, 0);
emit peerRemoved(iodev);
- if(_peerByteCount.isEmpty())
+ if(_peers.isEmpty())
emit disconnected();
}
void SignalProxy::removePeerBySender() {
// OK we're brutal here... but since it's a private slot we know what we've got connected to it...
+ // this Slot is not triggered by destroyed, so the object is still alive and can be used!
QIODevice *ioDev = (QIODevice *)(sender());
removePeer(ioDev);
- qDebug() << "Client disconnected.";
}
void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) {
if(proxyMode() == Server) {
connect(obj, SIGNAL(objectRenamed(QString, QString)), this, SLOT(objectRenamed(QString, QString)));
- setInitialized(obj);
+ obj->setInitialized();
+ emit objectInitialized(obj);
} else {
- requestInit(obj);
+ if(obj->isInitialized())
+ emit objectInitialized(obj);
+ else
+ requestInit(obj);
}
}
-void SignalProxy::setInitialized(SyncableObject *obj) {
- obj->setInitialized();
- emit objectInitialized(obj);
-}
-
-bool SignalProxy::isInitialized(SyncableObject *obj) const {
- return obj->isInitialized();
-}
-
void SignalProxy::requestInit(SyncableObject *obj) {
- if(proxyMode() == Server || isInitialized(obj))
+ if(proxyMode() == Server || obj->isInitialized())
return;
QVariantList params;
}
void SignalProxy::dispatchSignal(QIODevice *receiver, const RequestType &requestType, const QVariantList ¶ms) {
+ Q_ASSERT(_peers.contains(receiver));
QVariantList packedFunc;
packedFunc << (qint16)requestType;
packedFunc << params;
- writeDataToDevice(receiver, QVariant(packedFunc));
+ writeDataToDevice(receiver, QVariant(packedFunc), _peers[receiver].usesCompression);
}
void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) {
QVariantList packedFunc;
packedFunc << (qint16)requestType;
packedFunc << params;
- foreach(QIODevice* dev, _peerByteCount.keys())
- writeDataToDevice(dev, QVariant(packedFunc));
+ foreach(QIODevice* dev, _peers.keys()) {
+ Q_ASSERT(_peers.contains(dev));
+ writeDataToDevice(dev, QVariant(packedFunc), _peers[dev].usesCompression);
+ }
}
void SignalProxy::receivePeerSignal(QIODevice *sender, const QVariant &packedFunc) {
switch(callType) {
case RpcCall:
- if(params.empty()) {
+ if(params.empty())
qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call";
- return;
- } else {
- return handleSignal(params.takeFirst().toByteArray(), params);
- }
+ else
+ handleSignal(params.takeFirst().toByteArray(), params);
+ break;
+
case Sync:
- return handleSync(sender, params);
+ handleSync(sender, params);
+ break;
+
case InitRequest:
- return handleInitRequest(sender, params);
+ handleInitRequest(sender, params);
+ break;
+
case InitData:
- return handleInitData(sender, params);
+ handleInitData(sender, params);
+ break;
+
case HeartBeat:
- return;
+ receiveHeartBeat(sender, params);
+ break;
+
+ case HeartBeatReply:
+ receiveHeartBeatReply(sender, params);
+ break;
+
default:
qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << callType << params;
- return;
}
}
void SignalProxy::dataAvailable() {
// yet again. it's a private slot. no need for checks.
QIODevice* ioDev = qobject_cast<QIODevice* >(sender());
+ Q_ASSERT(_peers.contains(ioDev));
QVariant var;
- while(readDataFromDevice(ioDev, _peerByteCount[ioDev], var))
+ while(readDataFromDevice(ioDev, _peers[ioDev].byteCount, var, _peers[ioDev].usesCompression))
receivePeerSignal(ioDev, var);
}
-void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item) {
+void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed) {
QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(dev);
if(!dev->isOpen() || (sock && sock->state()!=QAbstractSocket::ConnectedState)) {
- qWarning("SignalProxy: Can't call on a closed device");
+ qWarning("SignalProxy: Can't call write on a closed device");
return;
}
+
QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
- out << (quint32)0 << item;
+ out << (quint32)0;
+
+ if(compressed) {
+ QByteArray rawItem;
+ QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
+
+ itemStream.setVersion(QDataStream::Qt_4_2);
+ itemStream << item;
+
+ rawItem = qCompress(rawItem);
+
+ out << rawItem;
+ } else {
+ out << item;
+ }
+
out.device()->seek(0);
out << (quint32)(block.size() - sizeof(quint32));
+
dev->write(block);
}
-bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item) {
+bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed) {
QDataStream in(dev);
in.setVersion(QDataStream::Qt_4_2);
in >> blockSize;
}
+ if(blockSize > 1 << 22) {
+ qWarning() << qPrintable(tr("Client tried to send package larger than max package size!"));
+ QAbstractSocket *sock = qobject_cast<QAbstractSocket *>(dev);
+ qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client")));
+ dev->close();
+ return false;
+ }
+
if(dev->bytesAvailable() < blockSize)
return false;
- in >> item;
+
+ if(compressed) {
+ QByteArray rawItem;
+ in >> rawItem;
+ rawItem = qUncompress(rawItem);
+
+ QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
+ itemStream.setVersion(QDataStream::Qt_4_2);
+ itemStream >> item;
+ } else {
+ in >> item;
+ }
+
blockSize = 0;
+
return true;
}
QString methodname = QString(method.signature()).section("(", 0, 0);
// determine where we have to chop:
+ int upperCharPos;
if(method.methodType() == QMetaMethod::Slot) {
// we take evertyhing from the first uppercase char if it's slot
- methodname = methodname.mid(methodname.indexOf(QRegExp("[A-Z]")));
+ upperCharPos = methodname.indexOf(QRegExp("[A-Z]"));
+ if(upperCharPos == -1)
+ return QString();
+ methodname = methodname.mid(upperCharPos);
} else {
// and if it's a signal we discard everything from the last uppercase char
- methodname = methodname.left(methodname.lastIndexOf(QRegExp("[A-Z]")));
+ upperCharPos = methodname.lastIndexOf(QRegExp("[A-Z]"));
+ if(upperCharPos == -1)
+ return QString();
+ methodname = methodname.left(upperCharPos);
}
methodname[0] = methodname[0].toUpper();
}
void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties) {
- if(isInitialized(obj))
+ if(obj->isInitialized())
return;
obj->fromVariantMap(properties);
- setInitialized(obj);
+ obj->setInitialized();
+ emit objectInitialized(obj);
invokeSlot(obj, updatedRemotelyId(obj));
}
void SignalProxy::sendHeartBeat() {
- dispatchSignal(SignalProxy::HeartBeat, QVariantList());
+ dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime());
+ QHash<QIODevice *, peerInfo>::iterator peerIter = _peers.begin();
+ QHash<QIODevice *, peerInfo>::iterator peerIterEnd = _peers.end();
+ while(peerIter != peerIterEnd) {
+ if(peerIter->sentHeartBeats > 0) {
+ updateLag(peerIter.key(), _heartBeatTimer.interval());
+ }
+ if(peerIter->sentHeartBeats > 1) {
+ QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(peerIter.key());
+ qWarning() << "SignalProxy: Disconnecting peer:"
+ << (socket ? qPrintable(socket->peerAddress().toString()) : "local client")
+ << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
+ peerIter.key()->close();
+ } else {
+ peerIter->sentHeartBeats++;
+ }
+ peerIter++;
+ }
+}
+
+void SignalProxy::receiveHeartBeat(QIODevice *dev, const QVariantList ¶ms) {
+ if(!_peers.contains(dev)) {
+ qWarning() << "SignalProxy: received heart beat from unknown Device:" << dev;
+ }
+ dispatchSignal(dev, SignalProxy::HeartBeatReply, params);
}
+void SignalProxy::receiveHeartBeatReply(QIODevice *dev, const QVariantList ¶ms) {
+ if(!_peers.contains(dev)) {
+ qWarning() << "SignalProxy: received heart beat reply from unknown Device:" << dev;
+ return;
+ }
+
+ _peers[dev].sentHeartBeats = 0;
+
+ if(params.isEmpty()) {
+ qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << dev;
+ return;
+ }
+
+ QTime sendTime = params[0].value<QTime>();
+ updateLag(dev, sendTime.msecsTo(QTime::currentTime()) / 2);
+}
+
+void SignalProxy::updateLag(QIODevice *dev, int lag) {
+ Q_ASSERT(_peers.contains(dev));
+ _peers[dev].lag = lag;
+ if(proxyMode() == Client) {
+ emit lagUpdated(lag);
+ }
+}
+
+
void SignalProxy::dumpProxyStats() {
QString mode;
if(proxyMode() == Server)