first version of lockless storage backend (WIP with lots of debug output)
authorMarcus Eggenberger <egs@quassel-irc.org>
Fri, 21 Nov 2008 13:01:08 +0000 (14:01 +0100)
committerMarcus Eggenberger <egs@quassel-irc.org>
Fri, 21 Nov 2008 13:01:08 +0000 (14:01 +0100)
src/core/abstractsqlstorage.cpp
src/core/abstractsqlstorage.h
src/core/core.cpp
src/core/sqlitestorage.cpp

index 6d850d8..467b226 100644 (file)
 
 #include "logger.h"
 
+#include <QMutexLocker>
 #include <QSqlError>
 #include <QSqlQuery>
 
 AbstractSqlStorage::AbstractSqlStorage(QObject *parent)
   : Storage(parent),
-    _schemaVersion(0)
+    _schemaVersion(0),
+    _nextConnectionId(0)
 {
 }
 
 AbstractSqlStorage::~AbstractSqlStorage() {
-  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
-  while(iter != _queryCache.end()) {
-    delete *iter;
-    iter = _queryCache.erase(iter);
+  // disconnect the connections, so their deletion is no longer interessting for us
+  QHash<QThread *, Connection *>::iterator conIter;
+  for(conIter = _connectionPool.begin(); conIter != _connectionPool.end(); conIter++) {
+    disconnect(conIter.value(), 0, this, 0);
   }
-  
-  {
-    QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-    db.commit();
-    db.close();
-  }
-  QSqlDatabase::removeDatabase("quassel_connection");  
 }
 
 QSqlDatabase AbstractSqlStorage::logDb() {
-  QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-  if(db.isValid() && db.isOpen())
-    return db;
-
-  if(!openDb()) {
-    qWarning() << "Unable to Open Database" << displayName();
-    qWarning() << "-" << db.lastError().text();
-  }
+  if(!_connectionPool.contains(QThread::currentThread()))
+    addConnectionToPool();
 
-  return QSqlDatabase::database("quassel_connection");
+  qDebug() << "using logDb" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+  return QSqlDatabase::database(_connectionPool[QThread::currentThread()]->name());
 }
 
-bool AbstractSqlStorage::openDb() {
-  QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-  if(db.isValid() && !db.isOpen())
-    return db.open();
+void AbstractSqlStorage::addConnectionToPool() {
+  QMutexLocker locker(&_connectionPoolMutex);
+  // we have to recheck if the connection pool already contains a connection for
+  // this thread. Since now (after the lock) we can only tell for sure
+  if(_connectionPool.contains(QThread::currentThread()))
+    return;
+
+  QThread *currentThread = QThread::currentThread();
+
+  int connectionId = _nextConnectionId++;
 
-  db = QSqlDatabase::addDatabase(driverName(), "quassel_connection");
+  Connection *connection = new Connection(QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1()), this);
+  qDebug() << "new connection" << connection->name() << currentThread << QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1());
+  connection->moveToThread(currentThread);
+  connect(this, SIGNAL(syncCachedQueries()), connection, SLOT(syncCachedQueries()));
+  connect(this, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+  connect(currentThread, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+  connect(connection, SIGNAL(destroyed()), this, SLOT(connectionDestroyed()));
+  _connectionPool[currentThread] = connection;
+
+  QSqlDatabase db = QSqlDatabase::addDatabase(driverName(), connection->name());
   db.setDatabaseName(databaseName());
 
   if(!hostName().isEmpty())
@@ -75,7 +80,10 @@ bool AbstractSqlStorage::openDb() {
     db.setPassword(password());
   }
 
-  return db.open();
+  if(!db.open()) {
+    qWarning() << "Unable to open database" << displayName() << "for thread" << QThread::currentThread();
+    qWarning() << "-" << db.lastError().text();
+  }
 }
 
 bool AbstractSqlStorage::init(const QVariantMap &settings) {
@@ -105,13 +113,7 @@ bool AbstractSqlStorage::init(const QVariantMap &settings) {
 }
 
 void AbstractSqlStorage::sync() {
-  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
-  while(iter != _queryCache.end()) {
-    delete *iter;
-    iter = _queryCache.erase(iter);
-  }
-
-  logDb().commit();
+  emit syncCachedQueries();
 }
 
 QString AbstractSqlStorage::queryString(const QString &queryName, int version) {
@@ -134,13 +136,9 @@ QString AbstractSqlStorage::queryString(const QString &queryName, int version) {
 }
 
 QSqlQuery &AbstractSqlStorage::cachedQuery(const QString &queryName, int version) {
-  QPair<QString, int> queryId = qMakePair(queryName, version);
-  if(!_queryCache.contains(queryId)) {
-    QSqlQuery *query = new QSqlQuery(logDb());
-    query->prepare(queryString(queryName, version));
-    _queryCache[queryId] = query;
-  }
-  return *(_queryCache[queryId]);
+  Q_ASSERT(_connectionPool.contains(QThread::currentThread()));
+  qDebug() << "cached query" << queryName << "using" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+  return _connectionPool[QThread::currentThread()]->cachedQuery(queryName, version);
 }
 
 QStringList AbstractSqlStorage::setupQueries() {
@@ -239,3 +237,57 @@ bool AbstractSqlStorage::watchQuery(QSqlQuery &query) {
   }
   return true;
 }
+
+void AbstractSqlStorage::connectionDestroyed() {
+  QMutexLocker locker(&_connectionPoolMutex);
+  _connectionPool.remove(sender()->thread());
+}
+
+// ========================================
+//  AbstractSqlStorage::Connection
+// ========================================
+AbstractSqlStorage::Connection::Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent)
+  : QObject(parent),
+    _name(name.toLatin1()),
+    _storageEngine(storage)
+{
+}
+
+AbstractSqlStorage::Connection::~Connection() {
+  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+  while(iter != _queryCache.end()) {
+    delete *iter;
+    iter = _queryCache.erase(iter);
+  }
+  {
+    QSqlDatabase db = QSqlDatabase::database(name(), false);
+    if(db.isOpen()) {
+      db.commit();
+      db.close();
+    }
+  }
+  QSqlDatabase::removeDatabase(name());
+}
+
+QSqlQuery &AbstractSqlStorage::Connection::cachedQuery(const QString &queryName, int version) {
+  QPair<QString, int> queryId = qMakePair(queryName, version);
+  if(_queryCache.contains(queryId)) {
+    return *(_queryCache[queryId]);
+  }
+
+  QSqlQuery *query = new QSqlQuery(QSqlDatabase::database(name()));
+  query->prepare(_storageEngine->queryString(queryName, version));
+  _queryCache[queryId] = query;
+  return *query;
+}
+
+void AbstractSqlStorage::Connection::syncCachedQueries() {
+  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+  while(iter != _queryCache.end()) {
+    delete *iter;
+    iter = _queryCache.erase(iter);
+  }
+  QSqlDatabase db = QSqlDatabase::database(name(), false);
+  if(db.isOpen())
+    db.commit();
+}
index 9018509..bbe4d08 100644 (file)
@@ -63,13 +63,43 @@ protected:
   inline virtual QString userName() { return QString(); }
   inline virtual QString password() { return QString(); }
 
+signals:
+  void syncCachedQueries();
+
+private slots:
+  void connectionDestroyed();
+
 private:
-  bool openDb();
+  void addConnectionToPool();
 
   int _schemaVersion;
 
-  QHash<QPair<QString, int>, QSqlQuery *> _queryCache;
+  int _nextConnectionId;
+  QMutex _connectionPoolMutex;
+  class Connection;
+  QHash<QThread *, Connection *> _connectionPool;
 };
 
+// ========================================
+//  AbstractSqlStorage::Connection
+// ========================================
+class AbstractSqlStorage::Connection : public QObject {
+  Q_OBJECT
+
+public:
+  Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent = 0);
+  ~Connection();
+  
+  inline QLatin1String name() const { return QLatin1String(_name); }
+  QSqlQuery &cachedQuery(const QString &queryName, int version);
+
+public slots:
+  void syncCachedQueries();
+
+private:
+  QByteArray _name;
+  QHash<QPair<QString, int>, QSqlQuery *> _queryCache;
+  AbstractSqlStorage *_storageEngine;
+};
 
 #endif
index 9432f2a..34a25cb 100644 (file)
@@ -168,9 +168,9 @@ QString Core::setupCore(QVariantMap setupData) {
   CoreSettings s;
   s.setStorageSettings(setupData);
   quInfo() << qPrintable(tr("Creating admin user..."));
-  mutex.lock();
+  //mutex.lock();
   storage->addUser(user, password);
-  mutex.unlock();
+  //mutex.unlock();
   startListening();  // TODO check when we need this
   return QString();
 }
@@ -225,23 +225,23 @@ bool Core::initStorage(QVariantMap dbSettings, bool setup) {
 }
 
 void Core::syncStorage() {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   if(storage) storage->sync();
 }
 
 /*** Storage Access ***/
 void Core::setUserSetting(UserId userId, const QString &settingName, const QVariant &data) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   instance()->storage->setUserSetting(userId, settingName, data);
 }
 
 QVariant Core::getUserSetting(UserId userId, const QString &settingName, const QVariant &data) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->getUserSetting(userId, settingName, data);
 }
 
 bool Core::createNetwork(UserId user, NetworkInfo &info) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   NetworkId networkId = instance()->storage->createNetwork(user, info);
   if(!networkId.isValid())
     return false;
