#include "logger.h"
+#include <QMutexLocker>
#include <QSqlError>
#include <QSqlQuery>
AbstractSqlStorage::AbstractSqlStorage(QObject *parent)
: Storage(parent),
- _schemaVersion(0)
+ _schemaVersion(0),
+ _nextConnectionId(0)
{
}
AbstractSqlStorage::~AbstractSqlStorage() {
- QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
- while(iter != _queryCache.end()) {
- delete *iter;
- iter = _queryCache.erase(iter);
+ // disconnect the connections, so their deletion is no longer interessting for us
+ QHash<QThread *, Connection *>::iterator conIter;
+ for(conIter = _connectionPool.begin(); conIter != _connectionPool.end(); conIter++) {
+ disconnect(conIter.value(), 0, this, 0);
}
-
- {
- QSqlDatabase db = QSqlDatabase::database("quassel_connection");
- db.commit();
- db.close();
- }
- QSqlDatabase::removeDatabase("quassel_connection");
}
QSqlDatabase AbstractSqlStorage::logDb() {
- QSqlDatabase db = QSqlDatabase::database("quassel_connection");
- if(db.isValid() && db.isOpen())
- return db;
-
- if(!openDb()) {
- qWarning() << "Unable to Open Database" << displayName();
- qWarning() << "-" << db.lastError().text();
- }
+ if(!_connectionPool.contains(QThread::currentThread()))
+ addConnectionToPool();
- return QSqlDatabase::database("quassel_connection");
+ qDebug() << "using logDb" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+ return QSqlDatabase::database(_connectionPool[QThread::currentThread()]->name());
}
-bool AbstractSqlStorage::openDb() {
- QSqlDatabase db = QSqlDatabase::database("quassel_connection");
- if(db.isValid() && !db.isOpen())
- return db.open();
+void AbstractSqlStorage::addConnectionToPool() {
+ QMutexLocker locker(&_connectionPoolMutex);
+ // we have to recheck if the connection pool already contains a connection for
+ // this thread. Since now (after the lock) we can only tell for sure
+ if(_connectionPool.contains(QThread::currentThread()))
+ return;
+
+ QThread *currentThread = QThread::currentThread();
+
+ int connectionId = _nextConnectionId++;
- db = QSqlDatabase::addDatabase(driverName(), "quassel_connection");
+ Connection *connection = new Connection(QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1()), this);
+ qDebug() << "new connection" << connection->name() << currentThread << QLatin1String(QString("quassel_connection_%1").arg(connectionId).toLatin1());
+ connection->moveToThread(currentThread);
+ connect(this, SIGNAL(syncCachedQueries()), connection, SLOT(syncCachedQueries()));
+ connect(this, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+ connect(currentThread, SIGNAL(destroyed()), connection, SLOT(deleteLater()));
+ connect(connection, SIGNAL(destroyed()), this, SLOT(connectionDestroyed()));
+ _connectionPool[currentThread] = connection;
+
+ QSqlDatabase db = QSqlDatabase::addDatabase(driverName(), connection->name());
db.setDatabaseName(databaseName());
if(!hostName().isEmpty())
db.setPassword(password());
}
- return db.open();
+ if(!db.open()) {
+ qWarning() << "Unable to open database" << displayName() << "for thread" << QThread::currentThread();
+ qWarning() << "-" << db.lastError().text();
+ }
}
bool AbstractSqlStorage::init(const QVariantMap &settings) {
}
void AbstractSqlStorage::sync() {
- QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
- while(iter != _queryCache.end()) {
- delete *iter;
- iter = _queryCache.erase(iter);
- }
-
- logDb().commit();
+ emit syncCachedQueries();
}
QString AbstractSqlStorage::queryString(const QString &queryName, int version) {
}
QSqlQuery &AbstractSqlStorage::cachedQuery(const QString &queryName, int version) {
- QPair<QString, int> queryId = qMakePair(queryName, version);
- if(!_queryCache.contains(queryId)) {
- QSqlQuery *query = new QSqlQuery(logDb());
- query->prepare(queryString(queryName, version));
- _queryCache[queryId] = query;
- }
- return *(_queryCache[queryId]);
+ Q_ASSERT(_connectionPool.contains(QThread::currentThread()));
+ qDebug() << "cached query" << queryName << "using" << _connectionPool[QThread::currentThread()]->name() << QThread::currentThread();
+ return _connectionPool[QThread::currentThread()]->cachedQuery(queryName, version);
}
QStringList AbstractSqlStorage::setupQueries() {
}
return true;
}
+
+void AbstractSqlStorage::connectionDestroyed() {
+ QMutexLocker locker(&_connectionPoolMutex);
+ _connectionPool.remove(sender()->thread());
+}
+
+// ========================================
+// AbstractSqlStorage::Connection
+// ========================================
+AbstractSqlStorage::Connection::Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent)
+ : QObject(parent),
+ _name(name.toLatin1()),
+ _storageEngine(storage)
+{
+}
+
+AbstractSqlStorage::Connection::~Connection() {
+ QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+ while(iter != _queryCache.end()) {
+ delete *iter;
+ iter = _queryCache.erase(iter);
+ }
+ {
+ QSqlDatabase db = QSqlDatabase::database(name(), false);
+ if(db.isOpen()) {
+ db.commit();
+ db.close();
+ }
+ }
+ QSqlDatabase::removeDatabase(name());
+}
+
+QSqlQuery &AbstractSqlStorage::Connection::cachedQuery(const QString &queryName, int version) {
+ QPair<QString, int> queryId = qMakePair(queryName, version);
+ if(_queryCache.contains(queryId)) {
+ return *(_queryCache[queryId]);
+ }
+
+ QSqlQuery *query = new QSqlQuery(QSqlDatabase::database(name()));
+ query->prepare(_storageEngine->queryString(queryName, version));
+ _queryCache[queryId] = query;
+ return *query;
+}
+
+void AbstractSqlStorage::Connection::syncCachedQueries() {
+ QHash<QPair<QString, int>, QSqlQuery *>::iterator iter = _queryCache.begin();
+ while(iter != _queryCache.end()) {
+ delete *iter;
+ iter = _queryCache.erase(iter);
+ }
+ QSqlDatabase db = QSqlDatabase::database(name(), false);
+ if(db.isOpen())
+ db.commit();
+}
inline virtual QString userName() { return QString(); }
inline virtual QString password() { return QString(); }
+signals:
+ void syncCachedQueries();
+
+private slots:
+ void connectionDestroyed();
+
private:
- bool openDb();
+ void addConnectionToPool();
int _schemaVersion;
- QHash<QPair<QString, int>, QSqlQuery *> _queryCache;
+ int _nextConnectionId;
+ QMutex _connectionPoolMutex;
+ class Connection;
+ QHash<QThread *, Connection *> _connectionPool;
};
+// ========================================
+// AbstractSqlStorage::Connection
+// ========================================
+class AbstractSqlStorage::Connection : public QObject {
+ Q_OBJECT
+
+public:
+ Connection(const QString &name, AbstractSqlStorage *storage, QObject *parent = 0);
+ ~Connection();
+
+ inline QLatin1String name() const { return QLatin1String(_name); }
+ QSqlQuery &cachedQuery(const QString &queryName, int version);
+
+public slots:
+ void syncCachedQueries();
+
+private:
+ QByteArray _name;
+ QHash<QPair<QString, int>, QSqlQuery *> _queryCache;
+ AbstractSqlStorage *_storageEngine;
+};
#endif
CoreSettings s;
s.setStorageSettings(setupData);
quInfo() << qPrintable(tr("Creating admin user..."));
- mutex.lock();
+ //mutex.lock();
storage->addUser(user, password);
- mutex.unlock();
+ //mutex.unlock();
startListening(); // TODO check when we need this
return QString();
}
}
void Core::syncStorage() {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
if(storage) storage->sync();
}
/*** Storage Access ***/
void Core::setUserSetting(UserId userId, const QString &settingName, const QVariant &data) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
instance()->storage->setUserSetting(userId, settingName, data);
}
QVariant Core::getUserSetting(UserId userId, const QString &settingName, const QVariant &data) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->getUserSetting(userId, settingName, data);
}
bool Core::createNetwork(UserId user, NetworkInfo &info) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
NetworkId networkId = instance()->storage->createNetwork(user, info);
if(!networkId.isValid())
return false;
}
bool Core::updateNetwork(UserId user, const NetworkInfo &info) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->updateNetwork(user, info);
}
bool Core::removeNetwork(UserId user, const NetworkId &networkId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->removeNetwork(user, networkId);
}
QList<NetworkInfo> Core::networks(UserId user) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->networks(user);
}
NetworkId Core::networkId(UserId user, const QString &network) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->getNetworkId(user, network);
}
QList<NetworkId> Core::connectedNetworks(UserId user) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->connectedNetworks(user);
}
void Core::setNetworkConnected(UserId user, const NetworkId &networkId, bool isConnected) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->setNetworkConnected(user, networkId, isConnected);
}
QHash<QString, QString> Core::persistentChannels(UserId user, const NetworkId &networkId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->persistentChannels(user, networkId);
}
void Core::setChannelPersistent(UserId user, const NetworkId &networkId, const QString &channel, bool isJoined) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->setChannelPersistent(user, networkId, channel, isJoined);
}
void Core::setPersistentChannelKey(UserId user, const NetworkId &networkId, const QString &channel, const QString &key) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->setPersistentChannelKey(user, networkId, channel, key);
}
BufferInfo Core::bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->getBufferInfo(user, networkId, type, buffer);
}
BufferInfo Core::getBufferInfo(UserId user, const BufferId &bufferId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->getBufferInfo(user, bufferId);
}
MsgId Core::storeMessage(const Message &message) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->logMessage(message);
}
QList<Message> Core::requestMsgs(UserId user, BufferId buffer, int lastmsgs, int offset) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->requestMsgs(user, buffer, lastmsgs, offset);
}
QList<Message> Core::requestMsgs(UserId user, BufferId buffer, QDateTime since, int offset) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->requestMsgs(user, buffer, since, offset);
}
QList<Message> Core::requestMsgRange(UserId user, BufferId buffer, int first, int last) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->requestMsgRange(user, buffer, first, last);
}
QList<BufferInfo> Core::requestBuffers(UserId user) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->requestBuffers(user);
}
QList<BufferId> Core::requestBufferIdsForNetwork(UserId user, NetworkId networkId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->requestBufferIdsForNetwork(user, networkId);
}
bool Core::removeBuffer(const UserId &user, const BufferId &bufferId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->removeBuffer(user, bufferId);
}
BufferId Core::renameBuffer(const UserId &user, const NetworkId &networkId, const QString &newName, const QString &oldName) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->renameBuffer(user, networkId, newName, oldName);
}
void Core::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->setBufferLastSeenMsg(user, bufferId, msgId);
}
QHash<BufferId, MsgId> Core::bufferLastSeenMsgIds(UserId user) {
- QMutexLocker locker(&mutex);
+ // QMutexLocker locker(&mutex);
return instance()->storage->bufferLastSeenMsgIds(user);
}
SignalProxy::writeDataToDevice(socket, reply);
} else if(msg["MsgType"] == "ClientLogin") {
QVariantMap reply;
- mutex.lock();
+ // mutex.lock();
UserId uid = storage->validateUser(msg["User"].toString(), msg["Password"].toString());
- mutex.unlock();
+ // mutex.unlock();
if(uid == 0) {
reply["MsgType"] = "ClientLoginReject";
reply["Error"] = tr("<b>Invalid username or password!</b><br>The username/password combination you supplied could not be found in the database.");
setupCoreForInternalUsage();
}
- mutex.lock();
+ // mutex.lock();
UserId uid = storage->internalUser();
- mutex.unlock();
+ // mutex.unlock();
// Find or create session for validated user
SessionThread *sess;
void SqliteStorage::createBuffer(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
- QSqlQuery &query = cachedQuery("insert_buffer");
+ // QSqlQuery &query = cachedQuery("insert_buffer");
+ QSqlQuery query(logDb());
+ query.prepare(queryString("insert_buffer"));
query.bindValue(":userid", user.toInt());
query.bindValue(":networkid", networkId.toInt());
query.bindValue(":buffertype", (int)type);
}
BufferInfo SqliteStorage::getBufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer) {
- QSqlQuery &query = cachedQuery("select_bufferByName");
+ // QSqlQuery &query = cachedQuery("select_bufferByName");
+ QSqlQuery query(logDb());
+ query.prepare(queryString("select_bufferByName"));
query.bindValue(":networkid", networkId.toInt());
query.bindValue(":userid", user.toInt());
query.bindValue(":buffercname", buffer.toLower());
}
void SqliteStorage::setBufferLastSeenMsg(UserId user, const BufferId &bufferId, const MsgId &msgId) {
- QSqlQuery &query = cachedQuery("update_buffer_lastseen");
+ // QSqlQuery &query = cachedQuery("update_buffer_lastseen");
+ QSqlQuery query(logDb());
+ query.prepare(queryString("update_buffer_lastseen"));
+
query.bindValue(":userid", user.toInt());
query.bindValue(":bufferid", bufferId.toInt());
query.bindValue(":lastseenmsgid", msgId.toInt());
}
MsgId SqliteStorage::logMessage(Message msg) {
- QSqlQuery &logMessageQuery = cachedQuery("insert_message");
+ // QSqlQuery &logMessageQuery = cachedQuery("insert_message");
+ QSqlQuery logMessageQuery(logDb());
+ logMessageQuery.prepare(queryString("insert_message"));
+
logMessageQuery.bindValue(":time", msg.timestamp().toTime_t());
logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt());
logMessageQuery.bindValue(":type", msg.type());
if(logMessageQuery.lastError().isValid()) {
// constraint violation - must be NOT NULL constraint - probably the sender is missing...
if(logMessageQuery.lastError().number() == 19) {
- QSqlQuery &addSenderQuery = cachedQuery("insert_sender");
+ // QSqlQuery &addSenderQuery = cachedQuery("insert_sender");
+ QSqlQuery addSenderQuery(logDb());
+ addSenderQuery.prepare(queryString("insert_sender"));
addSenderQuery.bindValue(":sender", msg.sender());
addSenderQuery.exec();
watchQuery(addSenderQuery);
offset = 0;
} else {
// we have to determine the real offset first
- QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset");
+ // QSqlQuery &offsetQuery = cachedQuery("select_messagesOffset");
+ QSqlQuery offsetQuery(logDb());
+ offsetQuery.prepare(queryString("select_messagesOffset"));
+
offsetQuery.bindValue(":bufferid", bufferId.toInt());
offsetQuery.bindValue(":messageid", offset);
offsetQuery.exec();
}
// now let's select the messages
- QSqlQuery &msgQuery = cachedQuery("select_messages");
+ // QSqlQuery &msgQuery = cachedQuery("select_messages");
+ QSqlQuery msgQuery(logDb());
+ msgQuery.prepare(queryString("select_messages"));
+
msgQuery.bindValue(":bufferid", bufferId.toInt());
msgQuery.bindValue(":limit", lastmsgs);
msgQuery.bindValue(":offset", offset);
return messagelist;
// we have to determine the real offset first
- QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset");
+ // QSqlQuery &offsetQuery = cachedQuery("select_messagesSinceOffset");
+ QSqlQuery offsetQuery(logDb());
+ offsetQuery.prepare(queryString("select_messagesSinceOffset"));
+
offsetQuery.bindValue(":bufferid", bufferId.toInt());
offsetQuery.bindValue(":since", since.toTime_t());
offsetQuery.exec();
offset = offsetQuery.value(0).toInt();
// now let's select the messages
- QSqlQuery &msgQuery = cachedQuery("select_messagesSince");
+ // QSqlQuery &msgQuery = cachedQuery("select_messagesSince");
+ QSqlQuery msgQuery(logDb());
+ msgQuery.prepare(queryString("select_messagesSince"));
msgQuery.bindValue(":bufferid", bufferId.toInt());
msgQuery.bindValue(":since", since.toTime_t());
msgQuery.bindValue(":offset", offset);
if(!bufferInfo.isValid())
return messagelist;
- QSqlQuery &rangeQuery = cachedQuery("select_messageRange");
+ // QSqlQuery &rangeQuery = cachedQuery("select_messageRange");
+ QSqlQuery rangeQuery(logDb());
+ rangeQuery.prepare(queryString("select_messageRange"));
rangeQuery.bindValue(":bufferid", bufferId.toInt());
rangeQuery.bindValue(":firstmsg", first);
rangeQuery.bindValue(":lastmsg", last);