fixing request -> receive sync calls
[quassel.git] / src / common / signalproxy.cpp
1 /***************************************************************************
2  *   Copyright (C) 2005-08 by the Quassel Project                          *
3  *   devel@quassel-irc.org                                                 *
4  *                                                                         *
5  *   This program is free software; you can redistribute it and/or modify  *
6  *   it under the terms of the GNU General Public License as published by  *
7  *   the Free Software Foundation; either version 2 of the License, or     *
8  *   (at your option) version 3.                                           *
9  *                                                                         *
10  *   This program is distributed in the hope that it will be useful,       *
11  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
12  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
13  *   GNU General Public License for more details.                          *
14  *                                                                         *
15  *   You should have received a copy of the GNU General Public License     *
16  *   along with this program; if not, write to the                         *
17  *   Free Software Foundation, Inc.,                                       *
18  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
19  ***************************************************************************/
20
21 #include "signalproxy.h"
22
23 #include <QObject>
24 #include <QIODevice>
25 #include <QAbstractSocket>
26 #include <QHostAddress>
27 #include <QHash>
28 #include <QMultiHash>
29 #include <QList>
30 #include <QSet>
31 #include <QDebug>
32 #include <QMetaMethod>
33 #include <QMetaProperty>
34 #include <QRegExp>
35 #ifdef HAVE_SSL
36 #include <QSslSocket>
37 #endif
38 #include <QThread>
39 #include <QTime>
40 #include <QEvent>
41 #include <QCoreApplication>
42
43 #include "syncableobject.h"
44 #include "util.h"
45
46 // ==================================================
47 //  PeerSignalEvent
48 // ==================================================
49 class PeerSignalEvent : public QEvent {
50 public:
51   PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList &params) : QEvent(QEvent::Type(SignalProxy::PeerSignal)), sender(sender), requestType(requestType), params(params) {}
52   SignalProxy *sender;
53   SignalProxy::RequestType requestType;
54   QVariantList params;
55 };
56
57 class RemovePeerEvent : public QEvent {
58 public:
59   RemovePeerEvent(QObject *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeer)), peer(peer) {}
60   QObject *peer;
61 };
62
63 // ==================================================
64 //  SIGNALRELAY
65 // ==================================================
66 class SignalRelay: public QObject {
67 /* Q_OBJECT is not necessary or even allowed, because we implement
68    qt_metacall ourselves (and don't use any other features of the meta
69    object system)
70 */
71 public:
72   SignalRelay(SignalProxy* parent, QObject* source);
73   int qt_metacall(QMetaObject::Call _c, int _id, void **_a);
74
75   void attachSignal(int methodId, const QByteArray &func);
76
77   void setSynchronize(bool);
78
79   inline bool synchronize() const { return _sync; }
80   inline int sigCount() const { return sigNames.count(); }
81
82 private:
83   bool isSyncMethod(int i);
84
85   SignalProxy* proxy;
86   QObject* caller;
87   QMultiHash<int, QByteArray> sigNames;
88   bool _sync;
89 };
90
91 SignalRelay::SignalRelay(SignalProxy* parent, QObject* source)
92   : QObject(parent),
93     proxy(parent),
94     caller(source),
95     _sync(false)
96 {
97   QObject::connect(source, SIGNAL(destroyed()), parent, SLOT(detachSender()));
98 }
99
100 int SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **_a) {
101   _id = QObject::qt_metacall(_c, _id, _a);
102   if(_id < 0)
103     return _id;
104
105   SignalProxy::ExtendedMetaObject *eMeta = proxy->extendedMetaObject(caller);
106   Q_ASSERT(eMeta);
107   if(_c == QMetaObject::InvokeMetaMethod) {
108     if(sigNames.contains(_id) || synchronize()) {
109       const QList<int> &argTypes = eMeta->argTypes(_id);
110       QVariantList params;
111       int n = argTypes.size();
112       for(int i=0; i<n; i++) {
113         if(argTypes[i] == 0) {
114           qWarning() << "SignalRelay::qt_metacall(): received invalid data for argument number" << i << "of signal" << QString("%1::%2").arg(caller->metaObject()->className()).arg(caller->metaObject()->method(_id).signature());
115           qWarning() << "                            - make sure all your data types are known by the Qt MetaSystem";
116           return _id;
117         }
118         params.append(QVariant(argTypes[i], _a[i+1]));
119       }
120       QMultiHash<int, QByteArray>::const_iterator funcIter = sigNames.constFind(_id);
121       while(funcIter != sigNames.constEnd() && funcIter.key() == _id) {
122         proxy->dispatchSignal(SignalProxy::RpcCall, QVariantList() << funcIter.value() << params);
123         funcIter++;
124       }
125
126       // dispatch Sync Signal if necessary
127       QByteArray signature(caller->metaObject()->method(_id).signature());
128       SyncableObject *syncObject = qobject_cast<SyncableObject *>(caller);
129       if(synchronize() && eMeta->syncMap().contains(signature)) {
130          //qDebug() << "__SYNC__ >>>"
131          //      << caller->metaObject()->className()
132          //      << caller->objectName()
133          //      << signature
134          //      << params;
135         // params.prepend(QVariant(_id));
136         params.prepend(signature);
137         params.prepend(syncObject->objectName());
138         params.prepend(syncObject->syncMetaObject()->className());
139         proxy->dispatchSignal(SignalProxy::Sync, params);
140       }
141     }
142     _id -= QObject::staticMetaObject.methodCount();
143   }
144   return _id;
145 }
146
147 void SignalRelay::setSynchronize(bool sync) {
148   SyncableObject *syncObject = qobject_cast<SyncableObject *>(caller);
149   if(!syncObject)
150     return;
151
152   const QMetaObject *meta = syncObject->syncMetaObject();
153   if(!_sync && sync) {
154     // enable Sync
155     for(int i = 0; i < meta->methodCount(); i++ ) {
156       if(isSyncMethod(i))
157         QMetaObject::connect(caller, i, this, QObject::staticMetaObject.methodCount() + i);
158     }
159   } else if (_sync && !sync) {
160     // disable Sync
161     for(int i = 0; i < meta->methodCount(); i++ ) {
162       if(isSyncMethod(i))
163         QMetaObject::disconnect(caller, i, this, QObject::staticMetaObject.methodCount() + i);
164     }
165   }
166   _sync = sync;
167 }
168
169 bool SignalRelay::isSyncMethod(int i) {
170   SyncableObject *syncObject = qobject_cast<SyncableObject *>(caller);
171   if(!syncObject)
172     return false;
173
174   QByteArray signature = syncObject->syncMetaObject()->method(i).signature();
175   SignalProxy::ExtendedMetaObject *eMeta = proxy->extendedMetaObject(caller);
176   Q_ASSERT(eMeta);
177   if(!eMeta->syncMap().contains(signature))
178     return false;
179
180   if(proxy->proxyMode() == SignalProxy::Server && !signature.contains("Requested"))
181     return true;
182
183   if(proxy->proxyMode() == SignalProxy::Client && signature.contains("Requested"))
184     return true;
185
186   return false;
187 }
188
189 void SignalRelay::attachSignal(int methodId, const QByteArray &func) {
190   // we ride without safetybelts here... all checking for valid method etc pp has to be done by the caller
191   // all connected methodIds are offset by the standard methodCount of QObject
192   if(!sigNames.contains(methodId))
193     QMetaObject::connect(caller, methodId, this, QObject::staticMetaObject.methodCount() + methodId);
194
195   QByteArray fn;
196   if(!func.isEmpty()) {
197     fn = QMetaObject::normalizedSignature(func);
198   } else {
199     fn = QByteArray("2") + caller->metaObject()->method(methodId).signature();
200   }
201   sigNames.insert(methodId, fn);
202 }
203
204 // ==================================================
205 //  Peers
206 // ==================================================
207 void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
208   QVariantList packedFunc;
209   packedFunc << (qint16)requestType
210              << params;
211   dispatchPackedFunc(QVariant(packedFunc));
212 }
213
214 bool SignalProxy::IODevicePeer::isSecure() const {
215 #ifdef HAVE_SSL
216   QSslSocket *sslSocket = qobject_cast<QSslSocket *>(_device);
217   if(sslSocket)
218     return sslSocket->isEncrypted() || sslSocket->localAddress() == QHostAddress::LocalHost || sslSocket->localAddress() == QHostAddress::LocalHostIPv6;
219 #endif
220
221   QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
222   if(socket)
223     return socket->localAddress() == QHostAddress::LocalHost || socket->localAddress() == QHostAddress::LocalHostIPv6;
224
225   return false;
226 }
227
228 QString SignalProxy::IODevicePeer::address() const {
229   QAbstractSocket *socket = qobject_cast<QAbstractSocket *>(_device);
230   if(socket)
231     return socket->peerAddress().toString();
232   else
233     return QString();
234 }
235
236 void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
237   Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
238     ? Qt::DirectConnection
239     : Qt::QueuedConnection;
240
241   if(type == Qt::DirectConnection) {
242     receiver->receivePeerSignal(sender, requestType, params);
243   } else {
244     QCoreApplication::postEvent(receiver, new PeerSignalEvent(sender, requestType, params));
245   }
246 }
247
248 // ==================================================
249 //  SignalProxy
250 // ==================================================
251 SignalProxy::SignalProxy(QObject* parent)
252   : QObject(parent)
253 {
254   setProxyMode(Client);
255   init();
256 }
257
258 SignalProxy::SignalProxy(ProxyMode mode, QObject* parent)
259   : QObject(parent)
260 {
261   setProxyMode(mode);
262   init();
263 }
264
265 SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent)
266   : QObject(parent)
267 {
268   setProxyMode(mode);
269   addPeer(device);
270   init();
271 }
272
273 SignalProxy::~SignalProxy() {
274   QList<QObject*> senders = _relayHash.keys();
275   foreach(QObject* sender, senders)
276     detachObject(sender);
277   removeAllPeers();
278 }
279
280 void SignalProxy::setProxyMode(ProxyMode mode) {
281   PeerHash::iterator peer = _peers.begin();
282   while(peer != _peers.end()) {
283     if((*peer)->type() != AbstractPeer::IODevicePeer) {
284       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
285       if(ioPeer->isOpen()) {
286         qWarning() << "SignalProxy: Cannot change proxy mode while connected";
287         return;
288       }
289     }
290     if((*peer)->type() != AbstractPeer::SignalProxyPeer) {
291       qWarning() << "SignalProxy: Cannot change proxy mode while connected to another internal SignalProxy";
292       return;
293     }
294     peer++;
295   }
296
297   _proxyMode = mode;
298   if(mode == Server)
299     initServer();
300   else
301     initClient();
302 }
303
304 void SignalProxy::init() {
305   connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat()));
306   _heartBeatTimer.start(30 * 1000);
307   _secure = false;
308   updateSecureState();
309 }
310
311 void SignalProxy::initServer() {
312 }
313
314 void SignalProxy::initClient() {
315   attachSlot("__objectRenamed__", this, SLOT(objectRenamed(QByteArray, QString, QString)));
316 }
317
318 bool SignalProxy::addPeer(QIODevice* iodev) {
319   if(!iodev)
320     return false;
321
322   if(_peers.contains(iodev))
323     return true;
324
325   if(proxyMode() == Client && !_peers.isEmpty()) {
326     qWarning("SignalProxy: only one peer allowed in client mode!");
327     return false;
328   }
329
330   if(!iodev->isOpen()) {
331     qWarning("SignalProxy::addPeer(QIODevice *iodev): iodev needs to be open!");
332     return false;
333   }
334
335   connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
336   connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
337
338 #ifdef HAVE_SSL
339   QSslSocket *sslSocket = qobject_cast<QSslSocket *>(iodev);
340   if(sslSocket) {
341     connect(iodev, SIGNAL(encrypted()), this, SLOT(updateSecureState()));
342   }
343 #endif
344
345   if(!iodev->parent())
346     iodev->setParent(this);
347
348   _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
349
350   if(_peers.count() == 1)
351     emit connected();
352
353   updateSecureState();
354   return true;
355 }
356
357 bool SignalProxy::addPeer(SignalProxy* proxy) {
358   if(!proxy)
359     return false;
360
361   if(proxyMode() == proxy->proxyMode()) {
362     qWarning() << "SignalProxy::addPeer(): adding a SignalProxy as peer requires one proxy to be server and one client!";
363     return false;
364   }
365
366   if(_peers.contains(proxy)) {
367     return true;
368   }
369
370   if(proxyMode() == Client && !_peers.isEmpty()) {
371     qWarning("SignalProxy: only one peer allowed in client mode!");
372     return false;
373   }
374
375   _peers[proxy] = new SignalProxyPeer(this, proxy);
376   proxy->addPeer(this);
377
378   if(_peers.count() == 1)
379     emit connected();
380
381   updateSecureState();
382   return true;
383 }
384
385 void SignalProxy::removeAllPeers() {
386   Q_ASSERT(proxyMode() == Server || _peers.count() <= 1);
387   // wee need to copy that list since we modify it in the loop
388   QList<QObject *> peers = _peers.keys();
389   foreach(QObject *peer, peers) {
390     removePeer(peer);
391   }
392 }
393
394 void SignalProxy::removePeer(QObject* dev) {
395   if(_peers.isEmpty()) {
396     qWarning() << "SignalProxy::removePeer(): No peers in use!";
397     return;
398   }
399
400   Q_ASSERT(dev);
401   if(!_peers.contains(dev)) {
402     qWarning() << "SignalProxy: unknown Peer" << dev;
403     return;
404   }
405
406   AbstractPeer *peer = _peers[dev];
407   _peers.remove(dev);
408
409   disconnect(dev, 0, this, 0);
410   if(peer->type() == AbstractPeer::IODevicePeer)
411     emit peerRemoved(static_cast<QIODevice *>(dev));
412
413   if(peer->type() == AbstractPeer::SignalProxyPeer) {
414     SignalProxy *proxy = static_cast<SignalProxy *>(dev);
415     if(proxy->_peers.contains(this))
416       proxy->removePeer(this);
417   }
418
419   if(dev->parent() == this)
420     dev->deleteLater();
421
422   delete peer;
423
424   updateSecureState();
425
426   if(_peers.isEmpty())
427     emit disconnected();
428 }
429
430 void SignalProxy::removePeerBySender() {
431   removePeer(sender());
432 }
433
434 void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) {
435   SyncableObject *syncObject = qobject_cast<SyncableObject *>(sender());
436   const QMetaObject *meta = syncObject->syncMetaObject();
437   const QByteArray className(meta->className());
438   objectRenamed(className, newname, oldname);
439
440   if(proxyMode() == Client)
441     return;
442
443   QVariantList params;
444   params << "__objectRenamed__" << className << newname << oldname;
445   dispatchSignal(RpcCall, params);
446 }
447
448 void SignalProxy::objectRenamed(const QByteArray &classname, const QString &newname, const QString &oldname) {
449   if(_syncSlave.contains(classname) && _syncSlave[classname].contains(oldname) && oldname != newname) {
450     SyncableObject *obj = _syncSlave[classname][newname] = _syncSlave[classname].take(oldname);
451     requestInit(obj);
452   }
453 }
454
455 const QMetaObject *SignalProxy::metaObject(const QObject *obj) {
456   if(const SyncableObject *syncObject = qobject_cast<const SyncableObject *>(obj))
457     return syncObject->syncMetaObject();
458   else
459     return obj->metaObject();
460 }
461
462 SignalProxy::ExtendedMetaObject *SignalProxy::extendedMetaObject(const QObject *obj) const {
463   if(_extendedMetaObjects.contains(metaObject(obj)))
464     return _extendedMetaObjects[metaObject(obj)];
465   else
466     return 0;
467 }
468
469 void SignalProxy::createExtendedMetaObject(const QObject *obj) {
470   const QMetaObject *meta = metaObject(obj);
471   if(!_extendedMetaObjects.contains(meta)) {
472     _extendedMetaObjects[meta] = new ExtendedMetaObject(meta);
473   }
474 }
475
476 bool SignalProxy::attachSignal(QObject* sender, const char* signal, const QByteArray& sigName) {
477   const QMetaObject* meta = metaObject(sender);
478   QByteArray sig(meta->normalizedSignature(signal).mid(1));
479   int methodId = meta->indexOfMethod(sig.constData());
480   if(methodId == -1 || meta->method(methodId).methodType() != QMetaMethod::Signal) {
481     qWarning() << "SignalProxy::attachSignal(): No such signal" << signal;
482     return false;
483   }
484
485   createExtendedMetaObject(sender);
486
487   SignalRelay* relay;
488   if(_relayHash.contains(sender))
489     relay = _relayHash[sender];
490   else
491     relay = _relayHash[sender] = new SignalRelay(this, sender);
492
493   relay->attachSignal(methodId, sigName);
494
495   return true;
496 }
497
498
499 bool SignalProxy::attachSlot(const QByteArray& sigName, QObject* recv, const char* slot) {
500   const QMetaObject* meta = recv->metaObject();
501   int methodId = meta->indexOfMethod(meta->normalizedSignature(slot).mid(1));
502   if(methodId == -1 || meta->method(methodId).methodType() == QMetaMethod::Method) {
503     qWarning() << "SignalProxy::attachSlot(): No such slot" << slot;
504     return false;
505   }
506
507   createExtendedMetaObject(recv);
508
509   QByteArray funcName = QMetaObject::normalizedSignature(sigName.constData());
510   _attachedSlots.insert(funcName, qMakePair(recv, methodId));
511
512   QObject::disconnect(recv, SIGNAL(destroyed()), this, SLOT(detachSender()));
513   QObject::connect(recv, SIGNAL(destroyed()), this, SLOT(detachSender()));
514   return true;
515 }
516
517 void SignalProxy::synchronize(SyncableObject *obj) {
518   createExtendedMetaObject(obj);
519
520   // attaching all the Signals
521   SignalRelay* relay;
522   if(_relayHash.contains(obj))
523     relay = _relayHash[obj];
524   else
525     relay = _relayHash[obj] = new SignalRelay(this, obj);
526
527   relay->setSynchronize(true);
528
529   // attaching as slave to receive sync Calls
530   QByteArray className(obj->syncMetaObject()->className());
531   _syncSlave[className][obj->objectName()] = obj;
532
533   if(proxyMode() == Server) {
534     connect(obj, SIGNAL(objectRenamed(QString, QString)), this, SLOT(objectRenamed(QString, QString)));
535     obj->setInitialized();
536     emit objectInitialized(obj);
537   } else {
538     if(obj->isInitialized())
539       emit objectInitialized(obj);
540     else
541       requestInit(obj);
542   }
543 }
544
545 void SignalProxy::detachSender() {
546   detachObject(sender());
547 }
548
549 void SignalProxy::detachObject(QObject* obj) {
550   stopSync(static_cast<SyncableObject *>(obj));
551   detachSignals(obj);
552   detachSlots(obj);
553 }
554
555 void SignalProxy::detachSignals(QObject* sender) {
556   if(!_relayHash.contains(sender))
557     return;
558   _relayHash.take(sender)->deleteLater();
559 }
560
561 void SignalProxy::detachSlots(QObject* receiver) {
562   SlotHash::iterator slotIter = _attachedSlots.begin();
563   while(slotIter != _attachedSlots.end()) {
564     if(slotIter.value().first == receiver) {
565       slotIter = _attachedSlots.erase(slotIter);
566     } else
567       slotIter++;
568   }
569 }
570
571 void SignalProxy::stopSync(SyncableObject* obj) {
572   if(_relayHash.contains(obj))
573     _relayHash[obj]->setSynchronize(false);
574
575   // we can't use a className here, since it might be effed up, if we receive the call as a result of a decon
576   // gladly the objectName() is still valid. So we have only to iterate over the classes not each instance! *sigh*
577   QHash<QByteArray, ObjectId>::iterator classIter = _syncSlave.begin();
578   while(classIter != _syncSlave.end()) {
579     if(classIter->contains(obj->objectName()) && classIter.value()[obj->objectName()] == obj) {
580       classIter->remove(obj->objectName());
581       break;
582     }
583     classIter++;
584   }
585 }
586
587 void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantList &params) {
588   QVariant packedFunc(QVariantList() << (qint16)requestType << params);
589   PeerHash::iterator peer = _peers.begin();
590   while(peer != _peers.end()) {
591     switch((*peer)->type()) {
592     case AbstractPeer::IODevicePeer:
593       {
594         IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
595         if(ioPeer->isOpen())
596           ioPeer->dispatchPackedFunc(packedFunc);
597         else
598           QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
599       }
600       break;
601     case AbstractPeer::SignalProxyPeer:
602       (*peer)->dispatchSignal(requestType, params);
603       break;
604     default:
605       Q_ASSERT(false); // there shouldn't be any peers with wrong / unknown type
606     }
607     peer++;
608   }
609 }
610
611 void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packedFunc) {
612   QVariantList params(packedFunc.toList());
613
614   if(params.isEmpty()) {
615     qWarning() << "SignalProxy::receivePeerSignal(): received incompatible Data:" << packedFunc;
616     return;
617   }
618
619   RequestType requestType = (RequestType)params.takeFirst().value<int>();
620   receivePeerSignal(sender, requestType, params);
621 }
622
623 void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList &params) {
624   switch(requestType) {
625   case RpcCall:
626     if(params.empty())
627       qWarning() << "SignalProxy::receivePeerSignal(): received empty RPC-Call";
628     else
629       handleSignal(params);
630       //handleSignal(params.takeFirst().toByteArray(), params);
631     break;
632
633   case Sync:
634     handleSync(sender, params);
635     break;
636
637   case InitRequest:
638     handleInitRequest(sender, params);
639     break;
640
641   case InitData:
642     handleInitData(sender, params);
643     break;
644
645   case HeartBeat:
646     receiveHeartBeat(sender, params);
647     break;
648
649   case HeartBeatReply:
650     receiveHeartBeatReply(sender, params);
651     break;
652
653   default:
654     qWarning() << "SignalProxy::receivePeerSignal(): received undefined CallType" << requestType << params;
655   }
656 }
657
658 void SignalProxy::receivePeerSignal(SignalProxy *sender, const RequestType &requestType, const QVariantList &params) {
659   if(!_peers.contains(sender)) {
660     // we output only the pointer value. otherwise Qt would try to pretty print. As the object might already been destroyed, this is not a good idea.
661     qWarning() << "SignalProxy::receivePeerSignal(): received Signal from unknown Proxy" << reinterpret_cast<void *>(sender);
662     return;
663   }
664   receivePeerSignal(_peers[sender], requestType, params);
665 }
666
667 void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
668   if(params.count() < 3) {
669     qWarning() << "received invalid Sync call" << params;
670     return;
671   }
672
673   QByteArray className = params.takeFirst().toByteArray();
674   QString objectName = params.takeFirst().toString();
675   QByteArray signal = params.takeFirst().toByteArray();
676
677   if(!_syncSlave.contains(className) || !_syncSlave[className].contains(objectName)) {
678     qWarning() << QString("no registered receiver for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(signal)).arg(objectName)
679                << params;
680     return;
681   }
682
683   SyncableObject *receiver = _syncSlave[className][objectName];
684   ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
685   if(!eMeta->syncMap().contains(signal)) {
686     qWarning() << QString("no matching slot for sync call: %1::%2 (objectName=\"%3\"). Params are:").arg(QString(className)).arg(QString(signal)).arg(objectName)
687                << params;
688     return;
689   }
690
691   int slotId = eMeta->syncMap()[signal];
692
693   QVariant returnValue((QVariant::Type)eMeta->returnType(slotId));
694   if(!invokeSlot(receiver, slotId, params, returnValue)) {
695     qWarning("SignalProxy::handleSync(): invokeMethod for \"%s\" failed ", eMeta->methodName(slotId).constData());
696     return;
697   }
698
699   if(returnValue.type() != QVariant::Invalid && eMeta->receiveMap().contains(slotId)) {
700     int receiverId = eMeta->receiveMap()[slotId];
701     QVariantList returnParams;
702     returnParams << className
703                  << objectName
704                  << QByteArray(receiver->metaObject()->method(receiverId).signature());
705     if(eMeta->argTypes(receiverId).count() > 1)
706       returnParams << params;
707     returnParams << returnValue;
708     sender->dispatchSignal(Sync, returnParams);
709   }
710
711   // send emit update signal
712   invokeSlot(receiver, eMeta->updatedRemotelyId());
713 }
714
715 void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &params) {
716   if(params.count() != 2) {
717     qWarning() << "SignalProxy::handleInitRequest() received initRequest with invalid param Count:"
718                << params;
719     return;
720   }
721
722   QByteArray className(params[0].toByteArray());
723   QString objectName(params[1].toString());
724
725   if(!_syncSlave.contains(className)) {
726     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
727                << className;
728     return;
729   }
730
731   if(!_syncSlave[className].contains(objectName)) {
732     qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Object:"
733                << className << objectName;
734     return;
735   }
736
737   SyncableObject *obj = _syncSlave[className][objectName];
738
739   QVariantList params_;
740   params_ << className
741           << objectName
742           << initData(obj);
743
744   sender->dispatchSignal(InitData, params_);
745 }
746
747 void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList &params) {
748   Q_UNUSED(sender)
749   if(params.count() != 3) {
750     qWarning() << "SignalProxy::handleInitData() received initData with invalid param Count:"
751                << params;
752     return;
753   }
754
755   QByteArray className(params[0].toByteArray());
756   QString objectName(params[1].toString());
757   QVariantMap propertyMap(params[2].toMap());
758
759   if(!_syncSlave.contains(className)) {
760     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Class:"
761                << className;
762     return;
763   }
764
765   if(!_syncSlave[className].contains(objectName)) {
766     qWarning() << "SignalProxy::handleInitData() received initData for unregistered Object:"
767                << className << objectName;
768     return;
769   }
770
771   SyncableObject *obj = _syncSlave[className][objectName];
772   setInitData(obj, propertyMap);
773 }
774
775 //void SignalProxy::handleSignal(const QByteArray &funcName, const QVariantList &params) {
776 void SignalProxy::handleSignal(const QVariantList &data) {
777   QVariantList params = data;
778   QByteArray funcName = params.takeFirst().toByteArray();
779
780   QObject* receiver;
781   int methodId;
782   SlotHash::const_iterator slot = _attachedSlots.constFind(funcName);
783   while(slot != _attachedSlots.constEnd() && slot.key() == funcName) {
784     receiver = (*slot).first;
785     methodId = (*slot).second;
786     if(!invokeSlot(receiver, methodId, params)) {
787       ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
788       qWarning("SignalProxy::handleSignal(): invokeMethod for \"%s\" failed ", eMeta->methodName(methodId).constData());
789     }
790     slot++;
791   }
792 }
793
794 bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList &params, QVariant &returnValue) {
795   ExtendedMetaObject *eMeta = extendedMetaObject(receiver);
796   const QList<int> args = eMeta->argTypes(methodId);
797   const int numArgs = params.count() < args.count()
798     ? params.count()
799     : args.count();
800
801   if(eMeta->minArgCount(methodId) > params.count()) {
802       qWarning() << "SignalProxy::invokeSlot(): not enough params to invoke" << eMeta->methodName(methodId);
803       return false;
804   }
805
806   void *_a[] = {0,              // return type...
807                 0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots
808                 0, 0, 0, 0 , 0};
809
810   // check for argument compatibility and build params array
811   for(int i = 0; i < numArgs; i++) {
812     if(!params[i].isValid()) {
813       qWarning() << "SignalProxy::invokeSlot(): received invalid data for argument number" << i << "of method" << QString("%1::%2()").arg(receiver->metaObject()->className()).arg(receiver->metaObject()->method(methodId).signature());
814       qWarning() << "                            - make sure all your data types are known by the Qt MetaSystem";
815       return false;
816     }
817     if(args[i] != QMetaType::type(params[i].typeName())) {
818       qWarning() << "SignalProxy::invokeSlot(): incompatible param types to invoke" << eMeta->methodName(methodId);
819       return false;
820     }
821     _a[i+1] = const_cast<void *>(params[i].constData());
822   }
823
824   if(returnValue.type() != QVariant::Invalid)
825     _a[0] = const_cast<void *>(returnValue.constData());
826
827   Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
828     ? Qt::DirectConnection
829     : Qt::QueuedConnection;
830
831   if(type == Qt::DirectConnection) {
832     return receiver->qt_metacall(QMetaObject::InvokeMetaMethod, methodId, _a) < 0;
833   } else {
834     qWarning() << "Queued Connections are not implemented yet";
835     // note to self: qmetaobject.cpp:990 ff
836     return false;
837   }
838
839 }
840
841 bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList &params) {
842   QVariant ret;
843   return invokeSlot(receiver, methodId, params, ret);
844 }
845
846 void SignalProxy::dataAvailable() {
847   // yet again. it's a private slot. no need for checks.
848   QIODevice* ioDev = qobject_cast<QIODevice* >(sender());
849   Q_ASSERT(_peers.contains(ioDev) && _peers[ioDev]->type() == AbstractPeer::IODevicePeer);
850   IODevicePeer *peer = static_cast<IODevicePeer *>(_peers[ioDev]);
851   QVariant var;
852   while(peer->readData(var))
853     receivePackedFunc(peer, var);
854 }
855
856 void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool compressed) {
857   QAbstractSocket* sock  = qobject_cast<QAbstractSocket*>(dev);
858   if(!dev->isOpen() || (sock && sock->state()!=QAbstractSocket::ConnectedState)) {
859     qWarning("SignalProxy: Can't call write on a closed device");
860     return;
861   }
862
863   QByteArray block;
864   QDataStream out(&block, QIODevice::WriteOnly);
865   out.setVersion(QDataStream::Qt_4_2);
866   out << (quint32)0;
867
868   if(compressed) {
869     QByteArray rawItem;
870     QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
871
872     itemStream.setVersion(QDataStream::Qt_4_2);
873     itemStream << item;
874
875     rawItem = qCompress(rawItem);
876
877     out << rawItem;
878   } else {
879     out << item;
880   }
881
882   out.device()->seek(0);
883   out << (quint32)(block.size() - sizeof(quint32));
884
885   dev->write(block);
886 }
887
888 bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVariant &item, bool compressed) {
889   QDataStream in(dev);
890   in.setVersion(QDataStream::Qt_4_2);
891
892   if(blockSize == 0) {
893     if(dev->bytesAvailable() < (int)sizeof(quint32)) return false;
894     in >> blockSize;
895   }
896
897   if(blockSize > 1 << 22) {
898     disconnectDevice(dev, tr("Client tried to send package larger than max package size!"));
899     return false;
900   }
901
902   if(blockSize == 0) {
903     disconnectDevice(dev, tr("Client tried to send 0 byte package!"));
904     return false;
905   }
906
907   if(dev->bytesAvailable() < blockSize)
908     return false;
909
910   blockSize = 0;
911
912   if(compressed) {
913     QByteArray rawItem;
914     in >> rawItem;
915
916     int nbytes = rawItem.size();
917     if(nbytes <= 4) {
918       const char *data = rawItem.constData();
919       if(nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) {
920         disconnectDevice(dev, tr("Client sent corrupted compressed data!"));
921         return false;
922       }
923     }
924
925     rawItem = qUncompress(rawItem);
926
927     QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
928     itemStream.setVersion(QDataStream::Qt_4_2);
929     itemStream >> item;
930   } else {
931     in >> item;
932   }
933
934   if(!item.isValid()) {
935     disconnectDevice(dev, tr("Client sent corrupt data: unable to load QVariant!"));
936     return false;
937   }
938
939   return true;
940 }
941
942 void SignalProxy::requestInit(SyncableObject *obj) {
943   if(proxyMode() == Server || obj->isInitialized())
944     return;
945
946   QVariantList params;
947   params << obj->syncMetaObject()->className()
948          << obj->objectName();
949   dispatchSignal(InitRequest, params);
950 }
951
952 QVariantMap SignalProxy::initData(SyncableObject *obj) const {
953   return obj->toVariantMap();
954 }
955
956 void SignalProxy::setInitData(SyncableObject *obj, const QVariantMap &properties) {
957   if(obj->isInitialized())
958     return;
959   obj->fromVariantMap(properties);
960   obj->setInitialized();
961   emit objectInitialized(obj);
962   invokeSlot(obj, extendedMetaObject(obj)->updatedRemotelyId());
963 }
964
965 void SignalProxy::sendHeartBeat() {
966   QVariantList heartBeatParams;
967   heartBeatParams << QTime::currentTime();
968   PeerHash::iterator peer = _peers.begin();
969   while(peer != _peers.end()) {
970     if((*peer)->type() == AbstractPeer::IODevicePeer) {
971       IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
972       ioPeer->dispatchSignal(SignalProxy::HeartBeat, heartBeatParams);
973       if(ioPeer->sentHeartBeats > 0) {
974         updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval());
975       }
976       if(ioPeer->sentHeartBeats > 1) {
977         qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address()
978                    << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)";
979         ioPeer->close();
980       } else {
981         ioPeer->sentHeartBeats++;
982       }
983     }
984     peer++;
985   }
986 }
987
988 void SignalProxy::receiveHeartBeat(AbstractPeer *peer, const QVariantList &params) {
989   peer->dispatchSignal(SignalProxy::HeartBeatReply, params);
990 }
991
992 void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &params) {
993   if(peer->type() != AbstractPeer::IODevicePeer) {
994     qWarning() << "SignalProxy::receiveHeartBeatReply: received heart beat from a non IODevicePeer!";
995     return;
996   }
997
998   IODevicePeer *ioPeer = static_cast<IODevicePeer *>(peer);
999   ioPeer->sentHeartBeats = 0;
1000
1001   if(params.isEmpty()) {
1002     qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
1003     return;
1004   }
1005
1006   QTime sendTime = params[0].value<QTime>();
1007   updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
1008 }
1009
1010 void SignalProxy::customEvent(QEvent *event) {
1011   switch(event->type()) {
1012   case PeerSignal:
1013     {
1014       PeerSignalEvent *e = static_cast<PeerSignalEvent *>(event);
1015       receivePeerSignal(e->sender, e->requestType, e->params);
1016     }
1017     event->accept();
1018     break;
1019   case RemovePeer:
1020     {
1021       RemovePeerEvent *e = static_cast<RemovePeerEvent *>(event);
1022       removePeer(e->peer);
1023     }
1024     event->accept();
1025   default:
1026     return;
1027   }
1028 }
1029
1030 void SignalProxy::disconnectDevice(QIODevice *dev, const QString &reason) {
1031   if(!reason.isEmpty())
1032     qWarning() << qPrintable(reason);
1033   QAbstractSocket *sock  = qobject_cast<QAbstractSocket *>(dev);
1034   if(sock)
1035     qWarning() << qPrintable(tr("Disconnecting")) << qPrintable(sock->peerAddress().toString());
1036   dev->close();
1037 }
1038
1039 void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
1040   peer->lag = lag;
1041   if(proxyMode() == Client) {
1042     emit lagUpdated(lag);
1043   }
1044 }
1045
1046 void SignalProxy::dumpProxyStats() {
1047   QString mode;
1048   if(proxyMode() == Server)
1049     mode = "Server";
1050   else
1051     mode = "Client";
1052
1053   int sigCount = 0;
1054   foreach(SignalRelay *relay, _relayHash.values())
1055     sigCount += relay->sigCount();
1056
1057   int slaveCount = 0;
1058   foreach(ObjectId oid, _syncSlave.values())
1059     slaveCount += oid.count();
1060
1061   qDebug() << this;
1062   qDebug() << "              Proxy Mode:" << mode;
1063   qDebug() << "attached sending Objects:" << _relayHash.count();
1064   qDebug() << "       number of Signals:" << sigCount;
1065   qDebug() << "          attached Slots:" << _attachedSlots.count();
1066   qDebug() << " number of synced Slaves:" << slaveCount;
1067   qDebug() << "number of Classes cached:" << _extendedMetaObjects.count();
1068 }
1069
1070 void SignalProxy::dumpSyncMap(SyncableObject *object) {
1071   const QMetaObject *meta = object->metaObject();
1072   ExtendedMetaObject *eMeta = extendedMetaObject(object);
1073   qDebug() << "SignalProxy: SyncMap for Class" << meta->className();
1074
1075   QHash<QByteArray, int> syncMap_ = eMeta->syncMap();
1076   QHash<QByteArray, int>::const_iterator iter = syncMap_.constBegin();
1077   while(iter != syncMap_.constEnd()) {
1078     qDebug() << qPrintable(QString("%1 --> %2 %3").arg(QString(iter.key()), 40).arg(iter.value()).arg(QString(meta->method(iter.value()).signature())));
1079     iter++;
1080   }
1081 }
1082
1083 void SignalProxy::updateSecureState() {
1084   bool wasSecure = _secure;
1085
1086   _secure = !_peers.isEmpty();
1087   PeerHash::const_iterator peerIter;
1088   for(peerIter = _peers.constBegin(); peerIter != _peers.constEnd(); peerIter++) {
1089     _secure &= (*peerIter)->isSecure();
1090   }
1091
1092   if(wasSecure != _secure)
1093     emit secureStateChanged(_secure);
1094 }
1095
1096
1097 // ==================================================
1098 //  ExtendedMetaObject
1099 // ==================================================
1100 SignalProxy::ExtendedMetaObject::ExtendedMetaObject(const QMetaObject *meta)
1101   : _meta(meta),
1102     _updatedRemotelyId(-1)
1103 {
1104 }
1105
1106 const QList<int> &SignalProxy::ExtendedMetaObject::argTypes(int methodId) {
1107   if(!_argTypes.contains(methodId)) {
1108     QList<QByteArray> paramTypes = _meta->method(methodId).parameterTypes();
1109     QList<int> argTypes;
1110     for(int i = 0; i < paramTypes.count(); i++) {
1111       argTypes.append(QMetaType::type(paramTypes[i]));
1112     }
1113     _argTypes[methodId] = argTypes;
1114   }
1115   return _argTypes[methodId];
1116 }
1117
1118 const int &SignalProxy::ExtendedMetaObject::returnType(int methodId) {
1119   if(!_returnType.contains(methodId)) {
1120     _returnType[methodId] = QMetaType::type(_meta->method(methodId).typeName());
1121   }
1122   return _returnType[methodId];
1123 }
1124
1125 const int &SignalProxy::ExtendedMetaObject::minArgCount(int methodId) {
1126   if(!_minArgCount.contains(methodId)) {
1127     QString signature(_meta->method(methodId).signature());
1128     _minArgCount[methodId] = _meta->method(methodId).parameterTypes().count() - signature.count("=");
1129   }
1130   return _minArgCount[methodId];
1131 }
1132
1133 const QByteArray &SignalProxy::ExtendedMetaObject::methodName(int methodId) {
1134   if(!_methodNames.contains(methodId)) {
1135     _methodNames[methodId] = methodName(_meta->method(methodId));
1136   }
1137   return _methodNames[methodId];
1138 }
1139
1140 const QHash<QByteArray, int> &SignalProxy::ExtendedMetaObject::syncMap() {
1141   if(_syncMap.isEmpty()) {
1142     QHash<QByteArray, int> syncMap;
1143
1144     QList<int> slotIndexes;
1145     for(int i = 0; i < _meta->methodCount(); i++) {
1146       if(_meta->method(i).methodType() == QMetaMethod::Slot)
1147         slotIndexes << i;
1148     }
1149
1150     // we're faking sync pairs for sync replies
1151     // --> we deliver to every slot starting with "receive"
1152     QByteArray slotSignature;
1153     QList<int>::iterator slotIter = slotIndexes.begin();
1154     while(slotIter != slotIndexes.end()) {
1155       slotSignature = QByteArray(_meta->method(*slotIter).signature());
1156       if(slotSignature.startsWith("receive")) {
1157         syncMap[slotSignature] = *slotIter;
1158         slotIndexes.erase(slotIter);
1159       } else {
1160         slotIter++;
1161       }
1162     }
1163
1164     // find the proper sig -> slot matches
1165     QMetaMethod signal, slot;
1166     int matchIdx;
1167     for(int signalIdx = 0; signalIdx < _meta->methodCount(); signalIdx++) {
1168       signal = _meta->method(signalIdx);
1169       if(signal.methodType() != QMetaMethod::Signal)
1170         continue;
1171
1172       matchIdx = -1;
1173       foreach(int slotIdx, slotIndexes) {
1174         slot = _meta->method(slotIdx);
1175         if(methodsMatch(signal, slot)) {
1176           matchIdx = slotIdx;
1177           break;
1178         }
1179       }
1180       if(matchIdx != -1) {
1181         slotIndexes.removeAll(matchIdx);
1182         syncMap[QByteArray(signal.signature())] = matchIdx;
1183       }
1184     }
1185     _syncMap = syncMap;
1186   }
1187   return _syncMap;
1188 }
1189
1190 const QHash<int, int> &SignalProxy::ExtendedMetaObject::receiveMap() {
1191   if(_receiveMap.isEmpty()) {
1192     QHash<int, int> receiveMap;
1193
1194     QMetaMethod requestSlot;
1195     QByteArray returnTypeName;
1196     QByteArray signature;
1197     QByteArray methodName;
1198     QByteArray params;
1199     int paramsPos;
1200     int receiverId;
1201     const int methodCount = _meta->methodCount();
1202     for(int i = 0; i < methodCount; i++) {
1203       requestSlot = _meta->method(i);
1204       if(requestSlot.methodType() != QMetaMethod::Slot)
1205         continue;
1206
1207       returnTypeName = requestSlot.typeName();
1208       if(QMetaType::Void == (QMetaType::Type)returnType(i))
1209         continue;
1210
1211       signature = QByteArray(requestSlot.signature());
1212       if(!signature.startsWith("request"))
1213         continue;
1214
1215       paramsPos = signature.indexOf('(');
1216       if(paramsPos == -1)
1217         continue;
1218
1219       methodName = signature.left(paramsPos);
1220       params = signature.mid(paramsPos);
1221
1222       methodName = methodName.replace("request", "receive");
1223       params = params.left(params.count() - 1) + ", " + returnTypeName + ")";
1224
1225       signature = QMetaObject::normalizedSignature(methodName + params);
1226       receiverId = _meta->indexOfSlot(signature);
1227
1228       if(receiverId == -1) {
1229         signature = QMetaObject::normalizedSignature(methodName + "(" + returnTypeName + ")");
1230         receiverId = _meta->indexOfSlot(signature);
1231       }
1232
1233       if(receiverId != -1)
1234         receiveMap[i] = receiverId;
1235     }
1236     _receiveMap = receiveMap;
1237   }
1238   return _receiveMap;
1239 }
1240
1241 int SignalProxy::ExtendedMetaObject::updatedRemotelyId() {
1242   if(_updatedRemotelyId == -1) {
1243     _updatedRemotelyId = _meta->indexOfSignal("updatedRemotely()");
1244   }
1245   return _updatedRemotelyId;
1246 }
1247
1248 QByteArray SignalProxy::ExtendedMetaObject::methodName(const QMetaMethod &method) {
1249   QByteArray sig(method.signature());
1250   return sig.left(sig.indexOf("("));
1251 }
1252
1253 bool SignalProxy::ExtendedMetaObject::methodsMatch(const QMetaMethod &signal, const QMetaMethod &slot) {
1254   // if we don't even have the same basename it's a sure NO
1255   QString baseName = methodBaseName(signal);
1256   if(baseName != methodBaseName(slot))
1257     return false;
1258
1259   // are the signatures compatible?
1260   if(!QObject::staticMetaObject.checkConnectArgs(signal.signature(), slot.signature()))
1261     return false;
1262
1263   // we take an educated guess if the signals and slots match
1264   QString signalsuffix = methodName(signal);
1265   QString slotprefix = methodName(slot);
1266   if(!baseName.isEmpty()) {
1267     signalsuffix = signalsuffix.mid(baseName.count()).toLower();
1268     slotprefix = slotprefix.left(slotprefix.count() - baseName.count()).toLower();
1269   }
1270
1271   uint sizediff = qAbs(slotprefix.size() - signalsuffix.size());
1272   int ratio = editingDistance(slotprefix, signalsuffix) - sizediff;
1273   return (ratio < 2);
1274 }
1275
1276 QString SignalProxy::ExtendedMetaObject::methodBaseName(const QMetaMethod &method) {
1277   QString methodname = QString(method.signature()).section("(", 0, 0);
1278
1279   // determine where we have to chop:
1280   int upperCharPos;
1281   if(method.methodType() == QMetaMethod::Slot) {
1282     // we take evertyhing from the first uppercase char if it's slot
1283     upperCharPos = methodname.indexOf(QRegExp("[A-Z]"));
1284     if(upperCharPos == -1)
1285       return QString();
1286     methodname = methodname.mid(upperCharPos);
1287   } else {
1288     // and if it's a signal we discard everything from the last uppercase char
1289     upperCharPos = methodname.lastIndexOf(QRegExp("[A-Z]"));
1290     if(upperCharPos == -1)
1291       return QString();
1292     methodname = methodname.left(upperCharPos);
1293   }
1294
1295   methodname[0] = methodname[0].toUpper();
1296
1297   return methodname;
1298 }
1299