From b2f681c796855ba0f863eb14b062c5d1a2825df1 Mon Sep 17 00:00:00 2001 From: Marcus Eggenberger Date: Fri, 21 Nov 2008 14:01:08 +0100 Subject: [PATCH] first version of lockless storage backend (WIP with lots of debug output) --- src/core/abstractsqlstorage.cpp | 134 ++++++++++++++++++++++---------- src/core/abstractsqlstorage.h | 34 +++++++- src/core/core.cpp | 62 +++++++-------- src/core/sqlitestorage.cpp | 45 ++++++++--- 4 files changed, 191 insertions(+), 84 deletions(-) diff --git a/src/core/abstractsqlstorage.cpp b/src/core/abstractsqlstorage.cpp index 6d850d8d..467b2260 100644 --- a/src/core/abstractsqlstorage.cpp +++ b/src/core/abstractsqlstorage.cpp @@ -22,49 +22,54 @@ #include "logger.h" +#include #include #include AbstractSqlStorage::AbstractSqlStorage(QObject *parent) : Storage(parent), - _schemaVersion(0) + _schemaVersion(0), + _nextConnectionId(0) { } AbstractSqlStorage::~AbstractSqlStorage() { - QHash, QSqlQuery *>::iterator iter = _queryCache.begin(); - while(iter != _queryCache.end()) { - delete *iter; - iter = _queryCache.erase(iter); + // disconnect the connections, so their deletion is no longer interessting for us + QHash::iterator conIter; + for(conIter = _connectionPool.begin(); conIter != _connectionPool.end(); conIter++) { + disconnect(conIter.value(), 0, this, 0); } - - { - QSqlDatabase db = QSqlDatabase::database("quassel_connection"); - db.commit(); - db.close(); - } - QSqlDatabase::removeDatabase("quassel_connection"); } QSqlDatabase AbstractSqlStorage::logDb() { - QSqlDatabase db = QSqlDatabase::database("quassel_connection"); - if(db.isValid() && db.isOpen()) - return db; - - if(!openDb()) { - qWarning() << "Unable to Open Database" << displayName(); - qWarning() << "-" << db.lastError().text(); - } + if(!_connectionPool.contains(QThread::currentThread())) + addConnectionToPool(); - return QSqlDatabase::database("quassel_connection"); + qDebug() << "using logDb" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread(); + return QSqlDatabase::database(_connectionPool[QThread::currentThread()]->name()); } -bool AbstractSqlStorage::openDb() { - QSqlDatabase db = QSqlDatabase::database("quassel_connection"); - if(db.isValid() && !db.isOpen()) - return db.open(); +void AbstractSqlStorage::addConnectionToPool() { + QMutexLocker locker(&_connectionPoolMutex); + // we have to recheck if the connection pool already contains a connection for + // this thread. Since now (after the lock) we can only tell for sure + if(_connectionPool.contains(QThread::currentThread())) + return; + + QThread *currentThread = QThread::currentThread(); + + int connectionId = _nextConnectionId++; - db = QSqlDatabase::addDatabase(driverName(), "quassel_connection"); + Connection *connection = new Connection(QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1()), this); + qDebug() << "new connection" << connection->name() << currentThread << QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1()); + connection->moveToThread(currentThread); + connect(this, SIGNAL(syncCachedQueries()), connection, SLOT(syncCachedQueries())); + connect(this, SIGNAL(destroyed()), connection, SLOT(deleteLater())); + connect(currentThread, SIGNAL(destroyed()), connection, SLOT(deleteLater())); + connect(connection, SIGNAL(destroyed()), this, SLOT(connectionDestroyed())); + _connectionPool[currentThread] = connection; + + QSqlDatabase db = QSqlDatabase::addDatabase(driverName(), connection->name()); db.setDatabaseName(databaseName()); if(!hostName().isEmpty()) @@ -75,7 +80,10 @@ bool AbstractSqlStorage::openDb() { db.setPassword(password()); } - return db.open(); + if(!db.open()) { + qWarning() << "Unable to open database" << displayName() << "for thread" << QThread::currentThread(); + qWarning() << "-" << db.lastError().text(); + } } bool AbstractSqlStorage::init(const QVariantMap &settings) { @@ -105,13 +113,7 @@ bool AbstractSqlStorage::init(const QVariantMap &settings) { } void AbstractSqlStorage::sync() { - QHash, QSqlQuery *>::iterator iter = _queryCache.begin(); - while(iter != _queryCache.end()) { - delete *iter; - iter = _queryCache.erase(iter); - } - - logDb().commit(); + emit syncCachedQueries(); } QString AbstractSqlStorage::queryString(const QString &queryName, int version) { @@ -134,13 +136,9 @@ QString AbstractSqlStorage::queryString(const QString &queryName, int version) { } QSqlQuery &AbstractSqlStorage::cachedQuery(const QString &queryName, int version) { - QPair queryId = qMakePair(queryName, version); - if(!_queryCache.contains(queryId)) { - QSqlQuery *query = new QSqlQuery(logDb()); - query->prepare(queryString(queryName, version)); - _queryCache[queryId] = query; - } - return *(_queryCache[queryId]); + Q_ASSERT(_connectionPool.contains(QThread::currentThread())); + qDebug() << "cached query" << queryName << "using" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread(); + return _connectionPool[QThread::currentThread()]->cachedQuery(queryName, version); } QStringList AbstractSqlStorage::setupQueries() { @@ -239,3 +237,57 @@ bool AbstractSqlStorage::watchQuery(QSqlQuery &query) { } return true; } + +void AbstractSqlStorage::connectionDestroyed() { + QMutexLocker locker(&_connectionPoolMutex); + _connectionPool.remove(sender()->thread()); +} + +// ======================================== +// AbstractSqlStorage::Connection +// ======================================== +AbstractSqlStorage::Connection::Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent) + : QObject(parent), + _name(name.toLatin1()), + _storageEngine(storage) +{ +} + +AbstractSqlStorage::Connection::~Connection() { + QHash, QSqlQuery *>::iterator iter = _queryCache.begin(); + while(iter != _queryCache.end()) { + delete *iter; + iter = _queryCache.erase(iter); + } + { + QSqlDatabase db = QSqlDatabase::database(name(), false); + if(db.isOpen()) { + db.commit(); + db.close(); + } + } + QSqlDatabase::removeDatabase(name()); +} + +QSqlQuery &AbstractSqlStorage::Connection::cachedQuery(const QString &queryName, int version) { + QPair queryId = qMakePair(queryName, version); + if(_queryCache.contains(queryId)) { + return *(_queryCache[queryId]); + } + + QSqlQuery *query = new QSqlQuery(QSqlDatabase::database(name())); + query->prepare(_storageEngine->queryString(queryName, version)); + _queryCache[queryId] = query; + return *query; +} + +void AbstractSqlStorage::Connection::syncCachedQueries() { + QHash, QSqlQuery *>::iterator iter = _queryCache.begin(); + while(iter != _queryCache.end()) { + delete *iter; + iter = _queryCache.erase(iter); + } + QSqlDatabase db = QSqlDatabase::database(name(), false); + if(db.isOpen()) + db.commit(); +} diff --git a/src/core/abstractsqlstorage.h b/src/core/abstractsqlstorage.h index 90185091..bbe4d081 100644 --- a/src/core/abstractsqlstorage.h +++ b/src/core/abstractsqlstorage.h @@ -63,13 +63,43 @@ protected: inline virtual QString userName() { return QString(); } inline virtual QString password() { return QString(); } +signals: + void syncCachedQueries(); + +private slots: + void connectionDestroyed(); + private: - bool openDb(); + void addConnectionToPool(); int _schemaVersion; - QHash, QSqlQuery *> _queryCache; + int _nextConnectionId; + QMutex _connectionPoolMutex; + class Connection; + QHash _connectionPool; }; +// ======================================== +// AbstractSqlStorage::Connection +// ======================================== +class AbstractSqlStorage::Connection : public QObject { + Q_OBJECT + +public: + Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent = 0); + ~Connection(); + + inline QLatin1String name() const { return QLatin1String(_name); } + QSqlQuery &cachedQuery(const QString &queryName, int version); + +public slots: + void syncCachedQueries(); + +private: + QByteArray _name; + QHash, QSqlQuery *> _queryCache; + AbstractSqlStorage *_storageEngine; +}; #endif diff --git a/src/core/core.cpp b/src/core/core.cpp index 9432f2a0..34a25cb5 100644 --- a/src/core/core.cpp +++ b/src/core/core.cpp @@ -168,9 +168,9 @@ QString Core::setupCore(QVariantMap setupData) { CoreSettings s; s.setStorageSettings(setupData); quInfo() << qPrintable(tr("Creating admin user...")); - mutex.lock(); + //mutex.lock(); storage->addUser(user, password); - mutex.unlock(); + //mutex.unlock(); startListening(); // TODO check when we need this return QString(); } @@ -225,23 +225,23 @@ bool Core::initStorage(QVariantMap dbSettings, bool setup) { } void Core::syncStorage() { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); if(storage) storage->sync(); } /*** Storage Access ***/ void Core::setUserSetting(UserId userId, const QString &settingName, const QVariant &data) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); instance()->storage->setUserSetting(userId, settingName, data); } QVariant Core::getUserSetting(UserId userId, const QString &settingName, const QVariant &data) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->getUserSetting(userId, settingName, data); } bool Core::createNetwork(UserId user, NetworkInfo &info) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); NetworkId networkId = instance()->storage->createNetwork(user, info); if(!networkId.isValid()) return false; @@ -251,107 +251,107 @@ bool Core::createNetwork(UserId user, NetworkInfo &info) { } bool Core::updateNetwork(UserId user, const NetworkInfo &info) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->updateNetwork(user, info); } bool Core::removeNetwork(UserId user, const NetworkId &networkId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->removeNetwork(user, networkId); } QList Core::networks(UserId user) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->networks(user); } NetworkId Core::networkId(UserId user, const QString &network) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->getNetworkId(user, network); } QList Core::connectedNetworks(UserId user) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->connectedNetworks(user); } void Core::setNetworkConnected(UserId user, const NetworkId &networkId, bool isConnected) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->setNetworkConnected(user, networkId, isConnected); } QHash Core::persistentChannels(UserId user, const NetworkId &networkId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->persistentChannels(user, networkId); } void Core::setChannelPersistent(UserId user, const NetworkId &networkId, const QString &channel, bool isJoined) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->setChannelPersistent(user, networkId, channel, isJoined); } void Core::setPersistentChannelKey(UserId user, const NetworkId &networkId, const QString &channel, const QString &key) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->setPersistentChannelKey(user, networkId, channel, key); } BufferInfo Core::bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->getBufferInfo(user, networkId, type, buffer); } BufferInfo Core::getBufferInfo(UserId user, const BufferId &bufferId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->getBufferInfo(user, bufferId); } MsgId Core::storeMessage(const Message &message) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->logMessage(message); } QList Core::requestMsgs(UserId user, BufferId buffer, int lastmsgs, int offset) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->requestMsgs(user, buffer, lastmsgs, offset); } QList Core::requestMsgs(UserId user, BufferId buffer, QDateTime since, int offset) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->requestMsgs(user, buffer, since, offset); } QList Core::requestMsgRange(UserId user, BufferId buffer, int first, int last) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->requestMsgRange(user, buffer, first, last); } QList Core::requestBuffers(UserId user) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->requestBuffers(user); } QList Core::requestBufferIdsForNetwork(UserId user, NetworkId networkId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->requestBufferIdsForNetwork(user, networkId); } bool Core::removeBuffer(const UserId &user, const BufferId &bufferId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->removeBuffer(user, bufferId); } BufferId Core::renameBuffer(const UserId &user, const NetworkId &networkId, const QString &newName, const QString &oldName) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->renameBuffer(user, networkId, newName, oldName); } void Core::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->setBufferLastSeenMsg(user, bufferId, msgId); } QHash Core::bufferLastSeenMsgIds(UserId user) { - QMutexLocker locker(&mutex); + // QMutexLocker locker(&mutex); return instance()->storage->bufferLastSeenMsgIds(user); } @@ -545,9 +545,9 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg) { SignalProxy::writeDataToDevice(socket, reply); } else if(msg["MsgType"] == "ClientLogin") { QVariantMap reply; - mutex.lock(); + // mutex.lock(); UserId uid = storage->validateUser(msg["User"].toString(), msg["Password"].toString()); - mutex.unlock(); + // mutex.unlock(); if(uid == 0) { reply["MsgType"] = "ClientLoginReject"; reply["Error"] = tr("Invalid username or password!
The username/password combination you supplied could not be found in the database."); @@ -627,9 +627,9 @@ void Core::setupInternalClientSession(SignalProxy *proxy) { setupCoreForInternalUsage(); } - mutex.lock(); + // mutex.lock(); UserId uid = storage->internalUser(); - mutex.unlock(); + // mutex.unlock(); // Find or create session for validated user SessionThread *sess; diff --git a/src/core/sqlitestorage.cpp b/src/core/sqlitestorage.cpp index 613fa32d..7b024016 100644 --- a/src/core/sqlitestorage.cpp +++ b/src/core/sqlitestorage.cpp @@ -480,7 +480,9 @@ void SqliteStorage::setPersistentChannelKey(UserId user, const NetworkId &networ void SqliteStorage::createBuffer(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) { - QSqlQuery &query = cachedQuery("insert_buffer"); + // QSqlQuery &query = cachedQuery("insert_buffer"); + QSqlQuery query(logDb()); + query.prepare(queryString("insert_buffer")); query.bindValue(":userid", user.toInt()); query.bindValue(":networkid", networkId.toInt()); query.bindValue(":buffertype", (int)type); @@ -492,7 +494,9 @@ void SqliteStorage::createBuffer(UserId user, const NetworkId &networkId, Buffer } BufferInfo SqliteStorage::getBufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) { - QSqlQuery &query = cachedQuery("select_bufferByName"); + // QSqlQuery &query = cachedQuery("select_bufferByName"); + QSqlQuery query(logDb()); + query.prepare(queryString("select_bufferByName")); query.bindValue(":networkid", networkId.toInt()); query.bindValue(":userid", user.toInt()); query.bindValue(":buffercname", buffer.toLower()); @@ -632,7 +636,10 @@ BufferId SqliteStorage::renameBuffer(const UserId &user, const NetworkId &networ } void SqliteStorage::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) { - QSqlQuery &query = cachedQuery("update_buffer_lastseen"); + // QSqlQuery &query = cachedQuery("update_buffer_lastseen"); + QSqlQuery query(logDb()); + query.prepare(queryString("update_buffer_lastseen")); + query.bindValue(":userid", user.toInt()); query.bindValue(":bufferid", bufferId.toInt()); query.bindValue(":lastseenmsgid", msgId.toInt()); @@ -656,7 +663,10 @@ QHash SqliteStorage::bufferLastSeenMsgIds(UserId user) { } MsgId SqliteStorage::logMessage(Message msg) { - QSqlQuery &logMessageQuery = cachedQuery("insert_message"); + // QSqlQuery &logMessageQuery = cachedQuery("insert_message"); + QSqlQuery logMessageQuery(logDb()); + logMessageQuery.prepare(queryString("insert_message")); + logMessageQuery.bindValue(":time", msg.timestamp().toTime_t()); logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt()); logMessageQuery.bindValue(":type", msg.type()); @@ -668,7 +678,9 @@ MsgId SqliteStorage::logMessage(Message msg) { if(logMessageQuery.lastError().isValid()) { // constraint violation - must be NOT NULL constraint - probably the sender is missing... if(logMessageQuery.lastError().number() == 19) { - QSqlQuery &addSenderQuery = cachedQuery("insert_sender"); + // QSqlQuery &addSenderQuery = cachedQuery("insert_sender"); + QSqlQuery addSenderQuery(logDb()); + addSenderQuery.prepare(queryString("insert_sender")); addSenderQuery.bindValue(":sender", msg.sender()); addSenderQuery.exec(); watchQuery(addSenderQuery); @@ -696,7 +708,10 @@ QList SqliteStorage::requestMsgs(UserId user, BufferId bufferId, int la offset = 0; } else { // we have to determine the real offset first - QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset"); + // QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset"); + QSqlQuery offsetQuery(logDb()); + offsetQuery.prepare(queryString("select_messagesOffset")); + offsetQuery.bindValue(":bufferid", bufferId.toInt()); offsetQuery.bindValue(":messageid", offset); offsetQuery.exec(); @@ -705,7 +720,10 @@ QList SqliteStorage::requestMsgs(UserId user, BufferId bufferId, int la } // now let's select the messages - QSqlQuery &msgQuery = cachedQuery("select_messages"); + // QSqlQuery &msgQuery = cachedQuery("select_messages"); + QSqlQuery msgQuery(logDb()); + msgQuery.prepare(queryString("select_messages")); + msgQuery.bindValue(":bufferid", bufferId.toInt()); msgQuery.bindValue(":limit", lastmsgs); msgQuery.bindValue(":offset", offset); @@ -735,7 +753,10 @@ QList SqliteStorage::requestMsgs(UserId user, BufferId bufferId, QDateT return messagelist; // we have to determine the real offset first - QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset"); + // QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset"); + QSqlQuery offsetQuery(logDb()); + offsetQuery.prepare(queryString("select_messagesSinceOffset")); + offsetQuery.bindValue(":bufferid", bufferId.toInt()); offsetQuery.bindValue(":since", since.toTime_t()); offsetQuery.exec(); @@ -743,7 +764,9 @@ QList SqliteStorage::requestMsgs(UserId user, BufferId bufferId, QDateT offset = offsetQuery.value(0).toInt(); // now let's select the messages - QSqlQuery &msgQuery = cachedQuery("select_messagesSince"); + // QSqlQuery &msgQuery = cachedQuery("select_messagesSince"); + QSqlQuery msgQuery(logDb()); + msgQuery.prepare(queryString("select_messagesSince")); msgQuery.bindValue(":bufferid", bufferId.toInt()); msgQuery.bindValue(":since", since.toTime_t()); msgQuery.bindValue(":offset", offset); @@ -773,7 +796,9 @@ QList SqliteStorage::requestMsgRange(UserId user, BufferId bufferId, in if(!bufferInfo.isValid()) return messagelist; - QSqlQuery &rangeQuery = cachedQuery("select_messageRange"); + // QSqlQuery &rangeQuery = cachedQuery("select_messageRange"); + QSqlQuery rangeQuery(logDb()); + rangeQuery.prepare(queryString("select_messageRange")); rangeQuery.bindValue(":bufferid", bufferId.toInt()); rangeQuery.bindValue(":firstmsg", first); rangeQuery.bindValue(":lastmsg", last); -- 2.20.1