From: Michael Marley Date: Fri, 19 Dec 2014 00:02:45 +0000 (-0500) Subject: Improve reliability of PostgreSQL connections X-Git-Tag: 0.12-beta1~48^2 X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=commitdiff_plain;h=f11377872cde42c47b3354b8e660cb2025ceffd8 Improve reliability of PostgreSQL connections This patch adds support for detecting that a query failed due to the DB connection being down and attempts to reconnect before retrying the query. It does this for opening transactions, safeExec(), and prepareAndExecuteQuery(). --- diff --git a/src/core/postgresqlstorage.cpp b/src/core/postgresqlstorage.cpp index bb58da17..31f22b9d 100644 --- a/src/core/postgresqlstorage.cpp +++ b/src/core/postgresqlstorage.cpp @@ -152,12 +152,17 @@ void PostgreSqlStorage::setConnectionProperties(const QVariantMap &properties) int PostgreSqlStorage::installedSchemaVersion() { - QSqlQuery query = logDb().exec("SELECT value FROM coreinfo WHERE key = 'schemaversion'"); + QSqlQuery query(logDb()); + query.prepare("SELECT value FROM coreinfo WHERE key = 'schemaversion'"); + safeExec(query); + watchQuery(query); if (query.first()) return query.value(0).toInt(); // maybe it's really old... (schema version 0) - query = logDb().exec("SELECT MAX(version) FROM coreinfo"); + query.prepare("SELECT MAX(version) FROM coreinfo"); + safeExec(query); + watchQuery(query); if (query.first()) return query.value(0).toInt(); @@ -170,10 +175,10 @@ bool PostgreSqlStorage::updateSchemaVersion(int newVersion) QSqlQuery query(logDb()); query.prepare("UPDATE coreinfo SET value = :version WHERE key = 'schemaversion'"); query.bindValue(":version", newVersion); - query.exec(); + safeExec(query); bool success = true; - if (query.lastError().isValid()) { + if (!watchQuery(query)) { qCritical() << "PostgreSqlStorage::updateSchemaVersion(int): Updating schema version failed!"; success = false; } @@ -186,10 +191,10 @@ bool PostgreSqlStorage::setupSchemaVersion(int version) QSqlQuery query(logDb()); query.prepare("INSERT INTO coreinfo (key, value) VALUES ('schemaversion', :version)"); query.bindValue(":version", version); - query.exec(); + safeExec(query); bool success = true; - if (query.lastError().isValid()) { + if (!watchQuery(query)) { qCritical() << "PostgreSqlStorage::setupSchemaVersion(int): Updating schema version failed!"; success = false; } @@ -221,6 +226,7 @@ bool PostgreSqlStorage::updateUser(UserId user, const QString &password) query.bindValue(":userid", user.toInt()); query.bindValue(":password", cryptedPassword(password)); safeExec(query); + watchQuery(query); return query.numRowsAffected() != 0; } @@ -232,6 +238,7 @@ void PostgreSqlStorage::renameUser(UserId user, const QString &newName) query.bindValue(":userid", user.toInt()); query.bindValue(":username", newName); safeExec(query); + watchQuery(query); emit userRenamed(user, newName); } @@ -243,6 +250,7 @@ UserId PostgreSqlStorage::validateUser(const QString &user, const QString &passw query.bindValue(":username", user); query.bindValue(":password", cryptedPassword(password)); safeExec(query); + watchQuery(query); if (query.first()) { return query.value(0).toInt(); @@ -259,6 +267,7 @@ UserId PostgreSqlStorage::getUserId(const QString &user) query.prepare(queryString("select_userid")); query.bindValue(":username", user); safeExec(query); + watchQuery(query); if (query.first()) { return query.value(0).toInt(); @@ -274,6 +283,7 @@ UserId PostgreSqlStorage::internalUser() QSqlQuery query(logDb()); query.prepare(queryString("select_internaluser")); safeExec(query); + watchQuery(query); if (query.first()) { return query.value(0).toInt(); @@ -287,7 +297,7 @@ UserId PostgreSqlStorage::internalUser() void PostgreSqlStorage::delUser(UserId user) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::delUser(): cannot start transaction!"; return; } @@ -320,6 +330,7 @@ void PostgreSqlStorage::setUserSetting(UserId userId, const QString &settingName selectQuery.bindValue(":userid", userId.toInt()); selectQuery.bindValue(":settingname", settingName); safeExec(selectQuery); + watchQuery(selectQuery); QString setQueryString; if (!selectQuery.first()) { @@ -335,6 +346,7 @@ void PostgreSqlStorage::setUserSetting(UserId userId, const QString &settingName setQuery.bindValue(":settingname", settingName); setQuery.bindValue(":settingvalue", rawData); safeExec(setQuery); + watchQuery(setQuery); } @@ -345,6 +357,7 @@ QVariant PostgreSqlStorage::getUserSetting(UserId userId, const QString &setting query.bindValue(":userid", userId.toInt()); query.bindValue(":settingname", settingName); safeExec(query); + watchQuery(query); if (query.first()) { QVariant data; @@ -365,7 +378,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity IdentityId identityId; QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::createIdentity(): Unable to start Transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return identityId; @@ -399,8 +412,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity query.bindValue(":sslkey", QByteArray()); #endif safeExec(query); - if (query.lastError().isValid()) { - watchQuery(query); + if (!watchQuery(query)) { db.rollback(); return IdentityId(); } @@ -410,7 +422,6 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity identity.setId(identityId); if (!identityId.isValid()) { - watchQuery(query); db.rollback(); return IdentityId(); } @@ -439,7 +450,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::updateIdentity(): Unable to start Transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -450,6 +461,7 @@ bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity checkQuery.bindValue(":identityid", identity.id().toInt()); checkQuery.bindValue(":userid", user.toInt()); safeExec(checkQuery); + watchQuery(checkQuery); // there should be exactly one identity for the given id and user if (!checkQuery.first() || checkQuery.value(0).toInt() != 1) { @@ -524,7 +536,7 @@ bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity void PostgreSqlStorage::removeIdentity(UserId user, IdentityId identityId) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::removeIdentity(): Unable to start Transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return; @@ -563,6 +575,7 @@ QList PostgreSqlStorage::identities(UserId user) nickQuery.prepare(queryString("select_nicks")); safeExec(query); + watchQuery(query); while (query.next()) { CoreIdentity identity(IdentityId(query.value(0).toInt())); @@ -609,7 +622,7 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info) NetworkId networkId; QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::createNetwork(): failed to begin transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -620,8 +633,7 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info) query.bindValue(":userid", user.toInt()); bindNetworkInfo(query, info); safeExec(query); - if (query.lastError().isValid()) { - watchQuery(query); + if (!watchQuery(query)) { db.rollback(); return NetworkId(); } @@ -630,7 +642,6 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info) networkId = query.value(0).toInt(); if (!networkId.isValid()) { - watchQuery(query); db.rollback(); return NetworkId(); } @@ -701,7 +712,7 @@ void PostgreSqlStorage::bindServerInfo(QSqlQuery &query, const Network::Server & bool PostgreSqlStorage::updateNetwork(UserId user, const NetworkInfo &info) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::updateNetwork(): failed to begin transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -756,7 +767,7 @@ bool PostgreSqlStorage::updateNetwork(UserId user, const NetworkInfo &info) bool PostgreSqlStorage::removeNetwork(UserId user, const NetworkId &networkId) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::removeNetwork(): cannot start transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -1002,7 +1013,7 @@ void PostgreSqlStorage::setUserModes(UserId user, NetworkId networkId, const QSt BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer, bool create) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::bufferInfo(): cannot start read only transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return BufferInfo(); @@ -1014,6 +1025,7 @@ BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId query.bindValue(":userid", user.toInt()); query.bindValue(":buffercname", buffer.toLower()); safeExec(query); + watchQuery(query); if (query.first()) { BufferInfo bufferInfo = BufferInfo(query.value(0).toInt(), networkId, (BufferInfo::Type)query.value(1).toInt(), 0, buffer); @@ -1046,9 +1058,8 @@ BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId safeExec(createQuery); - if (createQuery.lastError().isValid()) { + if (!watchQuery(createQuery)) { qWarning() << "PostgreSqlStorage::bufferInfo(): unable to create buffer"; - watchQuery(createQuery); db.rollback(); return BufferInfo(); } @@ -1135,7 +1146,7 @@ QList PostgreSqlStorage::requestBufferIdsForNetwork(UserId user, Netwo bool PostgreSqlStorage::removeBuffer(const UserId &user, const BufferId &bufferId) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::removeBuffer(): cannot start transaction!"; return false; } @@ -1170,7 +1181,7 @@ bool PostgreSqlStorage::removeBuffer(const UserId &user, const BufferId &bufferI bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferId, const QString &newName) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::renameBuffer(): cannot start transaction!"; return false; } @@ -1182,8 +1193,7 @@ bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferI query.bindValue(":userid", user.toInt()); query.bindValue(":bufferid", bufferId.toInt()); safeExec(query); - if (query.lastError().isValid()) { - watchQuery(query); + if (!watchQuery(query)) { db.rollback(); return false; } @@ -1208,7 +1218,7 @@ bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferI bool PostgreSqlStorage::mergeBuffersPermanently(const UserId &user, const BufferId &bufferId1, const BufferId &bufferId2) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::mergeBuffersPermanently(): cannot start transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -1343,7 +1353,7 @@ QHash PostgreSqlStorage::bufferMarkerLineMsgIds(UserId user) bool PostgreSqlStorage::logMessage(Message &msg) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -1362,7 +1372,9 @@ bool PostgreSqlStorage::logMessage(Message &msg) if (addSenderQuery.lastError().isValid()) { rollbackSavePoint("sender_sp1", db); - getSenderIdQuery = db.exec(getSenderIdQuery.lastQuery()); + getSenderIdQuery.prepare(getSenderIdQuery.lastQuery()); + safeExec(getSenderIdQuery); + watchQuery(getSenderIdQuery); getSenderIdQuery.first(); senderId = getSenderIdQuery.value(0).toInt(); } @@ -1403,7 +1415,7 @@ bool PostgreSqlStorage::logMessage(Message &msg) bool PostgreSqlStorage::logMessages(MessageList &msgs) { QSqlDatabase db = logDb(); - if (!db.transaction()) { + if (!beginTransaction(db)) { qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; qWarning() << " -" << qPrintable(db.lastError().text()); return false; @@ -1431,7 +1443,9 @@ bool PostgreSqlStorage::logMessages(MessageList &msgs) if (addSenderQuery.lastError().isValid()) { // seems it was inserted meanwhile... by a different thread rollbackSavePoint("sender_sp", db); - selectSenderQuery = db.exec(selectSenderQuery.lastQuery()); + selectSenderQuery.prepare(selectSenderQuery.lastQuery()); + safeExec(selectSenderQuery); + watchQuery(selectSenderQuery); selectSenderQuery.first(); senderIdList << selectSenderQuery.value(0).toInt(); senderIds[sender] = selectSenderQuery.value(0).toInt(); @@ -1620,14 +1634,29 @@ QList PostgreSqlStorage::requestAllMsgs(UserId user, MsgId first, MsgId // return; // } + +bool PostgreSqlStorage::beginTransaction(QSqlDatabase &db) +{ + bool result = db.transaction(); + if (!db.isOpen()) { + db = logDb(); + result = db.transaction(); + } + return result; +} + bool PostgreSqlStorage::beginReadOnlyTransaction(QSqlDatabase &db) { QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY"); + if (!db.isOpen()) { + db = logDb(); + query = db.exec("BEGIN TRANSACTION READ ONLY"); + } return !query.lastError().isValid(); } -QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, const QString ¶mstring, const QSqlDatabase &db) +QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, const QString ¶mstring, QSqlDatabase &db) { // Query preparing is done lazily. That means that instead of always checking if the query is already prepared // we just EXECUTE and catch the error @@ -1641,10 +1670,22 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co query = db.exec(QString("EXECUTE quassel_%1 (%2)").arg(queryname).arg(paramstring)); } - if (db.lastError().isValid()) { - // and once again: Qt leaves us without error codes so we either parse (language dependant(!)) strings + if (!db.isOpen() || db.lastError().isValid()) { + // If the query failed because the DB connection was down, reopen the connection and start a new transaction. + if (!db.isOpen()) { + db = logDb(); + if (!beginTransaction(db)) { + qWarning() << "PostgreSqlStorage::prepareAndExecuteQuery(): cannot start transaction while recovering from connection loss!"; + qWarning() << " -" << qPrintable(db.lastError().text()); + return query; + } + db.exec("SAVEPOINT quassel_prepare_query"); + } else { + db.exec("ROLLBACK TO SAVEPOINT quassel_prepare_query"); + } + + // and once again: Qt leaves us without error codes so we either parse (language dependent(!)) strings // or we just guess the error. As we're only interested in unprepared queries, this will be our guess. :) - db.exec("ROLLBACK TO SAVEPOINT quassel_prepare_query"); QSqlQuery checkQuery = db.exec(QString("SELECT count(name) FROM pg_prepared_statements WHERE name = 'quassel_%1' AND from_sql = TRUE").arg(queryname.toLower())); checkQuery.first(); if (checkQuery.value(0).toInt() == 0) { @@ -1655,7 +1696,7 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co return QSqlQuery(db); } } - // we alwas execute the query again, even if the query was already prepared. + // we always execute the query again, even if the query was already prepared. // this ensures, that the error is properly propagated to the calling function // (otherwise the last call would be the testing select to pg_prepared_statements // which always gives a proper result and the error would be lost) @@ -1674,7 +1715,7 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co } -QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariantList ¶ms, const QSqlDatabase &db) +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariantList ¶ms, QSqlDatabase &db) { QSqlDriver *driver = db.driver(); @@ -1700,7 +1741,7 @@ QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, cons } -QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariant ¶m, const QSqlDatabase &db) +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariant ¶m, QSqlDatabase &db) { QSqlField field; field.setType(param.type()); @@ -1720,6 +1761,26 @@ void PostgreSqlStorage::deallocateQuery(const QString &queryname, const QSqlData } +void PostgreSqlStorage::safeExec(QSqlQuery &query) +{ + // If the query fails due to the connection being gone, it seems to cause + // exec() to return false but no lastError to be set + if(!query.exec() && !query.lastError().isValid()) + { + QSqlDatabase db = logDb(); + QSqlQuery retryQuery(db); + retryQuery.prepare(query.lastQuery()); + QMapIterator i(query.boundValues()); + while (i.hasNext()) + { + i.next(); + retryQuery.bindValue(i.key(),i.value()); + } + query = retryQuery; + query.exec(); + } +} + // ======================================== // PostgreSqlMigrationWriter // ======================================== diff --git a/src/core/postgresqlstorage.h b/src/core/postgresqlstorage.h index 0d172ec1..73b16dcc 100644 --- a/src/core/postgresqlstorage.h +++ b/src/core/postgresqlstorage.h @@ -116,10 +116,11 @@ protected: virtual bool setupSchemaVersion(int version); void safeExec(QSqlQuery &query); + bool beginTransaction(QSqlDatabase &db); bool beginReadOnlyTransaction(QSqlDatabase &db); - QSqlQuery executePreparedQuery(const QString &queryname, const QVariantList ¶ms, const QSqlDatabase &db); - QSqlQuery executePreparedQuery(const QString &queryname, const QVariant ¶m, const QSqlDatabase &db); + QSqlQuery executePreparedQuery(const QString &queryname, const QVariantList ¶ms, QSqlDatabase &db); + QSqlQuery executePreparedQuery(const QString &queryname, const QVariant ¶m, QSqlDatabase &db); void deallocateQuery(const QString &queryname, const QSqlDatabase &db); inline void savePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("SAVEPOINT %1").arg(handle)); } @@ -129,8 +130,8 @@ protected: private: void bindNetworkInfo(QSqlQuery &query, const NetworkInfo &info); void bindServerInfo(QSqlQuery &query, const Network::Server &server); - QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QString ¶mstring, const QSqlDatabase &db); - inline QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QSqlDatabase &db) { return prepareAndExecuteQuery(queryname, QString(), db); } + QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QString ¶mstring, QSqlDatabase &db); + inline QSqlQuery prepareAndExecuteQuery(const QString &queryname, QSqlDatabase &db) { return prepareAndExecuteQuery(queryname, QString(), db); } QString _hostName; int _port; @@ -140,8 +141,6 @@ private: }; -inline void PostgreSqlStorage::safeExec(QSqlQuery &query) { query.exec(); } - // ======================================== // PostgreSqlMigration // ========================================