X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=blobdiff_plain;f=src%2Fcore%2Fpostgresqlstorage.cpp;h=f0978086fa48ded34d40c3c82406e2ccee9c26b8;hp=2c9c5b4498dc71c95929214392033ae9414bf225;hb=91113c1a557409765625c41b05432af6d0f7e9d3;hpb=41f90ea96aad92b534a982296168baff8b8da2d7 diff --git a/src/core/postgresqlstorage.cpp b/src/core/postgresqlstorage.cpp index 2c9c5b44..f0978086 100644 --- a/src/core/postgresqlstorage.cpp +++ b/src/core/postgresqlstorage.cpp @@ -276,7 +276,7 @@ IdentityId PostgreSqlStorage::createIdentity(UserId user, CoreIdentity &identity return IdentityId(); } - qDebug() << "creatId" << query.first() << query.value(0).toInt(); + query.first(); identityId = query.value(0).toInt(); identity.setId(identityId); @@ -488,7 +488,7 @@ NetworkId PostgreSqlStorage::createNetwork(UserId user, const NetworkInfo &info) return NetworkId(); } - qDebug() << "createNet:" << query.first() << query.value(0).toInt(); + query.first(); networkId = query.value(0).toInt(); if(!networkId.isValid()) { @@ -1104,7 +1104,7 @@ QHash PostgreSqlStorage::bufferLastSeenMsgIds(UserId user) { return lastSeenHash; } -MsgId PostgreSqlStorage::logMessage(Message msg) { +bool PostgreSqlStorage::logMessage(Message &msg) { QSqlDatabase db = logDb(); if(!db.transaction()) { qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; @@ -1112,39 +1112,125 @@ MsgId PostgreSqlStorage::logMessage(Message msg) { return false; } - QSqlQuery logMessageQuery(db); - logMessageQuery.prepare(queryString("insert_message")); - logMessageQuery.bindValue(":time", msg.timestamp().toTime_t()); - logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt()); - logMessageQuery.bindValue(":type", msg.type()); - logMessageQuery.bindValue(":flags", (int)msg.flags()); - logMessageQuery.bindValue(":sender", msg.sender()); - logMessageQuery.bindValue(":message", msg.contents()); - safeExec(logMessageQuery); + if(!prepareQuery("insert_message", queryString("insert_message"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + + QVariantList params; + params << msg.timestamp() + << msg.bufferInfo().bufferId().toInt() + << msg.type() + << (int)msg.flags() + << msg.sender() + << msg.contents(); + QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db); if(logMessageQuery.lastError().isValid()) { // first we need to reset the transaction db.rollback(); db.transaction(); - QSqlQuery addSenderQuery(db); - addSenderQuery.prepare(queryString("insert_sender")); - addSenderQuery.bindValue(":sender", msg.sender()); - safeExec(addSenderQuery); - safeExec(logMessageQuery); + // it's possible that the sender was already added by another thread + // since the insert might fail we're setting a savepoint + savePoint("sender_sp1", db); + QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", msg.sender(), db); + if(addSenderQuery.lastError().isValid()) + rollbackSavePoint("sender_sp1", db); + else + releaseSavePoint("sender_sp1", db); + + logMessageQuery = db.exec(logMessageQuery.lastQuery()); if(!watchQuery(logMessageQuery)) { + qDebug() << "==================== Sender Query:"; + watchQuery(addSenderQuery); + qDebug() << "==================== /Sender Query"; db.rollback(); - return MsgId(); + return false; } } logMessageQuery.first(); MsgId msgId = logMessageQuery.value(0).toInt(); db.commit(); + if(msgId.isValid()) { + msg.setMsgId(msgId); + return true; + } else { + return false; + } +} + +bool PostgreSqlStorage::logMessages(MessageList &msgs) { + QSqlDatabase db = logDb(); + if(!db.transaction()) { + qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; + qWarning() << " -" << qPrintable(db.lastError().text()); + return false; + } + + if(!prepareQuery("insert_sender", queryString("insert_sender"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_sender"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + QSet senders; + for(int i = 0; i < msgs.count(); i++) { + const QString &sender = msgs.at(i).sender(); + if(senders.contains(sender)) + continue; + senders << sender; + + savePoint("sender_sp", db); + QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", sender, db); + if(addSenderQuery.lastError().isValid()) + rollbackSavePoint("sender_sp", db); + else + releaseSavePoint("sender_sp", db); + } + + // yes we loop twice over the same list. This avoids alternating queries. + if(!prepareQuery("insert_message", queryString("insert_message"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + bool error = false; + for(int i = 0; i < msgs.count(); i++) { + Message &msg = msgs[i]; + QVariantList params; + params << msg.timestamp() + << msg.bufferInfo().bufferId().toInt() + << msg.type() + << (int)msg.flags() + << msg.sender() + << msg.contents(); + QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db); + if(!watchQuery(logMessageQuery)) { + db.rollback(); + error = true; + break; + } else { + logMessageQuery.first(); + msg.setMsgId(logMessageQuery.value(0).toInt()); + } + } + + if(error) { + // we had a rollback in the db so we need to reset all msgIds + for(int i = 0; i < msgs.count(); i++) { + msgs[i].setMsgId(MsgId()); + } + return false; + } - Q_ASSERT(msgId.isValid()); - return msgId; + db.commit(); + return true; } QList PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) { @@ -1181,8 +1267,11 @@ QList PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, Ms return messagelist; } + QDateTime timestamp; for(int i = 0; i < limit && query.next(); i++) { - Message msg(QDateTime::fromTime_t(query.value(1).toInt()), + timestamp = query.value(1).toDateTime(); + timestamp.setTimeSpec(Qt::UTC); + Message msg(timestamp, bufferInfo, (Message::Type)query.value(2).toUInt(), query.value(5).toString(), @@ -1227,8 +1316,11 @@ QList PostgreSqlStorage::requestAllMsgs(UserId user, MsgId first, MsgId return messagelist; } + QDateTime timestamp; for(int i = 0; i < limit && query.next(); i++) { - Message msg(QDateTime::fromTime_t(query.value(2).toInt()), + timestamp = query.value(1).toDateTime(); + timestamp.setTimeSpec(Qt::UTC); + Message msg(timestamp, bufferInfoHash[query.value(1).toInt()], (Message::Type)query.value(3).toUInt(), query.value(6).toString(), @@ -1269,3 +1361,81 @@ bool PostgreSqlStorage::beginReadOnlyTransaction(QSqlDatabase &db) { QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY"); return !query.lastError().isValid(); } + +bool PostgreSqlStorage::prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db) { + if(_preparedQueries.contains(db.connectionName()) && _preparedQueries[db.connectionName()].contains(handle)) + return true; // already prepared + + QMutexLocker locker(&_queryHashMutex); + + static unsigned int stmtCount = 0; + QString queryId = QLatin1String("quassel_") + QString::number(++stmtCount, 16); + // qDebug() << "prepare:" << QString("PREPARE %1 AS %2").arg(queryId).arg(query); + db.exec(QString("PREPARE %1 AS %2").arg(queryId).arg(query)); + if(db.lastError().isValid()) { + return false; + } else { + _preparedQueries[db.connectionName()][handle] = queryId; + return true; + } +} + +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariantList ¶ms, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName(); + return QSqlQuery(); + } + + QSqlDriver *driver = db.driver(); + + QStringList paramStrings; + QSqlField field; + for(int i = 0; i < params.count(); i++) { + const QVariant &value = params.at(i); + field.setType(value.type()); + if(value.isNull()) + field.clear(); + else + field.setValue(value); + + paramStrings << driver->formatValue(field); + } + + const QString &queryId = _preparedQueries[db.connectionName()][handle]; + if(params.isEmpty()) { + return db.exec(QString("EXECUTE %1").arg(queryId)); + } else { + // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", ")); + return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", "))); + } +} + +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariant ¶m, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName(); + return QSqlQuery(); + } + + QSqlField field; + field.setType(param.type()); + if(param.isNull()) + field.clear(); + else + field.setValue(param); + + const QString &queryId = _preparedQueries[db.connectionName()][handle]; + QString paramString = db.driver()->formatValue(field); + + // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString); + return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString)); +} + +void PostgreSqlStorage::deallocateQuery(const QString &handle, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + return; + } + QMutexLocker locker(&_queryHashMutex); + QString queryId = _preparedQueries[db.connectionName()].take(handle); + db.exec(QString("DEALLOCATE %1").arg(queryId)); +} +