#include <QRegExp>
#include <QThread>
#include <QTime>
+#include <QEvent>
+#include <QCoreApplication>
#include "syncableobject.h"
#include "util.h"
+// ==================================================
+// PeerSignalEvent
+// ==================================================
+class PeerSignalEvent : public QEvent {
+public:
+ PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {}
+ SignalProxy *sender;
+ SignalProxy::RequestType requestType;
+ QVariantList params;
+};
// ==================================================
// SIGNALRELAY
dispatchPackedFunc(QVariant(packedFunc));
}
+QString SignalProxy::IODevicePeer::address() const {
+ QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
+ if(socket)
+ return socket->peerAddress().toString();
+ else
+ return QString();
+}
+
+void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) {
+ Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
+ ? Qt::DirectConnection
+ : Qt::QueuedConnection;
+
+ if(type == Qt::DirectConnection) {
+ receiver->receivePeerSignal(sender, requestType, params);
+ } else {
+ QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
+ }
+}
+
// ==================================================
// SignalProxy
// ==================================================
QList<QObject*> senders = _relayHash.keys();
foreach(QObject* sender, senders)
detachObject(sender);
+ removeAllPeers();
}
void SignalProxy::setProxyMode(ProxyMode mode) {
connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
}
- // we take ownership of that device
- iodev->setParent(this);
-
_peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
if(_peers.count() == 1)
return true;
}
+bool SignalProxy::addPeer(SignalProxy* proxy) {
+ if(!proxy)
+ return false;
+
+ if(proxyMode() == proxy->proxyMode()) {
+ qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+ return false;
+ }
+
+ if(_peers.contains(proxy)) {
+ return true;
+ }
+
+ if(proxyMode() == Client && !_peers.isEmpty()) {
+ qWarning("SignalProxy: only one peer allowed in client mode!");
+ return false;
+ }
+
+ _peers[proxy] = new SignalProxyPeer(this, proxy);
+
+ proxy->addPeer(this);
+
+ if(_peers.count() == 1)
+ emit connected();
+
+ return true;
+}
+
void SignalProxy::removeAllPeers() {
Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
// wee need to copy that list since we modify it in the loop
QList<QObject *> peers = _peers.keys();
foreach(QObject *peer, peers) {
- switch(_peers[peer]->type()) {
- case AbstractPeer::IODevicePeer:
- removePeer(static_cast<QIODevice *>(peer));
- break;
- case AbstractPeer::SignalProxyPeer:
- removePeer(static_cast<SignalProxy *>(peer));
- break;
- default:
- Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type
- }
+ removePeer(peer);
}
}
-void SignalProxy::removePeer(QIODevice* iodev) {
+void SignalProxy::removePeer(QObject* dev) {
if(_peers.isEmpty()) {
qWarning() << "SignalProxy::removePeer(): No peers in use!";
return;
}
- Q_ASSERT(iodev);
- if(!_peers.contains(iodev)) {
- qWarning() << "SignalProxy: unknown QIODevice" << iodev;
+ Q_ASSERT(dev);
+ if(!_peers.contains(dev)) {
+ qWarning() << "SignalProxy: unknown Peer" << dev;
return;
}
- AbstractPeer *peer = _peers[iodev];
- _peers.remove(iodev);
- delete peer;
-
- disconnect(iodev, 0, this, 0);
- emit peerRemoved(iodev);
+ AbstractPeer *peer = _peers[dev];
+ _peers.remove(dev);
- if(_peers.isEmpty())
- emit disconnected();
-}
+ disconnect(dev, 0, this, 0);
+ if(peer->type() == AbstractPeer::IODevicePeer)
+ emit peerRemoved(static_cast<QIODevice *>(dev));
-void SignalProxy::removePeer(SignalProxy *proxy) {
- if(!_peers.contains(proxy)) {
- qWarning() << "SignalProxy: unknown QIODevice" << proxy;
- return;
- }
-
- _peers.remove(proxy);
+ delete peer;
if(_peers.isEmpty())
emit disconnected();
}
-
void SignalProxy::removePeerBySender() {
- // OK we're brutal here... but since it's a private slot we know what we've got connected to it...
- // this Slot is not triggered by destroyed, so the object is still alive and can be used!
- QIODevice *ioDev = (QIODevice *)(sender());
- removePeer(ioDev);
+ removePeer(sender());
}
void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) {
}
}
+void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms) {
+ if(!_peers.contains(sender)) {
+ // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
+ qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
+ return;
+ }
+ receivePeerSignal(_peers[sender], requestType, params);
+}
+
void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
if(params.count() < 3) {
qWarning() << "received invalid Sync call" << params;
}
void SignalProxy::sendHeartBeat() {
- dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime());
+ QVariantList heartBeatParams;
+ heartBeatParams << QTime::currentTime();
PeerHash::iterator peer = _peers.begin();
while(peer != _peers.end()) {
if((*peer)->type() == AbstractPeer::IODevicePeer) {
IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
+ ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
if(ioPeer->sentHeartBeats > 0) {
updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
}
if(ioPeer->sentHeartBeats > 1) {
- //FIXME: proper disconnect.
-// QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(peerIter.key());
-// qWarning() << "SignalProxy: Disconnecting peer:"
-// << (socket ? qPrintable(socket->peerAddress().toString()) : "local client")
-// << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
-// peerIter.key()->close();
+ qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address()
+ << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
+ ioPeer->close();
} else {
ioPeer->sentHeartBeats++;
}
ioPeer->sentHeartBeats = 0;
if(params.isEmpty()) {
- qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->device();
+ qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
return;
}
updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
}
+void SignalProxy::customEvent(QEvent *event) {
+ switch(event->type()) {
+ case QEvent::User:
+ {
+ PeerSignalEvent *sig = static_cast<PeerSignalEvent *>(event);
+ receivePeerSignal(sig->sender, sig->requestType, sig->params);
+ }
+ event->accept();
+ break;
+ default:
+ return;
+ }
+}
+
void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
peer->lag = lag;
if(proxyMode() == Client) {