X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcommon%2Fsignalproxy.cpp;h=7c43592c837e6b7daf01b5808eefce71abd2619d;hp=d9d2b39da4e0013bbc52ddc7b857a57977475efa;hb=1adc00219ba072da57994764d086beed8ffb7bb4;hpb=0679f0aa992e4d0d0aab2ebe08e17544150fbc55 diff --git a/src/common/signalproxy.cpp b/src/common/signalproxy.cpp index d9d2b39d..7c43592c 100644 --- a/src/common/signalproxy.cpp +++ b/src/common/signalproxy.cpp @@ -32,6 +32,9 @@ #include #include #include +#ifdef HAVE_SSL +#include +#endif #include #include #include @@ -45,12 +48,18 @@ // ================================================== class PeerSignalEvent : public QEvent { public: - PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::User), sender(sender), requestType(requestType), params(params) {} + PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::Type(SignalProxy::PeerSignal)), sender(sender), requestType(requestType), params(params) {} SignalProxy *sender; SignalProxy::RequestType requestType; QVariantList params; }; +class RemovePeerEvent : public QEvent { +public: + RemovePeerEvent(QObject *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeer)), peer(peer) {} + QObject *peer; +}; + // ================================================== // SIGNALRELAY // ================================================== @@ -67,12 +76,12 @@ public: void setSynchronize(bool); bool synchronize() const; - + int sigCount() const; - + private: bool isSyncMethod(int i); - + SignalProxy* proxy; QObject* caller; QMultiHash sigNames; @@ -110,7 +119,7 @@ int SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **_a) { proxy->dispatchSignal(SignalProxy::RpcCall, QVariantList() << funcIter.value() << params); funcIter++; } - + // dispatch Sync Signal if necessary QByteArray signature(caller->metaObject()->method(_id).signature()); SyncableObject *syncObject = qobject_cast(caller); @@ -162,7 +171,7 @@ bool SignalRelay::isSyncMethod(int i) { QByteArray signature = syncObject->syncMetaObject()->method(i).signature(); if(!proxy->syncMap(syncObject).contains(signature)) return false; - + if(proxy->proxyMode() == SignalProxy::Server && !signature.contains("Requested")) return true; @@ -206,6 +215,28 @@ void SignalProxy::IODevicePeer::dispatchSignal(const RequestType &requestType, c dispatchPackedFunc(QVariant(packedFunc)); } +bool SignalProxy::IODevicePeer::isSecure() const { +#ifdef HAVE_SSL + QSslSocket *sslSocket = qobject_cast(_device); + if(sslSocket) + return sslSocket->isEncrypted() || sslSocket->localAddress() == QHostAddress::LocalHost || sslSocket->localAddress() == QHostAddress::LocalHostIPv6; +#endif + + QAbstractSocket *socket = qobject_cast(_device); + if(socket) + return socket->localAddress() == QHostAddress::LocalHost || socket->localAddress() == QHostAddress::LocalHostIPv6; + + return false; +} + +QString SignalProxy::IODevicePeer::address() const { + QAbstractSocket *socket = qobject_cast(_device); + if(socket) + return socket->peerAddress().toString(); + else + return QString(); +} + void SignalProxy::SignalProxyPeer::dispatchSignal(const RequestType &requestType, const QVariantList ¶ms) { Qt::ConnectionType type = QThread::currentThread() == receiver->thread() ? Qt::DirectConnection @@ -241,7 +272,7 @@ SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent) setProxyMode(mode); addPeer(device); init(); -} +} SignalProxy::~SignalProxy() { QList senders = _relayHash.keys(); @@ -277,6 +308,8 @@ void SignalProxy::setProxyMode(ProxyMode mode) { void SignalProxy::init() { connect(&_heartBeatTimer, SIGNAL(timeout()), this, SLOT(sendHeartBeat())); _heartBeatTimer.start(30 * 1000); + _secure = false; + updateSecureState(); } void SignalProxy::initServer() { @@ -289,7 +322,7 @@ void SignalProxy::initClient() { bool SignalProxy::addPeer(QIODevice* iodev) { if(!iodev) return false; - + if(_peers.contains(iodev)) return true; @@ -298,22 +331,30 @@ bool SignalProxy::addPeer(QIODevice* iodev) { return false; } - if(!iodev->isOpen()) - qWarning("SignalProxy::the device you passed is not open!"); + if(!iodev->isOpen()) { + qWarning("SignalProxy::addPeer(QIODevice *iodev): iodev needs to be open!"); + return false; + } connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender())); connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable())); - QAbstractSocket* sock = qobject_cast(iodev); - if(sock) { - connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender())); +#ifdef HAVE_SSL + QSslSocket *sslSocket = qobject_cast(iodev); + if(sslSocket) { + connect(iodev, SIGNAL(encrypted()), this, SLOT(updateSecureState())); } +#endif + + if(!iodev->parent()) + iodev->setParent(this); _peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool()); if(_peers.count() == 1) emit connected(); + updateSecureState(); return true; } @@ -336,12 +377,12 @@ bool SignalProxy::addPeer(SignalProxy* proxy) { } _peers[proxy] = new SignalProxyPeer(this, proxy); - proxy->addPeer(this); if(_peers.count() == 1) emit connected(); + updateSecureState(); return true; } @@ -373,8 +414,19 @@ void SignalProxy::removePeer(QObject* dev) { if(peer->type() == AbstractPeer::IODevicePeer) emit peerRemoved(static_cast(dev)); + if(peer->type() == AbstractPeer::SignalProxyPeer) { + SignalProxy *proxy = static_cast(dev); + if(proxy->_peers.contains(this)) + proxy->removePeer(this); + } + + if(dev->parent() == this) + dev->deleteLater(); + delete peer; + updateSecureState(); + if(_peers.isEmpty()) emit disconnected(); } @@ -385,13 +437,13 @@ void SignalProxy::removePeerBySender() { void SignalProxy::objectRenamed(const QString &newname, const QString &oldname) { SyncableObject *syncObject = qobject_cast(sender()); - const QMetaObject *meta = syncObject->metaObject(); + const QMetaObject *meta = syncObject->syncMetaObject(); const QByteArray className(meta->className()); objectRenamed(className, newname, oldname); if(proxyMode() == Client) return; - + QVariantList params; params << "__objectRenamed__" << className << newname << oldname; dispatchSignal(RpcCall, params); @@ -475,7 +527,7 @@ const QByteArray &SignalProxy::methodName(QObject *obj, int methodId) { void SignalProxy::setSyncMap(SyncableObject *obj) { const QMetaObject *meta = obj->syncMetaObject(); QHash syncMap; - + QList slotIndexes; for(int i = 0; i < meta->methodCount(); i++) { if(meta->method(i).methodType() == QMetaMethod::Slot) @@ -550,7 +602,7 @@ void SignalProxy::setReceiveMap(SyncableObject *obj) { signature = QByteArray(requestSlot.signature()); if(!signature.startsWith("request")) continue; - + paramsPos = signature.indexOf('('); if(paramsPos == -1) continue; @@ -572,7 +624,7 @@ void SignalProxy::setReceiveMap(SyncableObject *obj) { if(receiverId != -1) receiveMap[i] = receiverId; } - _classInfo[meta]->receiveMap = receiveMap; + _classInfo[meta]->receiveMap = receiveMap; } const QHash &SignalProxy::receiveMap(SyncableObject *obj) { @@ -654,7 +706,7 @@ bool SignalProxy::attachSlot(const QByteArray& sigName, QObject* recv, const cha void SignalProxy::synchronize(SyncableObject *obj) { createClassInfo(obj); setUpdatedRemotelyId(obj); - + // attaching all the Signals SignalRelay* relay; if(_relayHash.contains(obj)) @@ -695,9 +747,9 @@ void SignalProxy::detachSender() { } void SignalProxy::detachObject(QObject* obj) { + stopSync(static_cast(obj)); detachSignals(obj); detachSlots(obj); - stopSync(static_cast(obj)); } void SignalProxy::detachSignals(QObject* sender) { @@ -740,7 +792,10 @@ void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantL case AbstractPeer::IODevicePeer: { IODevicePeer *ioPeer = static_cast(*peer); - ioPeer->dispatchPackedFunc(packedFunc); + if(ioPeer->isOpen()) + ioPeer->dispatchPackedFunc(packedFunc); + else + QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key())); } break; case AbstractPeer::SignalProxyPeer: @@ -764,7 +819,7 @@ void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packed RequestType requestType = (RequestType)params.takeFirst().value(); receivePeerSignal(sender, requestType, params); } - + void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) { switch(requestType) { case RpcCall: @@ -778,15 +833,15 @@ void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req case Sync: handleSync(sender, params); break; - + case InitRequest: handleInitRequest(sender, params); break; - + case InitData: handleInitData(sender, params); break; - + case HeartBeat: receiveHeartBeat(sender, params); break; @@ -814,7 +869,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { qWarning() << "received invalid Sync call" << params; return; } - + QByteArray className = params.takeFirst().toByteArray(); QString objectName = params.takeFirst().toString(); QByteArray signal = params.takeFirst().toByteArray(); @@ -851,7 +906,7 @@ void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) { returnParams << returnValue; sender->dispatchSignal(Sync, returnParams); } - + // send emit update signal invokeSlot(receiver, updatedRemotelyId(receiver)); } @@ -862,10 +917,10 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa << params; return; } - + QByteArray className(params[0].toByteArray()); QString objectName(params[1].toString()); - + if(!_syncSlave.contains(className)) { qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:" << className; @@ -877,7 +932,7 @@ void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa << className << objectName; return; } - + SyncableObject *obj = _syncSlave[className][objectName]; QVariantList params_; @@ -895,7 +950,7 @@ void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶m << params; return; } - + QByteArray className(params[0].toByteArray()); QString objectName(params[1].toString()); QVariantMap propertyMap(params[2].toMap()); @@ -944,7 +999,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList return false; } - void *_a[] = {0, // return type... + void *_a[] = {0, // return type... 0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots 0, 0, 0, 0 , 0}; @@ -964,7 +1019,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList if(returnValue.type() != QVariant::Invalid) _a[0] = const_cast(returnValue.constData()); - + Qt::ConnectionType type = QThread::currentThread() == receiver->thread() ? Qt::DirectConnection : Qt::QueuedConnection; @@ -976,7 +1031,7 @@ bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList // note to self: qmetaobject.cpp:990 ff return false; } - + } bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms) { @@ -1005,7 +1060,7 @@ void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool c QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_2); out << (quint32)0; - + if(compressed) { QByteArray rawItem; QDataStream itemStream(&rawItem, QIODevice::WriteOnly); @@ -1036,30 +1091,35 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian } if(blockSize > 1 << 22) { - qWarning() << qPrintable(tr("Client tried to send package larger than max package size!")); - QAbstractSocket *sock = qobject_cast(dev); - qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client"))); - dev->close(); + disconnectDevice(dev, tr("Client tried to send package larger than max package size!")); return false; } - + + if(blockSize == 0) { + disconnectDevice(dev, tr("Client tried to send 0 byte package!")); + return false; + } + if(dev->bytesAvailable() < blockSize) return false; + blockSize = 0; + if(compressed) { QByteArray rawItem; in >> rawItem; - // debug check + int nbytes = rawItem.size(); - if (nbytes <= 4) { + if(nbytes <= 4) { const char *data = rawItem.constData(); - if (nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) - qWarning() << "receieved corrupted compressed data:" - << blockSize << rawItem << rawItem.size() << dev; + if(nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) { + disconnectDevice(dev, tr("Client sent corrupted compressed data!")); + return false; + } } - // end + rawItem = qUncompress(rawItem); - + QDataStream itemStream(&rawItem, QIODevice::ReadOnly); itemStream.setVersion(QDataStream::Qt_4_2); itemStream >> item; @@ -1067,14 +1127,18 @@ bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian in >> item; } - blockSize = 0; + if(!item.isValid()) { + disconnectDevice(dev, tr("Client sent corrupt data: unable to load QVariant!")); + return false; + } return true; } bool SignalProxy::methodsMatch(const QMetaMethod &signal, const QMetaMethod &slot) const { // if we don't even have the same basename it's a sure NO - if(methodBaseName(signal) != methodBaseName(slot)) + QString baseName = methodBaseName(signal); + if(baseName != methodBaseName(slot)) return false; // are the signatures compatible? @@ -1082,11 +1146,20 @@ bool SignalProxy::methodsMatch(const QMetaMethod &signal, const QMetaMethod &slo return false; // we take an educated guess if the signals and slots match - QString signalsuffix = ::methodName(signal).mid(QString(::methodName(signal)).lastIndexOf(QRegExp("[A-Z]"))).toLower(); - QString slotprefix = ::methodName(slot).left(QString(::methodName(slot)).indexOf(QRegExp("[A-Z]"))).toLower(); + QString signalsuffix = ::methodName(signal); + QString slotprefix = ::methodName(slot); + if(!baseName.isEmpty()) { + signalsuffix = signalsuffix.mid(baseName.count()).toLower(); + slotprefix = slotprefix.left(slotprefix.count() - baseName.count()).toLower(); + } uint sizediff = qAbs(slotprefix.size() - signalsuffix.size()); int ratio = editingDistance(slotprefix, signalsuffix) - sizediff; +// if(ratio < 2) { +// qDebug() << Q_FUNC_INFO; +// qDebug() << methodBaseName(signal) << methodBaseName(slot); +// qDebug() << signalsuffix << slotprefix << sizediff << ratio; +// } return (ratio < 2); } @@ -1139,12 +1212,9 @@ void SignalProxy::sendHeartBeat() { updateLag(ioPeer, ioPeer->sentHeartBeats * _heartBeatTimer.interval()); } if(ioPeer->sentHeartBeats > 1) { - //FIXME: proper disconnect. -// QAbstractSocket *socket = qobject_cast(peerIter.key()); -// qWarning() << "SignalProxy: Disconnecting peer:" -// << (socket ? qPrintable(socket->peerAddress().toString()) : "local client") -// << "(didn't receive a heartbeat for over" << peerIter->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; -// peerIter.key()->close(); + qWarning() << "SignalProxy: Disconnecting peer:" << ioPeer->address() + << "(didn't receive a heartbeat for over" << ioPeer->sentHeartBeats * _heartBeatTimer.interval() / 1000 << "seconds)"; + ioPeer->close(); } else { ioPeer->sentHeartBeats++; } @@ -1167,28 +1237,43 @@ void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList & ioPeer->sentHeartBeats = 0; if(params.isEmpty()) { - qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->device(); + qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address(); return; } - + QTime sendTime = params[0].value(); updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2); } void SignalProxy::customEvent(QEvent *event) { switch(event->type()) { - case QEvent::User: + case PeerSignal: { - PeerSignalEvent *sig = static_cast(event); - receivePeerSignal(sig->sender, sig->requestType, sig->params); + PeerSignalEvent *e = static_cast(event); + receivePeerSignal(e->sender, e->requestType, e->params); } event->accept(); break; + case RemovePeer: + { + RemovePeerEvent *e = static_cast(event); + removePeer(e->peer); + } + event->accept(); default: return; } } +void SignalProxy::disconnectDevice(QIODevice *dev, const QString &reason) { + if(!reason.isEmpty()) + qWarning() << qPrintable(reason); + QAbstractSocket *sock = qobject_cast(dev); + if(sock) + qWarning() << qPrintable(tr("Disconnecting")) << qPrintable(sock->peerAddress().toString()); + dev->close(); +} + void SignalProxy::updateLag(IODevicePeer *peer, int lag) { peer->lag = lag; if(proxyMode() == Client) { @@ -1211,7 +1296,7 @@ void SignalProxy::dumpProxyStats() { int slaveCount = 0; foreach(ObjectId oid, _syncSlave.values()) slaveCount += oid.count(); - + qDebug() << this; qDebug() << " Proxy Mode:" << mode; qDebug() << "attached sending Objects:" << _relayHash.count(); @@ -1228,13 +1313,20 @@ void SignalProxy::dumpSyncMap(SyncableObject *object) { QHash syncMap_ = syncMap(object); QHash::const_iterator iter = syncMap_.constBegin(); while(iter != syncMap_.constEnd()) { - qDebug() << iter.key() << "-->" << iter.value() << meta->method(iter.value()).signature(); + qDebug() << qPrintable(QString("%1 --> %2 %3").arg(QString(iter.key()), 40).arg(iter.value()).arg(QString(meta->method(iter.value()).signature()))); iter++; } -// QHash syncMap_ = syncMap(object); -// QHash::const_iterator iter = syncMap_.constBegin(); -// while(iter != syncMap_.constEnd()) { -// qDebug() << iter.key() << meta->method(iter.key()).signature() << "-->" << iter.value() << meta->method(iter.value()).signature(); -// iter++; -// } +} + +void SignalProxy::updateSecureState() { + bool wasSecure = _secure; + + _secure = !_peers.isEmpty(); + PeerHash::const_iterator peerIter; + for(peerIter = _peers.constBegin(); peerIter != _peers.constEnd(); peerIter++) { + _secure &= (*peerIter)->isSecure(); + } + + if(wasSecure != _secure) + emit secureStateChanged(_secure); }