X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fsignalproxy.cpp;h=0357c96242465a2b0526838b0ba1dd47f90de783;hp=85f053b25fae92ed30af9e81bd1a53b41d9a0c21;hb=6efa7d5a4cd38bc21ecfafd04d25a6c952d1e097;hpb=5c35ac3ee6f951d39cc052925aa224debfa148a8 diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index 85f053b2..0357c962 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -34,10 +34,22 @@ #include #include #include +#include +#include #include "syncableobject.h" #include "util.h" +// ================================================== +// PeerSignalEvent +// ================================================== +class PeerSignalEvent : public QEvent { +public: + PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {} + SignalProxy *sender; + SignalProxy::RequestType requestType; + QVariantList params; +}; // ================================================== // SIGNALRELAY @@ -55,12 +67,12 @@ public: void setSynchronize(bool); bool synchronize() const; - + int sigCount() const; - + private: bool isSyncMethod(int i); - + SignalProxy* proxy; QObject* caller; QMultiHash sigNames; @@ -98,7 +110,7 @@ int SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **_a) { proxy->dispatchSignal(SignalProxy::RpcCall, QVariantList() << funcIter.value() << params); funcIter++; } - + // dispatch Sync Signal if necessary QByteArray signature(caller->metaObject()->method(_id).signature()); SyncableObject *syncObject = qobject_cast(caller); @@ -150,7 +162,7 @@ bool SignalRelay::isSyncMethod(int i) { QByteArray signature = syncObject->syncMetaObject()->method(i).signature(); if(!proxy->syncMap(syncObject).contains(signature)) return false; - + if(proxy->proxyMode() == SignalProxy::Server && !signature.contains("Requested")) return true; @@ -194,6 +206,26 @@ void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, c dispatchPackedFunc(QVariant(packedFunc)); } +QString SignalProxy::IODevicePeer::address() const { + QAbstractSocket *socket = qobject_cast(_device); + if(socket) + return socket->peerAddress().toString(); + else + return QString(); +} + +void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { + Qt::ConnectionType type = QThread::currentThread() == receiver->thread() + ? Qt::DirectConnection + : Qt::QueuedConnection; + + if(type == Qt::DirectConnection) { + receiver->receivePeerSignal(sender, requestType, params); + } else { + QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params)); + } +} + // ================================================== // SignalProxy // ================================================== @@ -217,7 +249,7 @@ SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) setProxyMode(mode); addPeer(device); init(); -} +} SignalProxy::~SignalProxy() { QList senders = _relayHash.keys(); @@ -265,7 +297,7 @@ void SignalProxy::initClient() { bool SignalProxy::addPeer(QIODevice* iodev) { if(!iodev) return false; - + if(_peers.contains(iodev)) return true; @@ -285,6 +317,9 @@ bool SignalProxy::addPeer(QIODevice* iodev) { connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender())); } + if(!sock->parent()) + sock->setParent(this); + _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool()); if(_peers.count() == 1) @@ -293,6 +328,33 @@ bool SignalProxy::addPeer(QIODevice* iodev) { return true; } +bool SignalProxy::addPeer(SignalProxy* proxy) { + if(!proxy) + return false; + + if(proxyMode() == proxy->proxyMode()) { + qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!"; + return false; + } + + if(_peers.contains(proxy)) { + return true; + } + + if(proxyMode() == Client && !_peers.isEmpty()) { + qWarning("SignalProxy: only one peer allowed in client mode!"); + return false; + } + + _peers[proxy] = new SignalProxyPeer(this, proxy); + proxy->addPeer(this); + + if(_peers.count() == 1) + emit connected(); + + return true; +} + void SignalProxy::removeAllPeers() { Q_ASSERT(proxyMode() == Server || _peers.count() <= 1); // wee need to copy that list since we modify it in the loop @@ -321,6 +383,16 @@ void SignalProxy::removePeer(QObject* dev) { if(peer->type() == AbstractPeer::IODevicePeer) emit peerRemoved(static_cast(dev)); + if(peer->type() == AbstractPeer::SignalProxyPeer) { + SignalProxy *proxy = static_cast(dev); + if(proxy->_peers.contains(this)) + proxy->removePeer(this); + } + + + if(dev->parent() == this) + dev->deleteLater(); + delete peer; if(_peers.isEmpty()) @@ -339,7 +411,7 @@ void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) if(proxyMode() == Client) return; - + QVariantList params; params << "__objectRenamed__" << className << newname << oldname; dispatchSignal(RpcCall, params); @@ -423,7 +495,7 @@ const QByteArray &SignalProxy::methodName(QObject *obj, int methodId) { void SignalProxy::setSyncMap(SyncableObject *obj) { const QMetaObject *meta = obj->syncMetaObject(); QHash syncMap; - + QList slotIndexes; for(int i = 0; i < meta->methodCount(); i++) { if(meta->method(i).methodType() == QMetaMethod::Slot) @@ -498,7 +570,7 @@ void SignalProxy::setReceiveMap(SyncableObject *obj) { signature = QByteArray(requestSlot.signature()); if(!signature.startsWith("request")) continue; - + paramsPos = signature.indexOf('('); if(paramsPos == -1) continue; @@ -520,7 +592,7 @@ void SignalProxy::setReceiveMap(SyncableObject *obj) { if(receiverId != -1) receiveMap[i] = receiverId; } - _classInfo[meta]->receiveMap = receiveMap; + _classInfo[meta]->receiveMap = receiveMap; } const QHash &SignalProxy::receiveMap(SyncableObject *obj) { @@ -602,7 +674,7 @@ bool SignalProxy::attachSlot(const QByteArray& sigName, QObject* recv, const cha void SignalProxy::synchronize(SyncableObject *obj) { createClassInfo(obj); setUpdatedRemotelyId(obj); - + // attaching all the Signals SignalRelay* relay; if(_relayHash.contains(obj)) @@ -712,7 +784,7 @@ void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packed RequestType requestType = (RequestType)params.takeFirst().value(); receivePeerSignal(sender, requestType, params); } - + void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) { switch(requestType) { case RpcCall: @@ -726,15 +798,15 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req case Sync: handleSync(sender, params); break; - + case InitRequest: handleInitRequest(sender, params); break; - + case InitData: handleInitData(sender, params); break; - + case HeartBeat: receiveHeartBeat(sender, params); break; @@ -748,12 +820,21 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req } } +void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList ¶ms) { + if(!_peers.contains(sender)) { + // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea. + qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast(sender); + return; + } + receivePeerSignal(_peers[sender], requestType, params); +} + void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { if(params.count() < 3) { qWarning() << "received invalid Sync call" << params; return; } - + QByteArray className = params.takeFirst().toByteArray(); QString objectName = params.takeFirst().toString(); QByteArray signal = params.takeFirst().toByteArray(); @@ -790,7 +871,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { returnParams << returnValue; sender->dispatchSignal(Sync, returnParams); } - + // send emit update signal invokeSlot(receiver, updatedRemotelyId(receiver)); } @@ -801,10 +882,10 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa << params; return; } - + QByteArray className(params[0].toByteArray()); QString objectName(params[1].toString()); - + if(!_syncSlave.contains(className)) { qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:" << className; @@ -816,7 +897,7 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa << className << objectName; return; } - + SyncableObject *obj = _syncSlave[className][objectName]; QVariantList params_; @@ -834,7 +915,7 @@ void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶m << params; return; } - + QByteArray className(params[0].toByteArray()); QString objectName(params[1].toString()); QVariantMap propertyMap(params[2].toMap()); @@ -883,7 +964,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList return false; } - void *_a[] = {0, // return type... + void *_a[] = {0, // return type... 0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots 0, 0, 0, 0 , 0}; @@ -903,7 +984,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList if(returnValue.type() != QVariant::Invalid) _a[0] = const_cast(returnValue.constData()); - + Qt::ConnectionType type = QThread::currentThread() == receiver->thread() ? Qt::DirectConnection : Qt::QueuedConnection; @@ -915,7 +996,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList // note to self: qmetaobject.cpp:990 ff return false; } - + } bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms) { @@ -944,7 +1025,7 @@ void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool c QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_2); out << (quint32)0; - + if(compressed) { QByteArray rawItem; QDataStream itemStream(&rawItem, QIODevice::WriteOnly); @@ -981,7 +1062,7 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian dev->close(); return false; } - + if(dev->bytesAvailable() < blockSize) return false; @@ -998,7 +1079,7 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian } // end rawItem = qUncompress(rawItem); - + QDataStream itemStream(&rawItem, QIODevice::ReadOnly); itemStream.setVersion(QDataStream::Qt_4_2); itemStream >> item; @@ -1067,21 +1148,20 @@ void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties } void SignalProxy::sendHeartBeat() { - dispatchSignal(SignalProxy::HeartBeat, QVariantList() << QTime::currentTime()); + QVariantList heartBeatParams; + heartBeatParams << QTime::currentTime(); PeerHash::iterator peer = _peers.begin(); while(peer != _peers.end()) { if((*peer)->type() == AbstractPeer::IODevicePeer) { IODevicePeer *ioPeer = static_cast(*peer); + ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams); if(ioPeer->sentHeartBeats > 0) { updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval()); } if(ioPeer->sentHeartBeats > 1) { - //FIXME: proper disconnect. -// QAbstractSocket *socket = qobject_cast(peerIter.key()); -// qWarning() << "SignalProxy: Disconnecting peer:" -// << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") -// << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; -// peerIter.key()->close(); + qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address() + << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; + ioPeer->close(); } else { ioPeer->sentHeartBeats++; } @@ -1104,14 +1184,28 @@ void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList & ioPeer->sentHeartBeats = 0; if(params.isEmpty()) { - qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->device(); + qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address(); return; } - + QTime sendTime = params[0].value(); updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2); } +void SignalProxy::customEvent(QEvent *event) { + switch(event->type()) { + case QEvent::User: + { + PeerSignalEvent *sig = static_cast(event); + receivePeerSignal(sig->sender, sig->requestType, sig->params); + } + event->accept(); + break; + default: + return; + } +} + void SignalProxy::updateLag(IODevicePeer *peer, int lag) { peer->lag = lag; if(proxyMode() == Client) { @@ -1134,7 +1228,7 @@ void SignalProxy::dumpProxyStats() { int slaveCount = 0; foreach(ObjectId oid, _syncSlave.values()) slaveCount += oid.count(); - + qDebug() << this; qDebug() << " Proxy Mode:" << mode; qDebug() << "attached sending Objects:" << _relayHash.count(); @@ -1151,13 +1245,13 @@ void SignalProxy::dumpSyncMap(SyncableObject *object) { QHash syncMap_ = syncMap(object); QHash::const_iterator iter = syncMap_.constBegin(); while(iter != syncMap_.constEnd()) { - qDebug() << iter.key() << "-->" << iter.value() << meta->method(iter.value()).signature(); + qDebug() << iter.key() << "-->" << iter.value() << meta->method(iter.value()).signature(); iter++; } // QHash syncMap_ = syncMap(object); // QHash::const_iterator iter = syncMap_.constBegin(); // while(iter != syncMap_.constEnd()) { -// qDebug() << iter.key() << meta->method(iter.key()).signature() << "-->" << iter.value() << meta->method(iter.value()).signature(); +// qDebug() << iter.key() << meta->method(iter.key()).signature() << "-->" << iter.value() << meta->method(iter.value()).signature(); // iter++; // } }