first version of lockless storage backend (WIP with lots of debug output)
[quassel.git] / src / core / abstractsqlstorage.cpp
index 6d850d8..467b226 100644 (file)
 
 #include "logger.h"
 
+#include <QMutexLocker>
 #include <QSqlError>
 #include <QSqlQuery>
 
 AbstractSqlStorage::AbstractSqlStorage(QObject *parent)
   : Storage(parent),
-    _schemaVersion(0)
+    _schemaVersion(0),
+    _nextConnectionId(0)
 {
 }
 
 AbstractSqlStorage::~AbstractSqlStorage() {
-  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
-  while(iter != _queryCache.end()) {
-    delete *iter;
-    iter = _queryCache.erase(iter);
+  // disconnect the connections, so their deletion is no longer interessting for us
+  QHash<QThread *, Connection *>::iterator conIter;
+  for(conIter = _connectionPool.begin(); conIter != _connectionPool.end(); conIter++) {
+    disconnect(conIter.value(), 0, this, 0);
   }
-  
-  {
-    QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-    db.commit();
-    db.close();
-  }
-  QSqlDatabase::removeDatabase("quassel_connection");  
 }
 
 QSqlDatabase AbstractSqlStorage::logDb() {
-  QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-  if(db.isValid() && db.isOpen())
-    return db;
-
-  if(!openDb()) {
-    qWarning() << "Unable to Open Database" << displayName();
-    qWarning() << "-" << db.lastError().text();
-  }
+  if(!_connectionPool.contains(QThread::currentThread()))
+    addConnectionToPool();
 
-  return QSqlDatabase::database("quassel_connection");
+  qDebug() << "using logDb" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+  return QSqlDatabase::database(_connectionPool[QThread::currentThread()]->name());
 }
 
-bool AbstractSqlStorage::openDb() {
-  QSqlDatabase db = QSqlDatabase::database("quassel_connection");
-  if(db.isValid() && !db.isOpen())
-    return db.open();
+void AbstractSqlStorage::addConnectionToPool() {
+  QMutexLocker locker(&_connectionPoolMutex);
+  // we have to recheck if the connection pool already contains a connection for
+  // this thread. Since now (after the lock) we can only tell for sure
+  if(_connectionPool.contains(QThread::currentThread()))
+    return;
+
+  QThread *currentThread = QThread::currentThread();
+
+  int connectionId = _nextConnectionId++;
 
-  db = QSqlDatabase::addDatabase(driverName(), "quassel_connection");
+  Connection *connection = new Connection(QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1()), this);
+  qDebug() << "new connection" << connection->name() << currentThread << QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1());
+  connection->moveToThread(currentThread);
+  connect(this, SIGNAL(syncCachedQueries()), connection, SLOT(syncCachedQueries()));
+  connect(this, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+  connect(currentThread, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+  connect(connection, SIGNAL(destroyed()), this, SLOT(connectionDestroyed()));
+  _connectionPool[currentThread] = connection;
+
+  QSqlDatabase db = QSqlDatabase::addDatabase(driverName(), connection->name());
   db.setDatabaseName(databaseName());
 
   if(!hostName().isEmpty())
@@ -75,7 +80,10 @@ bool AbstractSqlStorage::openDb() {
     db.setPassword(password());
   }
 
-  return db.open();
+  if(!db.open()) {
+    qWarning() << "Unable to open database" << displayName() << "for thread" << QThread::currentThread();
+    qWarning() << "-" << db.lastError().text();
+  }
 }
 
 bool AbstractSqlStorage::init(const QVariantMap &settings) {
@@ -105,13 +113,7 @@ bool AbstractSqlStorage::init(const QVariantMap &settings) {
 }
 
 void AbstractSqlStorage::sync() {
-  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
-  while(iter != _queryCache.end()) {
-    delete *iter;
-    iter = _queryCache.erase(iter);
-  }
-
-  logDb().commit();
+  emit syncCachedQueries();
 }
 
 QString AbstractSqlStorage::queryString(const QString &queryName, int version) {
@@ -134,13 +136,9 @@ QString AbstractSqlStorage::queryString(const QString &queryName, int version) {
 }
 
 QSqlQuery &AbstractSqlStorage::cachedQuery(const QString &queryName, int version) {
-  QPair<QString, int> queryId = qMakePair(queryName, version);
-  if(!_queryCache.contains(queryId)) {
-    QSqlQuery *query = new QSqlQuery(logDb());
-    query->prepare(queryString(queryName, version));
-    _queryCache[queryId] = query;
-  }
-  return *(_queryCache[queryId]);
+  Q_ASSERT(_connectionPool.contains(QThread::currentThread()));
+  qDebug() << "cached query" << queryName << "using" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+  return _connectionPool[QThread::currentThread()]->cachedQuery(queryName, version);
 }
 
 QStringList AbstractSqlStorage::setupQueries() {
@@ -239,3 +237,57 @@ bool AbstractSqlStorage::watchQuery(QSqlQuery &query) {
   }
   return true;
 }
+
+void AbstractSqlStorage::connectionDestroyed() {
+  QMutexLocker locker(&_connectionPoolMutex);
+  _connectionPool.remove(sender()->thread());
+}
+
+// ========================================
+//  AbstractSqlStorage::Connection
+// ========================================
+AbstractSqlStorage::Connection::Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent)
+  : QObject(parent),
+    _name(name.toLatin1()),
+    _storageEngine(storage)
+{
+}
+
+AbstractSqlStorage::Connection::~Connection() {
+  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+  while(iter != _queryCache.end()) {
+    delete *iter;
+    iter = _queryCache.erase(iter);
+  }
+  {
+    QSqlDatabase db = QSqlDatabase::database(name(), false);
+    if(db.isOpen()) {
+      db.commit();
+      db.close();
+    }
+  }
+  QSqlDatabase::removeDatabase(name());
+}
+
+QSqlQuery &AbstractSqlStorage::Connection::cachedQuery(const QString &queryName, int version) {
+  QPair<QString, int> queryId = qMakePair(queryName, version);
+  if(_queryCache.contains(queryId)) {
+    return *(_queryCache[queryId]);
+  }
+
+  QSqlQuery *query = new QSqlQuery(QSqlDatabase::database(name()));
+  query->prepare(_storageEngine->queryString(queryName, version));
+  _queryCache[queryId] = query;
+  return *query;
+}
+
+void AbstractSqlStorage::Connection::syncCachedQueries() {
+  QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+  while(iter != _queryCache.end()) {
+    delete *iter;
+    iter = _queryCache.erase(iter);
+  }
+  QSqlDatabase db = QSqlDatabase::database(name(), false);
+  if(db.isOpen())
+    db.commit();
+}