#include <QRegExp>
#include <QThread>
#include <QTime>
+#include <QEvent>
+#include <QCoreApplication>
#include "syncableobject.h"
#include "util.h"
+// ==================================================
+// PeerSignalEvent
+// ==================================================
+class PeerSignalEvent : public QEvent {
+public:
+ PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {}
+ SignalProxy *sender;
+ SignalProxy::RequestType requestType;
+ QVariantList params;
+};
// ==================================================
// SIGNALRELAY
dispatchPackedFunc(QVariant(packedFunc));
}
+QString SignalProxy::IODevicePeer::address() const {
+ QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
+ if(socket)
+ return socket->peerAddress().toString();
+ else
+ return QString();
+}
+
+void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) {
+ Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
+ ? Qt::DirectConnection
+ : Qt::QueuedConnection;
+
+ if(type == Qt::DirectConnection) {
+ receiver->receivePeerSignal(sender, requestType, params);
+ } else {
+ QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
+ }
+}
+
// ==================================================
// SignalProxy
// ==================================================
return true;
}
+bool SignalProxy::addPeer(SignalProxy* proxy) {
+ if(!proxy)
+ return false;
+
+ if(proxyMode() == proxy->proxyMode()) {
+ qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
+ return false;
+ }
+
+ if(_peers.contains(proxy)) {
+ return true;
+ }
+
+ if(proxyMode() == Client && !_peers.isEmpty()) {
+ qWarning("SignalProxy: only one peer allowed in client mode!");
+ return false;
+ }
+
+ _peers[proxy] = new SignalProxyPeer(this, proxy);
+
+ proxy->addPeer(this);
+
+ if(_peers.count() == 1)
+ emit connected();
+
+ return true;
+}
+
void SignalProxy::removeAllPeers() {
Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
// wee need to copy that list since we modify it in the loop
}
}
+void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms) {
+ if(!_peers.contains(sender)) {
+ // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
+ qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
+ return;
+ }
+ receivePeerSignal(_peers[sender], requestType, params);
+}
+
void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
if(params.count() < 3) {
qWarning() << "received invalid Sync call" << params;
}
void SignalProxy::sendHeartBeat() {
- dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime());
+ QVariantList heartBeatParams;
+ heartBeatParams << QTime::currentTime();
PeerHash::iterator peer = _peers.begin();
while(peer != _peers.end()) {
if((*peer)->type() == AbstractPeer::IODevicePeer) {
IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
+ ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
if(ioPeer->sentHeartBeats > 0) {
updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
}
if(ioPeer->sentHeartBeats > 1) {
- //FIXME: proper disconnect.
-// QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(peerIter.key());
-// qWarning() << "SignalProxy: Disconnecting peer:"
-// << (socket ? qPrintable(socket->peerAddress().toString()) : "local client")
-// << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
-// peerIter.key()->close();
+ qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address()
+ << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
+ ioPeer->close();
} else {
ioPeer->sentHeartBeats++;
}
ioPeer->sentHeartBeats = 0;
if(params.isEmpty()) {
- qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->device();
+ qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
return;
}
updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
}
+void SignalProxy::customEvent(QEvent *event) {
+ switch(event->type()) {
+ case QEvent::User:
+ {
+ PeerSignalEvent *sig = static_cast<PeerSignalEvent *>(event);
+ receivePeerSignal(sig->sender, sig->requestType, sig->params);
+ }
+ event->accept();
+ break;
+ default:
+ return;
+ }
+}
+
void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
peer->lag = lag;
if(proxyMode() == Client) {