projects
/
quassel.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
SignanlProxy does no longer try to write to closed devices. Also closed devices can...
[quassel.git]
/
src
/
common
/
signalproxy.cpp
diff --git
a/src/common/signalproxy.cpp
b/src/common/signalproxy.cpp
index
94aa063
..
7808ad1
100644
(file)
--- a/
src/common/signalproxy.cpp
+++ b/
src/common/signalproxy.cpp
@@
-45,12
+45,18
@@
// ==================================================
class PeerSignalEvent : public QEvent {
public:
// ==================================================
class PeerSignalEvent : public QEvent {
public:
- PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::
User
), sender(sender), requestType(requestType), params(params) {}
+ PeerSignalEvent(SignalProxy *sender, SignalProxy::RequestType requestType, const QVariantList ¶ms) : QEvent(QEvent::
Type(SignalProxy::PeerSignal)
), sender(sender), requestType(requestType), params(params) {}
SignalProxy *sender;
SignalProxy::RequestType requestType;
QVariantList params;
};
SignalProxy *sender;
SignalProxy::RequestType requestType;
QVariantList params;
};
+class RemovePeerEvent : public QEvent {
+public:
+ RemovePeerEvent(QObject *peer) : QEvent(QEvent::Type(SignalProxy::RemovePeer)), peer(peer) {}
+ QObject *peer;
+};
+
// ==================================================
// SIGNALRELAY
// ==================================================
// ==================================================
// SIGNALRELAY
// ==================================================
@@
-67,12
+73,12
@@
public:
void setSynchronize(bool);
bool synchronize() const;
void setSynchronize(bool);
bool synchronize() const;
-
+
int sigCount() const;
int sigCount() const;
-
+
private:
bool isSyncMethod(int i);
private:
bool isSyncMethod(int i);
-
+
SignalProxy* proxy;
QObject* caller;
QMultiHash<int, QByteArray> sigNames;
SignalProxy* proxy;
QObject* caller;
QMultiHash<int, QByteArray> sigNames;
@@
-110,7
+116,7
@@
int SignalRelay::qt_metacall(QMetaObject::Call _c, int _id, void **_a) {
proxy->dispatchSignal(SignalProxy::RpcCall, QVariantList() << funcIter.value() << params);
funcIter++;
}
proxy->dispatchSignal(SignalProxy::RpcCall, QVariantList() << funcIter.value() << params);
funcIter++;
}
-
+
// dispatch Sync Signal if necessary
QByteArray signature(caller->metaObject()->method(_id).signature());
SyncableObject *syncObject = qobject_cast<SyncableObject *>(caller);
// dispatch Sync Signal if necessary
QByteArray signature(caller->metaObject()->method(_id).signature());
SyncableObject *syncObject = qobject_cast<SyncableObject *>(caller);
@@
-162,7
+168,7
@@
bool SignalRelay::isSyncMethod(int i) {
QByteArray signature = syncObject->syncMetaObject()->method(i).signature();
if(!proxy->syncMap(syncObject).contains(signature))
return false;
QByteArray signature = syncObject->syncMetaObject()->method(i).signature();
if(!proxy->syncMap(syncObject).contains(signature))
return false;
-
+
if(proxy->proxyMode() == SignalProxy::Server && !signature.contains("Requested"))
return true;
if(proxy->proxyMode() == SignalProxy::Server && !signature.contains("Requested"))
return true;
@@
-249,7
+255,7
@@
SignalProxy::SignalProxy(ProxyMode mode, QIODevice* device, QObject* parent)
setProxyMode(mode);
addPeer(device);
init();
setProxyMode(mode);
addPeer(device);
init();
-}
+}
SignalProxy::~SignalProxy() {
QList<QObject*> senders = _relayHash.keys();
SignalProxy::~SignalProxy() {
QList<QObject*> senders = _relayHash.keys();
@@
-297,7
+303,7
@@
void SignalProxy::initClient() {
bool SignalProxy::addPeer(QIODevice* iodev) {
if(!iodev)
return false;
bool SignalProxy::addPeer(QIODevice* iodev) {
if(!iodev)
return false;
-
+
if(_peers.contains(iodev))
return true;
if(_peers.contains(iodev))
return true;
@@
-306,16
+312,16
@@
bool SignalProxy::addPeer(QIODevice* iodev) {
return false;
}
return false;
}
- if(!iodev->isOpen())
- qWarning("SignalProxy::the device you passed is not open!");
+ if(!iodev->isOpen()) {
+ qWarning("SignalProxy::addPeer(QIODevice *iodev): iodev needs to be open!");
+ return false;
+ }
connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
connect(iodev, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
connect(iodev, SIGNAL(readyRead()), this, SLOT(dataAvailable()));
- QAbstractSocket* sock = qobject_cast<QAbstractSocket*>(iodev);
- if(sock) {
- connect(sock, SIGNAL(disconnected()), this, SLOT(removePeerBySender()));
- }
+ if(!iodev->parent())
+ iodev->setParent(this);
_peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
_peers[iodev] = new IODevicePeer(iodev, iodev->property("UseCompression").toBool());
@@
-344,7
+350,6
@@
bool SignalProxy::addPeer(SignalProxy* proxy) {
}
_peers[proxy] = new SignalProxyPeer(this, proxy);
}
_peers[proxy] = new SignalProxyPeer(this, proxy);
-
proxy->addPeer(this);
if(_peers.count() == 1)
proxy->addPeer(this);
if(_peers.count() == 1)
@@
-381,6
+386,15
@@
void SignalProxy::removePeer(QObject* dev) {
if(peer->type() == AbstractPeer::IODevicePeer)
emit peerRemoved(static_cast<QIODevice *>(dev));
if(peer->type() == AbstractPeer::IODevicePeer)
emit peerRemoved(static_cast<QIODevice *>(dev));
+ if(peer->type() == AbstractPeer::SignalProxyPeer) {
+ SignalProxy *proxy = static_cast<SignalProxy *>(dev);
+ if(proxy->_peers.contains(this))
+ proxy->removePeer(this);
+ }
+
+ if(dev->parent() == this)
+ dev->deleteLater();
+
delete peer;
if(_peers.isEmpty())
delete peer;
if(_peers.isEmpty())
@@
-399,7
+413,7
@@
void SignalProxy::objectRenamed(const QString &newname, const QString &oldname)
if(proxyMode() == Client)
return;
if(proxyMode() == Client)
return;
-
+
QVariantList params;
params << "__objectRenamed__" << className << newname << oldname;
dispatchSignal(RpcCall, params);
QVariantList params;
params << "__objectRenamed__" << className << newname << oldname;
dispatchSignal(RpcCall, params);
@@
-483,7
+497,7
@@
const QByteArray &SignalProxy::methodName(QObject *obj, int methodId) {
void SignalProxy::setSyncMap(SyncableObject *obj) {
const QMetaObject *meta = obj->syncMetaObject();
QHash<QByteArray, int> syncMap;
void SignalProxy::setSyncMap(SyncableObject *obj) {
const QMetaObject *meta = obj->syncMetaObject();
QHash<QByteArray, int> syncMap;
-
+
QList<int> slotIndexes;
for(int i = 0; i < meta->methodCount(); i++) {
if(meta->method(i).methodType() == QMetaMethod::Slot)
QList<int> slotIndexes;
for(int i = 0; i < meta->methodCount(); i++) {
if(meta->method(i).methodType() == QMetaMethod::Slot)
@@
-558,7
+572,7
@@
void SignalProxy::setReceiveMap(SyncableObject *obj) {
signature = QByteArray(requestSlot.signature());
if(!signature.startsWith("request"))
continue;
signature = QByteArray(requestSlot.signature());
if(!signature.startsWith("request"))
continue;
-
+
paramsPos = signature.indexOf('(');
if(paramsPos == -1)
continue;
paramsPos = signature.indexOf('(');
if(paramsPos == -1)
continue;
@@
-580,7
+594,7
@@
void SignalProxy::setReceiveMap(SyncableObject *obj) {
if(receiverId != -1)
receiveMap[i] = receiverId;
}
if(receiverId != -1)
receiveMap[i] = receiverId;
}
- _classInfo[meta]->receiveMap = receiveMap;
+ _classInfo[meta]->receiveMap = receiveMap;
}
const QHash<int, int> &SignalProxy::receiveMap(SyncableObject *obj) {
}
const QHash<int, int> &SignalProxy::receiveMap(SyncableObject *obj) {
@@
-662,7
+676,7
@@
bool SignalProxy::attachSlot(const QByteArray& sigName, QObject* recv, const cha
void SignalProxy::synchronize(SyncableObject *obj) {
createClassInfo(obj);
setUpdatedRemotelyId(obj);
void SignalProxy::synchronize(SyncableObject *obj) {
createClassInfo(obj);
setUpdatedRemotelyId(obj);
-
+
// attaching all the Signals
SignalRelay* relay;
if(_relayHash.contains(obj))
// attaching all the Signals
SignalRelay* relay;
if(_relayHash.contains(obj))
@@
-748,7
+762,10
@@
void SignalProxy::dispatchSignal(const RequestType &requestType, const QVariantL
case AbstractPeer::IODevicePeer:
{
IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
case AbstractPeer::IODevicePeer:
{
IODevicePeer *ioPeer = static_cast<IODevicePeer *>(*peer);
- ioPeer->dispatchPackedFunc(packedFunc);
+ if(ioPeer->isOpen())
+ ioPeer->dispatchPackedFunc(packedFunc);
+ else
+ QCoreApplication::postEvent(this, new RemovePeerEvent(peer.key()));
}
break;
case AbstractPeer::SignalProxyPeer:
}
break;
case AbstractPeer::SignalProxyPeer:
@@
-772,7
+789,7
@@
void SignalProxy::receivePackedFunc(AbstractPeer *sender, const QVariant &packed
RequestType requestType = (RequestType)params.takeFirst().value<int>();
receivePeerSignal(sender, requestType, params);
}
RequestType requestType = (RequestType)params.takeFirst().value<int>();
receivePeerSignal(sender, requestType, params);
}
-
+
void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) {
switch(requestType) {
case RpcCall:
void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &requestType, const QVariantList ¶ms) {
switch(requestType) {
case RpcCall:
@@
-786,15
+803,15
@@
void SignalProxy::receivePeerSignal(AbstractPeer *sender, const RequestType &req
case Sync:
handleSync(sender, params);
break;
case Sync:
handleSync(sender, params);
break;
-
+
case InitRequest:
handleInitRequest(sender, params);
break;
case InitRequest:
handleInitRequest(sender, params);
break;
-
+
case InitData:
handleInitData(sender, params);
break;
case InitData:
handleInitData(sender, params);
break;
-
+
case HeartBeat:
receiveHeartBeat(sender, params);
break;
case HeartBeat:
receiveHeartBeat(sender, params);
break;
@@
-822,7
+839,7
@@
void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
qWarning() << "received invalid Sync call" << params;
return;
}
qWarning() << "received invalid Sync call" << params;
return;
}
-
+
QByteArray className = params.takeFirst().toByteArray();
QString objectName = params.takeFirst().toString();
QByteArray signal = params.takeFirst().toByteArray();
QByteArray className = params.takeFirst().toByteArray();
QString objectName = params.takeFirst().toString();
QByteArray signal = params.takeFirst().toByteArray();
@@
-859,7
+876,7
@@
void SignalProxy::handleSync(AbstractPeer *sender, QVariantList params) {
returnParams << returnValue;
sender->dispatchSignal(Sync, returnParams);
}
returnParams << returnValue;
sender->dispatchSignal(Sync, returnParams);
}
-
+
// send emit update signal
invokeSlot(receiver, updatedRemotelyId(receiver));
}
// send emit update signal
invokeSlot(receiver, updatedRemotelyId(receiver));
}
@@
-870,10
+887,10
@@
void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa
<< params;
return;
}
<< params;
return;
}
-
+
QByteArray className(params[0].toByteArray());
QString objectName(params[1].toString());
QByteArray className(params[0].toByteArray());
QString objectName(params[1].toString());
-
+
if(!_syncSlave.contains(className)) {
qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
<< className;
if(!_syncSlave.contains(className)) {
qWarning() << "SignalProxy::handleInitRequest() received initRequest for unregistered Class:"
<< className;
@@
-885,7
+902,7
@@
void SignalProxy::handleInitRequest(AbstractPeer *sender, const QVariantList &pa
<< className << objectName;
return;
}
<< className << objectName;
return;
}
-
+
SyncableObject *obj = _syncSlave[className][objectName];
QVariantList params_;
SyncableObject *obj = _syncSlave[className][objectName];
QVariantList params_;
@@
-903,7
+920,7
@@
void SignalProxy::handleInitData(AbstractPeer *sender, const QVariantList ¶m
<< params;
return;
}
<< params;
return;
}
-
+
QByteArray className(params[0].toByteArray());
QString objectName(params[1].toString());
QVariantMap propertyMap(params[2].toMap());
QByteArray className(params[0].toByteArray());
QString objectName(params[1].toString());
QVariantMap propertyMap(params[2].toMap());
@@
-952,7
+969,7
@@
bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList
return false;
}
return false;
}
- void *_a[] = {0, // return type...
+ void *_a[] = {0, // return type...
0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots
0, 0, 0, 0 , 0};
0, 0, 0, 0 , 0, // and 10 args - that's the max size qt can handle with signals and slots
0, 0, 0, 0 , 0};
@@
-972,7
+989,7
@@
bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList
if(returnValue.type() != QVariant::Invalid)
_a[0] = const_cast<void *>(returnValue.constData());
if(returnValue.type() != QVariant::Invalid)
_a[0] = const_cast<void *>(returnValue.constData());
-
+
Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
? Qt::DirectConnection
: Qt::QueuedConnection;
Qt::ConnectionType type = QThread::currentThread() == receiver->thread()
? Qt::DirectConnection
: Qt::QueuedConnection;
@@
-984,7
+1001,7
@@
bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList
// note to self: qmetaobject.cpp:990 ff
return false;
}
// note to self: qmetaobject.cpp:990 ff
return false;
}
-
+
}
bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms) {
}
bool SignalProxy::invokeSlot(QObject *receiver, int methodId, const QVariantList ¶ms) {
@@
-1013,7
+1030,7
@@
void SignalProxy::writeDataToDevice(QIODevice *dev, const QVariant &item, bool c
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
out << (quint32)0;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
out << (quint32)0;
-
+
if(compressed) {
QByteArray rawItem;
QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
if(compressed) {
QByteArray rawItem;
QDataStream itemStream(&rawItem, QIODevice::WriteOnly);
@@
-1044,30
+1061,35
@@
bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian
}
if(blockSize > 1 << 22) {
}
if(blockSize > 1 << 22) {
- qWarning() << qPrintable(tr("Client tried to send package larger than max package size!"));
- QAbstractSocket *sock = qobject_cast<QAbstractSocket *>(dev);
- qWarning() << qPrintable(tr("Disconnecting")) << (sock ? qPrintable(sock->peerAddress().toString()) : qPrintable(tr("local client")));
- dev->close();
+ disconnectDevice(dev, tr("Client tried to send package larger than max package size!"));
+ return false;
+ }
+
+ if(blockSize == 0) {
+ disconnectDevice(dev, tr("Client tried to send 0 byte package!"));
return false;
}
return false;
}
-
+
if(dev->bytesAvailable() < blockSize)
return false;
if(dev->bytesAvailable() < blockSize)
return false;
+ blockSize = 0;
+
if(compressed) {
QByteArray rawItem;
in >> rawItem;
if(compressed) {
QByteArray rawItem;
in >> rawItem;
- // debug check
+
int nbytes = rawItem.size();
int nbytes = rawItem.size();
- if
(nbytes <= 4) {
+ if(nbytes <= 4) {
const char *data = rawItem.constData();
const char *data = rawItem.constData();
- if (nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0))
- qWarning() << "receieved corrupted compressed data:"
- << blockSize << rawItem << rawItem.size() << dev;
+ if(nbytes < 4 || (data[0]!=0 || data[1]!=0 || data[2]!=0 || data[3]!=0)) {
+ disconnectDevice(dev, tr("Client sent corrupted compressed data!"));
+ return false;
+ }
}
}
- // end
+
rawItem = qUncompress(rawItem);
rawItem = qUncompress(rawItem);
-
+
QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
itemStream.setVersion(QDataStream::Qt_4_2);
itemStream >> item;
QDataStream itemStream(&rawItem, QIODevice::ReadOnly);
itemStream.setVersion(QDataStream::Qt_4_2);
itemStream >> item;
@@
-1075,7
+1097,10
@@
bool SignalProxy::readDataFromDevice(QIODevice *dev, quint32 &blockSize, QVarian
in >> item;
}
in >> item;
}
- blockSize = 0;
+ if(!item.isValid()) {
+ disconnectDevice(dev, tr("Client sent corrupt data: unable to load QVariant!"));
+ return false;
+ }
return true;
}
return true;
}
@@
-1175,25
+1200,40
@@
void SignalProxy::receiveHeartBeatReply(AbstractPeer *peer, const QVariantList &
qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
return;
}
qWarning() << "SignalProxy: received heart beat reply with less params then sent from:" << ioPeer->address();
return;
}
-
+
QTime sendTime = params[0].value<QTime>();
updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
}
void SignalProxy::customEvent(QEvent *event) {
switch(event->type()) {
QTime sendTime = params[0].value<QTime>();
updateLag(ioPeer, sendTime.msecsTo(QTime::currentTime()) / 2);
}
void SignalProxy::customEvent(QEvent *event) {
switch(event->type()) {
- case
QEvent::User
:
+ case
PeerSignal
:
{
{
- PeerSignalEvent *
sig
= static_cast<PeerSignalEvent *>(event);
- receivePeerSignal(
sig->sender, sig->requestType, sig
->params);
+ PeerSignalEvent *
e
= static_cast<PeerSignalEvent *>(event);
+ receivePeerSignal(
e->sender, e->requestType, e
->params);
}
event->accept();
break;
}
event->accept();
break;
+ case RemovePeer:
+ {
+ RemovePeerEvent *e = static_cast<RemovePeerEvent *>(event);
+ removePeer(e->peer);
+ }
+ event->accept();
default:
return;
}
}
default:
return;
}
}
+void SignalProxy::disconnectDevice(QIODevice *dev, const QString &reason) {
+ if(!reason.isEmpty())
+ qWarning() << qPrintable(reason);
+ QAbstractSocket *sock = qobject_cast<QAbstractSocket *>(dev);
+ if(sock)
+ qWarning() << qPrintable(tr("Disconnecting")) << qPrintable(sock->peerAddress().toString());
+ dev->close();
+}
+
void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
peer->lag = lag;
if(proxyMode() == Client) {
void SignalProxy::updateLag(IODevicePeer *peer, int lag) {
peer->lag = lag;
if(proxyMode() == Client) {
@@
-1216,7
+1256,7
@@
void SignalProxy::dumpProxyStats() {
int slaveCount = 0;
foreach(ObjectId oid, _syncSlave.values())
slaveCount += oid.count();
int slaveCount = 0;
foreach(ObjectId oid, _syncSlave.values())
slaveCount += oid.count();
-
+
qDebug() << this;
qDebug() << " Proxy Mode:" << mode;
qDebug() << "attached sending Objects:" << _relayHash.count();
qDebug() << this;
qDebug() << " Proxy Mode:" << mode;
qDebug() << "attached sending Objects:" << _relayHash.count();
@@
-1233,13
+1273,13
@@
void SignalProxy::dumpSyncMap(SyncableObject *object) {
QHash<QByteArray, int> syncMap_ = syncMap(object);
QHash<QByteArray, int>::const_iterator iter = syncMap_.constBegin();
while(iter != syncMap_.constEnd()) {
QHash<QByteArray, int> syncMap_ = syncMap(object);
QHash<QByteArray, int>::const_iterator iter = syncMap_.constBegin();
while(iter != syncMap_.constEnd()) {
- qDebug() << iter.key() << "-->" << iter.value() << meta->method(iter.value()).signature();
+ qDebug() << iter.key() << "-->" << iter.value() << meta->method(iter.value()).signature();
iter++;
}
// QHash<int, int> syncMap_ = syncMap(object);
// QHash<int, int>::const_iterator iter = syncMap_.constBegin();
// while(iter != syncMap_.constEnd()) {
iter++;
}
// QHash<int, int> syncMap_ = syncMap(object);
// QHash<int, int>::const_iterator iter = syncMap_.constBegin();
// while(iter != syncMap_.constEnd()) {
-// qDebug() << iter.key() << meta->method(iter.key()).signature() << "-->" << iter.value() << meta->method(iter.value()).signature();
+// qDebug() << iter.key() << meta->method(iter.key()).signature() << "-->" << iter.value() << meta->method(iter.value()).signature();
// iter++;
// }
}
// iter++;
// }
}