Improve reliability of PostgreSQL connections 103/head
authorMichael Marley <michael@michaelmarley.com>
Fri, 19 Dec 2014 00:02:45 +0000 (19:02 -0500)
committerMichael Marley <michael@michaelmarley.com>
Fri, 19 Dec 2014 02:58:01 +0000 (21:58 -0500)
This patch adds support for detecting that a query failed due to
the DB connection being down and attempts to reconnect before
retrying the query.  It does this for opening transactions,
safeExec(), and prepareAndExecuteQuery().

src/core/postgresqlstorage.cpp
src/core/postgresqlstorage.h

index bb58da1..31f22b9 100644 (file)
@@ -152,12 +152,17 @@ void PostgreSqlStorage::setConnectionProperties(const QVariantMap &properties)
 
 int PostgreSqlStorage::installedSchemaVersion()
 {
-    QSqlQuery query = logDb().exec("SELECT value FROM coreinfo WHERE key = 'schemaversion'");
+    QSqlQuery query(logDb());
+    query.prepare("SELECT value FROM coreinfo WHERE key = 'schemaversion'");
+    safeExec(query);
+    watchQuery(query);
     if (query.first())
         return query.value(0).toInt();
 
     // maybe it's really old... (schema version 0)
-    query = logDb().exec("SELECT MAX(version) FROM coreinfo");
+    query.prepare("SELECT MAX(version) FROM coreinfo");
+    safeExec(query);
+    watchQuery(query);
     if (query.first())
         return query.value(0).toInt();
 
@@ -170,10 +175,10 @@ bool PostgreSqlStorage::updateSchemaVersion(int newVersion)
     QSqlQuery query(logDb());
     query.prepare("UPDATE coreinfo SET value = :version WHERE key = 'schemaversion'");
     query.bindValue(":version", newVersion);
-    query.exec();
+    safeExec(query);
 
     bool success = true;
-    if (query.lastError().isValid()) {
+    if (!watchQuery(query)) {
         qCritical() << "PostgreSqlStorage::updateSchemaVersion(int): Updating schema version failed!";
         success = false;
     }
@@ -186,10 +191,10 @@ bool PostgreSqlStorage::setupSchemaVersion(int version)
     QSqlQuery query(logDb());
     query.prepare("INSERT INTO coreinfo (key, value) VALUES ('schemaversion', :version)");
     query.bindValue(":version", version);
-    query.exec();
+    safeExec(query);
 
     bool success = true;
-    if (query.lastError().isValid()) {
+    if (!watchQuery(query)) {
         qCritical() << "PostgreSqlStorage::setupSchemaVersion(int): Updating schema version failed!";
         success = false;
     }
@@ -221,6 +226,7 @@ bool PostgreSqlStorage::updateUser(UserId user, const QString &password)
     query.bindValue(":userid", user.toInt());
     query.bindValue(":password", cryptedPassword(password));
     safeExec(query);
+    watchQuery(query);
     return query.numRowsAffected() != 0;
 }
 
@@ -232,6 +238,7 @@ void PostgreSqlStorage::renameUser(UserId user, const QString &newName)
     query.bindValue(":userid", user.toInt());
     query.bindValue(":username", newName);
     safeExec(query);
+    watchQuery(query);
     emit userRenamed(user, newName);
 }
 
@@ -243,6 +250,7 @@ UserId PostgreSqlStorage::validateUser(const QString &user, const QString &passw
     query.bindValue(":username", user);
     query.bindValue(":password", cryptedPassword(password));
     safeExec(query);
+    watchQuery(query);
 
     if (query.first()) {
         return query.value(0).toInt();
@@ -259,6 +267,7 @@ UserId PostgreSqlStorage::getUserId(const QString &user)
     query.prepare(queryString("select_userid"));
     query.bindValue(":username", user);
     safeExec(query);
+    watchQuery(query);
 
     if (query.first()) {
         return query.value(0).toInt();
@@ -274,6 +283,7 @@ UserId PostgreSqlStorage::internalUser()
     QSqlQuery query(logDb());
     query.prepare(queryString("select_internaluser"));
     safeExec(query);
+    watchQuery(query);
 
     if (query.first()) {
         return query.value(0).toInt();
@@ -287,7 +297,7 @@ UserId PostgreSqlStorage::internalUser()
 void PostgreSqlStorage::delUser(UserId user)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::delUser(): cannot start transaction!";
         return;
     }
@@ -320,6 +330,7 @@ void PostgreSqlStorage::setUserSetting(UserId userId, const QString &settingName
     selectQuery.bindValue(":userid", userId.toInt());
     selectQuery.bindValue(":settingname", settingName);
     safeExec(selectQuery);
+    watchQuery(selectQuery);
 
     QString setQueryString;
     if (!selectQuery.first()) {
@@ -335,6 +346,7 @@ void PostgreSqlStorage::setUserSetting(UserId userId, const QString &settingName
     setQuery.bindValue(":settingname", settingName);
     setQuery.bindValue(":settingvalue", rawData);
     safeExec(setQuery);
+    watchQuery(setQuery);
 }
 
 
@@ -345,6 +357,7 @@ QVariant PostgreSqlStorage::getUserSetting(UserId userId, const QString &setting
     query.bindValue(":userid", userId.toInt());
     query.bindValue(":settingname", settingName);
     safeExec(query);
+    watchQuery(query);
 
     if (query.first()) {
         QVariant data;
@@ -365,7 +378,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity
     IdentityId identityId;
 
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::createIdentity(): Unable to start Transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return identityId;
@@ -399,8 +412,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity
     query.bindValue(":sslkey", QByteArray());
 #endif
     safeExec(query);
-    if (query.lastError().isValid()) {
-        watchQuery(query);
+    if (!watchQuery(query)) {
         db.rollback();
         return IdentityId();
     }
@@ -410,7 +422,6 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity
     identity.setId(identityId);
 
     if (!identityId.isValid()) {
-        watchQuery(query);
         db.rollback();
         return IdentityId();
     }
@@ -439,7 +450,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity
 bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::updateIdentity(): Unable to start Transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -450,6 +461,7 @@ bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity
     checkQuery.bindValue(":identityid", identity.id().toInt());
     checkQuery.bindValue(":userid", user.toInt());
     safeExec(checkQuery);
+    watchQuery(checkQuery);
 
     // there should be exactly one identity for the given id and user
     if (!checkQuery.first() || checkQuery.value(0).toInt() != 1) {
@@ -524,7 +536,7 @@ bool PostgreSqlStorage::updateIdentity(UserId user, const CoreIdentity &identity
 void PostgreSqlStorage::removeIdentity(UserId user, IdentityId identityId)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::removeIdentity(): Unable to start Transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return;
@@ -563,6 +575,7 @@ QList<CoreIdentity> PostgreSqlStorage::identities(UserId user)
     nickQuery.prepare(queryString("select_nicks"));
 
     safeExec(query);
+    watchQuery(query);
 
     while (query.next()) {
         CoreIdentity identity(IdentityId(query.value(0).toInt()));
@@ -609,7 +622,7 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info)
     NetworkId networkId;
 
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::createNetwork(): failed to begin transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -620,8 +633,7 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info)
     query.bindValue(":userid", user.toInt());
     bindNetworkInfo(query, info);
     safeExec(query);
-    if (query.lastError().isValid()) {
-        watchQuery(query);
+    if (!watchQuery(query)) {
         db.rollback();
         return NetworkId();
     }
@@ -630,7 +642,6 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info)
     networkId = query.value(0).toInt();
 
     if (!networkId.isValid()) {
-        watchQuery(query);
         db.rollback();
         return NetworkId();
     }
@@ -701,7 +712,7 @@ void PostgreSqlStorage::bindServerInfo(QSqlQuery &query, const Network::Server &
 bool PostgreSqlStorage::updateNetwork(UserId user, const NetworkInfo &info)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::updateNetwork(): failed to begin transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -756,7 +767,7 @@ bool PostgreSqlStorage::updateNetwork(UserId user, const NetworkInfo &info)
 bool PostgreSqlStorage::removeNetwork(UserId user, const NetworkId &networkId)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::removeNetwork(): cannot start transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -1002,7 +1013,7 @@ void PostgreSqlStorage::setUserModes(UserId user, NetworkId networkId, const QSt
 BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer, bool create)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::bufferInfo(): cannot start read only transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return BufferInfo();
@@ -1014,6 +1025,7 @@ BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId
     query.bindValue(":userid", user.toInt());
     query.bindValue(":buffercname", buffer.toLower());
     safeExec(query);
+    watchQuery(query);
 
     if (query.first()) {
         BufferInfo bufferInfo = BufferInfo(query.value(0).toInt(), networkId, (BufferInfo::Type)query.value(1).toInt(), 0, buffer);
@@ -1046,9 +1058,8 @@ BufferInfo PostgreSqlStorage::bufferInfo(UserId user, const NetworkId &networkId
 
     safeExec(createQuery);
 
-    if (createQuery.lastError().isValid()) {
+    if (!watchQuery(createQuery)) {
         qWarning() << "PostgreSqlStorage::bufferInfo(): unable to create buffer";
-        watchQuery(createQuery);
         db.rollback();
         return BufferInfo();
     }
@@ -1135,7 +1146,7 @@ QList<BufferId> PostgreSqlStorage::requestBufferIdsForNetwork(UserId user, Netwo
 bool PostgreSqlStorage::removeBuffer(const UserId &user, const BufferId &bufferId)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::removeBuffer(): cannot start transaction!";
         return false;
     }
@@ -1170,7 +1181,7 @@ bool PostgreSqlStorage::removeBuffer(const UserId &user, const BufferId &bufferI
 bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferId, const QString &newName)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::renameBuffer(): cannot start transaction!";
         return false;
     }
@@ -1182,8 +1193,7 @@ bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferI
     query.bindValue(":userid", user.toInt());
     query.bindValue(":bufferid", bufferId.toInt());
     safeExec(query);
-    if (query.lastError().isValid()) {
-        watchQuery(query);
+    if (!watchQuery(query)) {
         db.rollback();
         return false;
     }
@@ -1208,7 +1218,7 @@ bool PostgreSqlStorage::renameBuffer(const UserId &user, const BufferId &bufferI
 bool PostgreSqlStorage::mergeBuffersPermanently(const UserId &user, const BufferId &bufferId1, const BufferId &bufferId2)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::mergeBuffersPermanently(): cannot start transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -1343,7 +1353,7 @@ QHash<BufferId, MsgId> PostgreSqlStorage::bufferMarkerLineMsgIds(UserId user)
 bool PostgreSqlStorage::logMessage(Message &msg)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -1362,7 +1372,9 @@ bool PostgreSqlStorage::logMessage(Message &msg)
 
         if (addSenderQuery.lastError().isValid()) {
             rollbackSavePoint("sender_sp1", db);
-            getSenderIdQuery = db.exec(getSenderIdQuery.lastQuery());
+            getSenderIdQuery.prepare(getSenderIdQuery.lastQuery());
+            safeExec(getSenderIdQuery);
+            watchQuery(getSenderIdQuery);
             getSenderIdQuery.first();
             senderId = getSenderIdQuery.value(0).toInt();
         }
@@ -1403,7 +1415,7 @@ bool PostgreSqlStorage::logMessage(Message &msg)
 bool PostgreSqlStorage::logMessages(MessageList &msgs)
 {
     QSqlDatabase db = logDb();
-    if (!db.transaction()) {
+    if (!beginTransaction(db)) {
         qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
         qWarning() << " -" << qPrintable(db.lastError().text());
         return false;
@@ -1431,7 +1443,9 @@ bool PostgreSqlStorage::logMessages(MessageList &msgs)
             if (addSenderQuery.lastError().isValid()) {
                 // seems it was inserted meanwhile... by a different thread
                 rollbackSavePoint("sender_sp", db);
-                selectSenderQuery = db.exec(selectSenderQuery.lastQuery());
+                selectSenderQuery.prepare(selectSenderQuery.lastQuery());
+                safeExec(selectSenderQuery);
+                watchQuery(selectSenderQuery);
                 selectSenderQuery.first();
                 senderIdList << selectSenderQuery.value(0).toInt();
                 senderIds[sender] = selectSenderQuery.value(0).toInt();
@@ -1620,14 +1634,29 @@ QList<Message> PostgreSqlStorage::requestAllMsgs(UserId user, MsgId first, MsgId
 //   return;
 // }
 
+
+bool PostgreSqlStorage::beginTransaction(QSqlDatabase &db)
+{
+    bool result = db.transaction();
+    if (!db.isOpen()) {
+        db = logDb();
+        result = db.transaction();
+    }
+    return result;
+}
+
 bool PostgreSqlStorage::beginReadOnlyTransaction(QSqlDatabase &db)
 {
     QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY");
+    if (!db.isOpen()) {
+        db = logDb();
+        query = db.exec("BEGIN TRANSACTION READ ONLY");
+    }
     return !query.lastError().isValid();
 }
 
 
-QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, const QString &paramstring, const QSqlDatabase &db)
+QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, const QString &paramstring, QSqlDatabase &db)
 {
     // Query preparing is done lazily. That means that instead of always checking if the query is already prepared
     // we just EXECUTE and catch the error
@@ -1641,10 +1670,22 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co
         query = db.exec(QString("EXECUTE quassel_%1 (%2)").arg(queryname).arg(paramstring));
     }
 
-    if (db.lastError().isValid()) {
-        // and once again: Qt leaves us without error codes so we either parse (language dependant(!)) strings
+    if (!db.isOpen() || db.lastError().isValid()) {
+        // If the query failed because the DB connection was down, reopen the connection and start a new transaction.
+        if (!db.isOpen()) {
+            db = logDb();
+            if (!beginTransaction(db)) {
+                qWarning() << "PostgreSqlStorage::prepareAndExecuteQuery(): cannot start transaction while recovering from connection loss!";
+                qWarning() << " -" << qPrintable(db.lastError().text());
+                return query;
+            }
+            db.exec("SAVEPOINT quassel_prepare_query");
+        } else {
+            db.exec("ROLLBACK TO SAVEPOINT quassel_prepare_query");
+        }
+
+        // and once again: Qt leaves us without error codes so we either parse (language dependent(!)) strings
         // or we just guess the error. As we're only interested in unprepared queries, this will be our guess. :)
-        db.exec("ROLLBACK TO SAVEPOINT quassel_prepare_query");
         QSqlQuery checkQuery = db.exec(QString("SELECT count(name) FROM pg_prepared_statements WHERE name = 'quassel_%1' AND from_sql = TRUE").arg(queryname.toLower()));
         checkQuery.first();
         if (checkQuery.value(0).toInt() == 0) {
@@ -1655,7 +1696,7 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co
                 return QSqlQuery(db);
             }
         }
-        // we alwas execute the query again, even if the query was already prepared.
+        // we always execute the query again, even if the query was already prepared.
         // this ensures, that the error is properly propagated to the calling function
         // (otherwise the last call would be the testing select to pg_prepared_statements
         // which always gives a proper result and the error would be lost)
@@ -1674,7 +1715,7 @@ QSqlQuery PostgreSqlStorage::prepareAndExecuteQuery(const QString &queryname, co
 }
 
 
-QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariantList &params, const QSqlDatabase &db)
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariantList &params, QSqlDatabase &db)
 {
     QSqlDriver *driver = db.driver();
 
@@ -1700,7 +1741,7 @@ QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, cons
 }
 
 
-QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariant &param, const QSqlDatabase &db)
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &queryname, const QVariant &param, QSqlDatabase &db)
 {
     QSqlField field;
     field.setType(param.type());
@@ -1720,6 +1761,26 @@ void PostgreSqlStorage::deallocateQuery(const QString &queryname, const QSqlData
 }
 
 
+void PostgreSqlStorage::safeExec(QSqlQuery &query)
+{
+    // If the query fails due to the connection being gone, it seems to cause
+    // exec() to return false but no lastError to be set
+    if(!query.exec() && !query.lastError().isValid())
+    {
+        QSqlDatabase db = logDb();
+        QSqlQuery retryQuery(db);
+        retryQuery.prepare(query.lastQuery());
+        QMapIterator<QString, QVariant> i(query.boundValues());
+        while (i.hasNext())
+        {
+            i.next();
+            retryQuery.bindValue(i.key(),i.value());
+        }
+        query = retryQuery;
+        query.exec();
+    }
+}
+
 // ========================================
 //  PostgreSqlMigrationWriter
 // ========================================
index 0d172ec..73b16dc 100644 (file)
@@ -116,10 +116,11 @@ protected:
     virtual bool setupSchemaVersion(int version);
     void safeExec(QSqlQuery &query);
 
+    bool beginTransaction(QSqlDatabase &db);
     bool beginReadOnlyTransaction(QSqlDatabase &db);
 
-    QSqlQuery executePreparedQuery(const QString &queryname, const QVariantList &params, const QSqlDatabase &db);
-    QSqlQuery executePreparedQuery(const QString &queryname, const QVariant &param, const QSqlDatabase &db);
+    QSqlQuery executePreparedQuery(const QString &queryname, const QVariantList &params, QSqlDatabase &db);
+    QSqlQuery executePreparedQuery(const QString &queryname, const QVariant &param, QSqlDatabase &db);
     void deallocateQuery(const QString &queryname, const QSqlDatabase &db);
 
     inline void savePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("SAVEPOINT %1").arg(handle)); }
@@ -129,8 +130,8 @@ protected:
 private:
     void bindNetworkInfo(QSqlQuery &query, const NetworkInfo &info);
     void bindServerInfo(QSqlQuery &query, const Network::Server &server);
-    QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QString &paramstring, const QSqlDatabase &db);
-    inline QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QSqlDatabase &db) { return prepareAndExecuteQuery(queryname, QString(), db); }
+    QSqlQuery prepareAndExecuteQuery(const QString &queryname, const QString &paramstring, QSqlDatabase &db);
+    inline QSqlQuery prepareAndExecuteQuery(const QString &queryname, QSqlDatabase &db) { return prepareAndExecuteQuery(queryname, QString(), db); }
 
     QString _hostName;
     int _port;
@@ -140,8 +141,6 @@ private:
 };
 
 
-inline void PostgreSqlStorage::safeExec(QSqlQuery &query) { query.exec(); }
-
 // ========================================
 //  PostgreSqlMigration
 // ========================================