return IdentityId();
}
- qDebug() << "creatId" << query.first() << query.value(0).toInt();
+ query.first();
identityId = query.value(0).toInt();
identity.setId(identityId);
return NetworkId();
}
- qDebug() << "createNet:" << query.first() << query.value(0).toInt();
+ query.first();
networkId = query.value(0).toInt();
if(!networkId.isValid()) {
return lastSeenHash;
}
-MsgId PostgreSqlStorage::logMessage(Message msg) {
+bool PostgreSqlStorage::logMessage(Message &msg) {
QSqlDatabase db = logDb();
if(!db.transaction()) {
qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
return false;
}
- QSqlQuery logMessageQuery(db);
- logMessageQuery.prepare(queryString("insert_message"));
- logMessageQuery.bindValue(":time", msg.timestamp().toTime_t());
- logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt());
- logMessageQuery.bindValue(":type", msg.type());
- logMessageQuery.bindValue(":flags", (int)msg.flags());
- logMessageQuery.bindValue(":sender", msg.sender());
- logMessageQuery.bindValue(":message", msg.contents());
- safeExec(logMessageQuery);
+ if(!prepareQuery("insert_message", queryString("insert_message"), db)) {
+ qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message");
+ qWarning() << " Error:" << db.lastError().text();
+ db.rollback();
+ return false;
+ }
+
+ QVariantList params;
+ params << msg.timestamp()
+ << msg.bufferInfo().bufferId().toInt()
+ << msg.type()
+ << (int)msg.flags()
+ << msg.sender()
+ << msg.contents();
+ QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db);
if(logMessageQuery.lastError().isValid()) {
// first we need to reset the transaction
db.rollback();
db.transaction();
- QSqlQuery addSenderQuery(db);
- addSenderQuery.prepare(queryString("insert_sender"));
- addSenderQuery.bindValue(":sender", msg.sender());
- safeExec(addSenderQuery);
- safeExec(logMessageQuery);
+ // it's possible that the sender was already added by another thread
+ // since the insert might fail we're setting a savepoint
+ savePoint("sender_sp1", db);
+ QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", msg.sender(), db);
+ if(addSenderQuery.lastError().isValid())
+ rollbackSavePoint("sender_sp1", db);
+ else
+ releaseSavePoint("sender_sp1", db);
+
+ logMessageQuery = db.exec(logMessageQuery.lastQuery());
if(!watchQuery(logMessageQuery)) {
+ qDebug() << "==================== Sender Query:";
+ watchQuery(addSenderQuery);
+ qDebug() << "==================== /Sender Query";
db.rollback();
- return MsgId();
+ return false;
}
}
logMessageQuery.first();
MsgId msgId = logMessageQuery.value(0).toInt();
db.commit();
+ if(msgId.isValid()) {
+ msg.setMsgId(msgId);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool PostgreSqlStorage::logMessages(MessageList &msgs) {
+ QSqlDatabase db = logDb();
+ if(!db.transaction()) {
+ qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
+ qWarning() << " -" << qPrintable(db.lastError().text());
+ return false;
+ }
+
+ if(!prepareQuery("insert_sender", queryString("insert_sender"), db)) {
+ qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_sender");
+ qWarning() << " Error:" << db.lastError().text();
+ db.rollback();
+ return false;
+ }
+ QSet<QString> senders;
+ for(int i = 0; i < msgs.count(); i++) {
+ const QString &sender = msgs.at(i).sender();
+ if(senders.contains(sender))
+ continue;
+ senders << sender;
+
+ savePoint("sender_sp", db);
+ QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", sender, db);
+ if(addSenderQuery.lastError().isValid())
+ rollbackSavePoint("sender_sp", db);
+ else
+ releaseSavePoint("sender_sp", db);
+ }
+
+ // yes we loop twice over the same list. This avoids alternating queries.
+ if(!prepareQuery("insert_message", queryString("insert_message"), db)) {
+ qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message");
+ qWarning() << " Error:" << db.lastError().text();
+ db.rollback();
+ return false;
+ }
+ bool error = false;
+ for(int i = 0; i < msgs.count(); i++) {
+ Message &msg = msgs[i];
+ QVariantList params;
+ params << msg.timestamp()
+ << msg.bufferInfo().bufferId().toInt()
+ << msg.type()
+ << (int)msg.flags()
+ << msg.sender()
+ << msg.contents();
+ QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db);
+ if(!watchQuery(logMessageQuery)) {
+ db.rollback();
+ error = true;
+ break;
+ } else {
+ logMessageQuery.first();
+ msg.setMsgId(logMessageQuery.value(0).toInt());
+ }
+ }
+
+ if(error) {
+ // we had a rollback in the db so we need to reset all msgIds
+ for(int i = 0; i < msgs.count(); i++) {
+ msgs[i].setMsgId(MsgId());
+ }
+ return false;
+ }
- Q_ASSERT(msgId.isValid());
- return msgId;
+ db.commit();
+ return true;
}
QList<Message> PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) {
return messagelist;
}
+ QDateTime timestamp;
for(int i = 0; i < limit && query.next(); i++) {
- Message msg(QDateTime::fromTime_t(query.value(1).toInt()),
+ timestamp = query.value(1).toDateTime();
+ timestamp.setTimeSpec(Qt::UTC);
+ Message msg(timestamp,
bufferInfo,
(Message::Type)query.value(2).toUInt(),
query.value(5).toString(),
return messagelist;
}
+ QDateTime timestamp;
for(int i = 0; i < limit && query.next(); i++) {
- Message msg(QDateTime::fromTime_t(query.value(2).toInt()),
+ timestamp = query.value(1).toDateTime();
+ timestamp.setTimeSpec(Qt::UTC);
+ Message msg(timestamp,
bufferInfoHash[query.value(1).toInt()],
(Message::Type)query.value(3).toUInt(),
query.value(6).toString(),
QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY");
return !query.lastError().isValid();
}
+
+bool PostgreSqlStorage::prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db) {
+ if(_preparedQueries.contains(db.connectionName()) && _preparedQueries[db.connectionName()].contains(handle))
+ return true; // already prepared
+
+ QMutexLocker locker(&_queryHashMutex);
+
+ static unsigned int stmtCount = 0;
+ QString queryId = QLatin1String("quassel_") + QString::number(++stmtCount, 16);
+ // qDebug() << "prepare:" << QString("PREPARE %1 AS %2").arg(queryId).arg(query);
+ db.exec(QString("PREPARE %1 AS %2").arg(queryId).arg(query));
+ if(db.lastError().isValid()) {
+ return false;
+ } else {
+ _preparedQueries[db.connectionName()][handle] = queryId;
+ return true;
+ }
+}
+
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariantList ¶ms, const QSqlDatabase &db) {
+ if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+ qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName();
+ return QSqlQuery();
+ }
+
+ QSqlDriver *driver = db.driver();
+
+ QStringList paramStrings;
+ QSqlField field;
+ for(int i = 0; i < params.count(); i++) {
+ const QVariant &value = params.at(i);
+ field.setType(value.type());
+ if(value.isNull())
+ field.clear();
+ else
+ field.setValue(value);
+
+ paramStrings << driver->formatValue(field);
+ }
+
+ const QString &queryId = _preparedQueries[db.connectionName()][handle];
+ if(params.isEmpty()) {
+ return db.exec(QString("EXECUTE %1").arg(queryId));
+ } else {
+ // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", "));
+ return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", ")));
+ }
+}
+
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariant ¶m, const QSqlDatabase &db) {
+ if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+ qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName();
+ return QSqlQuery();
+ }
+
+ QSqlField field;
+ field.setType(param.type());
+ if(param.isNull())
+ field.clear();
+ else
+ field.setValue(param);
+
+ const QString &queryId = _preparedQueries[db.connectionName()][handle];
+ QString paramString = db.driver()->formatValue(field);
+
+ // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString);
+ return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString));
+}
+
+void PostgreSqlStorage::deallocateQuery(const QString &handle, const QSqlDatabase &db) {
+ if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+ return;
+ }
+ QMutexLocker locker(&_queryHashMutex);
+ QString queryId = _preparedQueries[db.connectionName()].take(handle);
+ db.exec(QString("DEALLOCATE %1").arg(queryId));
+}
+