@@ -251,107 +251,107 @@ bool Core::createNetwork(UserId user, NetworkInfo &info) {
 }
 
 bool Core::updateNetwork(UserId user, const NetworkInfo &info) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->updateNetwork(user, info);
 }
 
 bool Core::removeNetwork(UserId user, const NetworkId &networkId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->removeNetwork(user, networkId);
 }
 
 QList<NetworkInfo> Core::networks(UserId user) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->networks(user);
 }
 
 NetworkId Core::networkId(UserId user, const QString &network) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->getNetworkId(user, network);
 }
 
 QList<NetworkId> Core::connectedNetworks(UserId user) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->connectedNetworks(user);
 }
 
 void Core::setNetworkConnected(UserId user, const NetworkId &networkId, bool isConnected) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->setNetworkConnected(user, networkId, isConnected);
 }
 
 QHash<QString, QString> Core::persistentChannels(UserId user, const NetworkId &networkId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->persistentChannels(user, networkId);
 }
 
 void Core::setChannelPersistent(UserId user, const NetworkId &networkId, const QString &channel, bool isJoined) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->setChannelPersistent(user, networkId, channel, isJoined);
 }
 
 void Core::setPersistentChannelKey(UserId user, const NetworkId &networkId, const QString &channel, const QString &key) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->setPersistentChannelKey(user, networkId, channel, key);
 }
 
 BufferInfo Core::bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->getBufferInfo(user, networkId, type, buffer);
 }
 
 BufferInfo Core::getBufferInfo(UserId user, const BufferId &bufferId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->getBufferInfo(user, bufferId);
 }
 
 MsgId Core::storeMessage(const Message &message) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->logMessage(message);
 }
 
 QList<Message> Core::requestMsgs(UserId user, BufferId buffer, int lastmsgs, int offset) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->requestMsgs(user, buffer, lastmsgs, offset);
 }
 
 QList<Message> Core::requestMsgs(UserId user, BufferId buffer, QDateTime since, int offset) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->requestMsgs(user, buffer, since, offset);
 }
 
 QList<Message> Core::requestMsgRange(UserId user, BufferId buffer, int first, int last) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->requestMsgRange(user, buffer, first, last);
 }
 
 QList<BufferInfo> Core::requestBuffers(UserId user) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->requestBuffers(user);
 }
 
 QList<BufferId> Core::requestBufferIdsForNetwork(UserId user, NetworkId networkId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->requestBufferIdsForNetwork(user, networkId);
 }
 
 bool Core::removeBuffer(const UserId &user, const BufferId &bufferId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->removeBuffer(user, bufferId);
 }
 
 BufferId Core::renameBuffer(const UserId &user, const NetworkId &networkId, const QString &newName, const QString &oldName) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->renameBuffer(user, networkId, newName, oldName);
 }
 
 void Core::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->setBufferLastSeenMsg(user, bufferId, msgId);
 }
 
 QHash<BufferId, MsgId> Core::bufferLastSeenMsgIds(UserId user) {
-  QMutexLocker locker(&mutex);
+  // QMutexLocker locker(&mutex);
   return instance()->storage->bufferLastSeenMsgIds(user);
 }
 
@@ -545,9 +545,9 @@ void Core::processClientMessage(QTcpSocket *socket, const QVariantMap &msg) {
       SignalProxy::writeDataToDevice(socket, reply);
     } else if(msg["MsgType"] == "ClientLogin") {
       QVariantMap reply;
-      mutex.lock();
+      // mutex.lock();
       UserId uid = storage->validateUser(msg["User"].toString(), msg["Password"].toString());
-      mutex.unlock();
+      // mutex.unlock();
       if(uid == 0) {
         reply["MsgType"] = "ClientLoginReject";
         reply["Error"] = tr("<b>Invalid username or password!</b><br>The username/password combination you supplied could not be found in the database.");
@@ -627,9 +627,9 @@ void Core::setupInternalClientSession(SignalProxy *proxy) {
     setupCoreForInternalUsage();
   }
 
-  mutex.lock();
+  // mutex.lock();
   UserId uid = storage->internalUser();
-  mutex.unlock();
+  // mutex.unlock();
 
   // Find or create session for validated user
   SessionThread *sess;
index 613fa32..7b02401 100644 (file)
@@ -480,7 +480,9 @@ void SqliteStorage::setPersistentChannelKey(UserId user, const NetworkId &networ
 
 
 void SqliteStorage::createBuffer(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
-  QSqlQuery &query = cachedQuery("insert_buffer");
+  // QSqlQuery &query = cachedQuery("insert_buffer");
+  QSqlQuery query(logDb());
+  query.prepare(queryString("insert_buffer"));
   query.bindValue(":userid", user.toInt());
   query.bindValue(":networkid", networkId.toInt());
   query.bindValue(":buffertype", (int)type);
@@ -492,7 +494,9 @@ void SqliteStorage::createBuffer(UserId user, const NetworkId &networkId, Buffer
 }
 
 BufferInfo SqliteStorage::getBufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
-  QSqlQuery &query = cachedQuery("select_bufferByName");
+  // QSqlQuery &query = cachedQuery("select_bufferByName");
+  QSqlQuery query(logDb());
+  query.prepare(queryString("select_bufferByName"));
   query.bindValue(":networkid", networkId.toInt());
   query.bindValue(":userid", user.toInt());
   query.bindValue(":buffercname", buffer.toLower());
@@ -632,7 +636,10 @@ BufferId SqliteStorage::renameBuffer(const UserId &user, const NetworkId &networ
 }
 
 void SqliteStorage::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) {
-  QSqlQuery &query = cachedQuery("update_buffer_lastseen");
+  // QSqlQuery &query = cachedQuery("update_buffer_lastseen");
+  QSqlQuery query(logDb());
+  query.prepare(queryString("update_buffer_lastseen"));
+
   query.bindValue(":userid", user.toInt());
   query.bindValue(":bufferid", bufferId.toInt());
   query.bindValue(":lastseenmsgid", msgId.toInt());
@@ -656,7 +663,10 @@ QHash<BufferId, MsgId> SqliteStorage::bufferLastSeenMsgIds(UserId user) {
 }
 
 MsgId SqliteStorage::logMessage(Message msg) {
-  QSqlQuery &logMessageQuery = cachedQuery("insert_message");
+  // QSqlQuery &logMessageQuery = cachedQuery("insert_message");
+  QSqlQuery logMessageQuery(logDb());
+  logMessageQuery.prepare(queryString("insert_message"));
+
   logMessageQuery.bindValue(":time", msg.timestamp().toTime_t());
   logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt());
   logMessageQuery.bindValue(":type", msg.type());
@@ -668,7 +678,9 @@ MsgId SqliteStorage::logMessage(Message msg) {
   if(logMessageQuery.lastError().isValid()) {
     // constraint violation - must be NOT NULL constraint - probably the sender is missing...
     if(logMessageQuery.lastError().number() == 19) {
-      QSqlQuery &addSenderQuery = cachedQuery("insert_sender");
+      // QSqlQuery &addSenderQuery = cachedQuery("insert_sender");
+      QSqlQuery addSenderQuery(logDb());
+      addSenderQuery.prepare(queryString("insert_sender"));
       addSenderQuery.bindValue(":sender", msg.sender());
       addSenderQuery.exec();
       watchQuery(addSenderQuery);
@@ -696,7 +708,10 @@ QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, int la
     offset = 0;
   } else {
     // we have to determine the real offset first
-    QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset");
+    // QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset");
+    QSqlQuery offsetQuery(logDb());
+    offsetQuery.prepare(queryString("select_messagesOffset"));
+
     offsetQuery.bindValue(":bufferid", bufferId.toInt());
     offsetQuery.bindValue(":messageid", offset);
     offsetQuery.exec();
@@ -705,7 +720,10 @@ QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, int la
   }
 
   // now let's select the messages
-  QSqlQuery &msgQuery = cachedQuery("select_messages");
+  // QSqlQuery &msgQuery = cachedQuery("select_messages");
+  QSqlQuery msgQuery(logDb());
+  msgQuery.prepare(queryString("select_messages"));
+
   msgQuery.bindValue(":bufferid", bufferId.toInt());
   msgQuery.bindValue(":limit", lastmsgs);
   msgQuery.bindValue(":offset", offset);
@@ -735,7 +753,10 @@ QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, QDateT
     return messagelist;
 
   // we have to determine the real offset first
-  QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset");
+  // QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset");
+  QSqlQuery offsetQuery(logDb());
+  offsetQuery.prepare(queryString("select_messagesSinceOffset"));
+
   offsetQuery.bindValue(":bufferid", bufferId.toInt());
   offsetQuery.bindValue(":since", since.toTime_t());
   offsetQuery.exec();
@@ -743,7 +764,9 @@ QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, QDateT
   offset = offsetQuery.value(0).toInt();
 
   // now let's select the messages
-  QSqlQuery &msgQuery = cachedQuery("select_messagesSince");
+  // QSqlQuery &msgQuery = cachedQuery("select_messagesSince");
+  QSqlQuery msgQuery(logDb());
+  msgQuery.prepare(queryString("select_messagesSince"));
   msgQuery.bindValue(":bufferid", bufferId.toInt());
   msgQuery.bindValue(":since", since.toTime_t());
   msgQuery.bindValue(":offset", offset);
@@ -773,7 +796,9 @@ QList<Message> SqliteStorage::requestMsgRange(UserId user, BufferId bufferId, in
   if(!bufferInfo.isValid())
     return messagelist;
 
-  QSqlQuery &rangeQuery = cachedQuery("select_messageRange");
+  // QSqlQuery &rangeQuery = cachedQuery("select_messageRange");
+  QSqlQuery rangeQuery(logDb());
+  rangeQuery.prepare(queryString("select_messageRange"));
   rangeQuery.bindValue(":bufferid", bufferId.toInt());
   rangeQuery.bindValue(":firstmsg", first);
   rangeQuery.bindValue(":lastmsg", last);