From: Marcus Eggenberger Date: Sun, 22 Feb 2009 15:21:37 +0000 (+0100) Subject: Further improvements to the postgres backend: X-Git-Tag: 0.5-rc1~333 X-Git-Url: https://git.quassel-irc.org/?p=quassel.git;a=commitdiff_plain;h=b9828e0dd235964b8e2e97f844f4bed3476d3bd4 Further improvements to the postgres backend: - prohibit Qt's postgres driver from deallocating frequently used queries - grouping message logging: all messages available on the IRC socket are now stored at once. this can be further improved by breaking protocol and sending a that messagegroup as one chunk to the client. --- diff --git a/src/core/SQL/PostgreSQL/14/insert_message.sql b/src/core/SQL/PostgreSQL/14/insert_message.sql index 867f066c..d35022cd 100644 --- a/src/core/SQL/PostgreSQL/14/insert_message.sql +++ b/src/core/SQL/PostgreSQL/14/insert_message.sql @@ -1,3 +1,3 @@ INSERT INTO backlog (time, bufferid, type, flags, senderid, message) -VALUES (:time, :bufferid, :type, :flags, (SELECT senderid FROM sender WHERE sender = :sender), :message) +VALUES ($1, $2, $3, $4, (SELECT senderid FROM sender WHERE sender = $5), $6) RETURNING messageid diff --git a/src/core/SQL/PostgreSQL/14/insert_sender.sql b/src/core/SQL/PostgreSQL/14/insert_sender.sql index 0e84f63c..215e6037 100644 --- a/src/core/SQL/PostgreSQL/14/insert_sender.sql +++ b/src/core/SQL/PostgreSQL/14/insert_sender.sql @@ -1,2 +1,2 @@ INSERT INTO sender (sender) -VALUES (:sender) \ No newline at end of file +VALUES ($1) \ No newline at end of file diff --git a/src/core/basichandler.cpp b/src/core/basichandler.cpp index d9cb1245..a053b522 100644 --- a/src/core/basichandler.cpp +++ b/src/core/basichandler.cpp @@ -30,8 +30,8 @@ BasicHandler::BasicHandler(CoreNetwork *parent) _network(parent), initDone(false) { - connect(this, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)), - network(), SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags))); + connect(this, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)), + network(), SLOT(displayMsg(Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags))); connect(this, SIGNAL(putCmd(QString, const QList &, const QByteArray &)), network(), SLOT(putCmd(QString, const QList &, const QByteArray &))); @@ -174,7 +174,7 @@ void BasicHandler::putCmd(const QString &cmd, const QByteArray ¶m, const QBy emit putCmd(cmd, list, prefix); } -void BasicHandler::displayMsg(Message::Type msgType, QString target, QString text, QString sender, Message::Flags flags) { +void BasicHandler::displayMsg(Message::Type msgType, QString target, const QString &text, const QString &sender, Message::Flags flags) { if(!target.isEmpty() && network()->prefixes().contains(target[0])) target = target.mid(1); diff --git a/src/core/basichandler.h b/src/core/basichandler.h index f67fe7f1..ad954b58 100644 --- a/src/core/basichandler.h +++ b/src/core/basichandler.h @@ -56,12 +56,12 @@ public: QList userEncode(const QString &userNick, const QStringList &stringlist); signals: - void displayMsg(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None); + void displayMsg(Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None); void putCmd(const QString &cmd, const QList ¶ms, const QByteArray &prefix = QByteArray()); void putRawLine(const QByteArray &msg); protected: - void displayMsg(Message::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None); + void displayMsg(Message::Type, QString target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None); void putCmd(const QString &cmd, const QByteArray ¶m, const QByteArray &prefix = QByteArray()); virtual void handle(const QString &member, QGenericArgument val0 = QGenericArgument(0), diff --git a/src/core/core.h b/src/core/core.h index 6d743e89..3e19474f 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -256,16 +256,26 @@ class Core : public QObject { return instance()->_storage->getBufferInfo(user, bufferId); } - //! Store a Message in the backlog. + //! Store a Message in the storage backend and set it's unique Id. /** \note This method is threadsafe. * - * \param msg The message object to be stored - * \return The globally unique id for the stored message + * \param message The message object to be stored + * \return true on success */ - static inline MsgId storeMessage(const Message &message) { + static inline bool storeMessage(Message &message) { return instance()->_storage->logMessage(message); } + //! Store a list of Messages in the storage backend and set their unique Id. + /** \note This method is threadsafe. + * + * \param messages The list message objects to be stored + * \return true on success + */ + static inline bool storeMessages(MessageList &messages) { + return instance()->_storage->logMessages(messages); + } + //! Request a certain number messages stored in a given buffer. /** \param buffer The buffer we request messages from * \param first if != -1 return only messages with a MsgId >= first diff --git a/src/core/corenetwork.cpp b/src/core/corenetwork.cpp index ece012eb..4da6dbd9 100644 --- a/src/core/corenetwork.cpp +++ b/src/core/corenetwork.cpp @@ -303,7 +303,7 @@ void CoreNetwork::socketError(QAbstractSocket::SocketError error) { _previousConnectionAttemptFailed = true; qWarning() << qPrintable(tr("Could not connect to %1 (%2)").arg(networkName(), socket.errorString())); emit connectionError(socket.errorString()); - emit displayMsg(Message::Error, BufferInfo::StatusBuffer, "", tr("Connection failure: %1").arg(socket.errorString())); + displayMsg(Message::Error, BufferInfo::StatusBuffer, "", tr("Connection failure: %1").arg(socket.errorString())); emitConnectionError(socket.errorString()); if(socket.state() < QAbstractSocket::ConnectedState) { socketDisconnected(); @@ -359,7 +359,7 @@ void CoreNetwork::socketDisconnected() { IrcUser *me_ = me(); if(me_) { foreach(QString channel, me_->channels()) - emit displayMsg(Message::Quit, BufferInfo::ChannelBuffer, channel, _quitReason, me_->hostmask()); + displayMsg(Message::Quit, BufferInfo::ChannelBuffer, channel, _quitReason, me_->hostmask()); } setConnected(false); diff --git a/src/core/corenetwork.h b/src/core/corenetwork.h index c11aee84..a0e8c179 100644 --- a/src/core/corenetwork.h +++ b/src/core/corenetwork.h @@ -108,10 +108,14 @@ public slots: inline void resetPingTimeout() { _lastPingTime = 0; } + inline void displayMsg(Message::Type msgType, BufferInfo::Type bufferType, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None) { + emit displayMsg(networkId(), msgType, bufferType, target, text, sender, flags); + } + signals: void recvRawServerMsg(QString); void displayStatusMsg(QString); - void displayMsg(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None); + void displayMsg(NetworkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None); void disconnected(NetworkId networkId); void connectionError(const QString &errorMsg); diff --git a/src/core/coresession.cpp b/src/core/coresession.cpp index 7a4f6222..38d96acf 100644 --- a/src/core/coresession.cpp +++ b/src/core/coresession.cpp @@ -39,6 +39,11 @@ #include "coreusersettings.h" #include "logger.h" +class ProcessMessagesEvent : public QEvent { +public: + ProcessMessagesEvent() : QEvent(QEvent::User) {} +}; + CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent) : QObject(parent), _user(uid), @@ -49,7 +54,8 @@ CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent) _bufferViewManager(new CoreBufferViewManager(_signalProxy, this)), _ircListHelper(new CoreIrcListHelper(this)), _coreInfo(this), - scriptEngine(new QScriptEngine(this)) + scriptEngine(new QScriptEngine(this)), + _processMessages(false) { SignalProxy *p = signalProxy(); connect(p, SIGNAL(peerRemoved(QIODevice *)), this, SLOT(removeClient(QIODevice *))); @@ -196,16 +202,13 @@ void CoreSession::msgFromClient(BufferInfo bufinfo, QString msg) { // ALL messages coming pass through these functions before going to the GUI. // So this is the perfect place for storing the backlog and log stuff. -void CoreSession::recvMessageFromServer(Message::Type type, BufferInfo::Type bufferType, - QString target, QString text, QString sender, Message::Flags flags) { - CoreNetwork *net = qobject_cast(this->sender()); - Q_ASSERT(net); - - BufferInfo bufferInfo = Core::bufferInfo(user(), net->networkId(), bufferType, target); - Message msg(bufferInfo, type, text, sender, flags); - msg.setMsgId(Core::storeMessage(msg)); - Q_ASSERT(msg.msgId() != 0); - emit displayMsg(msg); +void CoreSession::recvMessageFromServer(NetworkId networkId, Message::Type type, BufferInfo::Type bufferType, + const QString &target, const QString &text, const QString &sender, Message::Flags flags) { + _messageQueue << RawMessage(networkId, type, bufferType, target, text, sender, flags); + if(!_processMessages) { + _processMessages = true; + QCoreApplication::postEvent(this, new ProcessMessagesEvent()); + } } void CoreSession::recvStatusMsgFromServer(QString msg) { @@ -218,6 +221,46 @@ QList CoreSession::buffers() const { return Core::requestBuffers(user()); } +void CoreSession::customEvent(QEvent *event) { + if(event->type() != QEvent::User) + return; + + processMessages(); + event->accept(); +} + +void CoreSession::processMessages() { + qDebug() << "processing" << _messageQueue.count() << "messages.."; + if(_messageQueue.count() == 1) { + const RawMessage &rawMsg = _messageQueue.first(); + BufferInfo bufferInfo = Core::bufferInfo(user(), rawMsg.networkId, rawMsg.bufferType, rawMsg.target); + Message msg(bufferInfo, rawMsg.type, rawMsg.text, rawMsg.sender, rawMsg.flags); + Core::storeMessage(msg); + emit displayMsg(msg); + } else { + QHash > bufferInfoCache; + MessageList messages; + BufferInfo bufferInfo; + for(int i = 0; i < _messageQueue.count(); i++) { + const RawMessage &rawMsg = _messageQueue.at(i); + if(bufferInfoCache.contains(rawMsg.networkId) && bufferInfoCache[rawMsg.networkId].contains(rawMsg.target)) { + bufferInfo = bufferInfoCache[rawMsg.networkId][rawMsg.target]; + } else { + bufferInfo = Core::bufferInfo(user(), rawMsg.networkId, rawMsg.bufferType, rawMsg.target); + bufferInfoCache[rawMsg.networkId][rawMsg.target] = bufferInfo; + } + messages << Message(bufferInfo, rawMsg.type, rawMsg.text, rawMsg.sender, rawMsg.flags); + } + + Core::storeMessages(messages); + // FIXME: extend protocol to a displayMessages(MessageList) + for(int i = 0; i < messages.count(); i++) { + emit displayMsg(messages[i]); + } + } + _processMessages = false; + _messageQueue.clear(); +} QVariant CoreSession::sessionState() { QVariantMap v; @@ -322,8 +365,8 @@ void CoreSession::createNetwork(const NetworkInfo &info_, const QStringList &per id = info.networkId.toInt(); if(!_networks.contains(id)) { CoreNetwork *net = new CoreNetwork(id, this); - connect(net, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)), - this, SLOT(recvMessageFromServer(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags))); + connect(net, SIGNAL(displayMsg(NetworkId, Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)), + this, SLOT(recvMessageFromServer(NetworkId, Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags))); connect(net, SIGNAL(displayStatusMsg(QString)), this, SLOT(recvStatusMsgFromServer(QString))); net->setNetworkInfo(info); diff --git a/src/core/coresession.h b/src/core/coresession.h index 6e718bea..48d83f88 100644 --- a/src/core/coresession.h +++ b/src/core/coresession.h @@ -27,6 +27,7 @@ #include "corecoreinfo.h" #include "corealiasmanager.h" #include "message.h" +#include "storage.h" class CoreBacklogManager; class CoreBufferSyncer; @@ -134,7 +135,7 @@ private slots: void removeClient(QIODevice *dev); void recvStatusMsgFromServer(QString msg); - void recvMessageFromServer(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None); + void recvMessageFromServer(NetworkId networkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None); void destroyNetwork(NetworkId); @@ -145,9 +146,13 @@ private slots: void updateIdentityBySender(); +protected: + virtual void customEvent(QEvent *event); + private: void loadSettings(); void initScriptEngine(); + void processMessages(); UserId _user; @@ -166,6 +171,19 @@ private: QScriptEngine *scriptEngine; + struct RawMessage { + NetworkId networkId; + Message::Type type; + BufferInfo::Type bufferType; + QString target; + QString text; + QString sender; + Message::Flags flags; + RawMessage(NetworkId networkId, Message::Type type, BufferInfo::Type bufferType, const QString &target, const QString &text, const QString &sender, Message::Flags flags) + : networkId(networkId), type(type), bufferType(bufferType), target(target), text(text), sender(sender), flags(flags) {} + }; + QList _messageQueue; + bool _processMessages; }; #endif diff --git a/src/core/postgresqlstorage.cpp b/src/core/postgresqlstorage.cpp index 2c9c5b44..96805baf 100644 --- a/src/core/postgresqlstorage.cpp +++ b/src/core/postgresqlstorage.cpp @@ -1104,7 +1104,7 @@ QHash PostgreSqlStorage::bufferLastSeenMsgIds(UserId user) { return lastSeenHash; } -MsgId PostgreSqlStorage::logMessage(Message msg) { +bool PostgreSqlStorage::logMessage(Message &msg) { QSqlDatabase db = logDb(); if(!db.transaction()) { qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; @@ -1112,39 +1112,125 @@ MsgId PostgreSqlStorage::logMessage(Message msg) { return false; } - QSqlQuery logMessageQuery(db); - logMessageQuery.prepare(queryString("insert_message")); - logMessageQuery.bindValue(":time", msg.timestamp().toTime_t()); - logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt()); - logMessageQuery.bindValue(":type", msg.type()); - logMessageQuery.bindValue(":flags", (int)msg.flags()); - logMessageQuery.bindValue(":sender", msg.sender()); - logMessageQuery.bindValue(":message", msg.contents()); - safeExec(logMessageQuery); + if(!prepareQuery("insert_message", queryString("insert_message"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + + QVariantList params; + params << msg.timestamp().toTime_t() + << msg.bufferInfo().bufferId().toInt() + << msg.type() + << (int)msg.flags() + << msg.sender() + << msg.contents(); + QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db); if(logMessageQuery.lastError().isValid()) { // first we need to reset the transaction db.rollback(); db.transaction(); - QSqlQuery addSenderQuery(db); - addSenderQuery.prepare(queryString("insert_sender")); - addSenderQuery.bindValue(":sender", msg.sender()); - safeExec(addSenderQuery); - safeExec(logMessageQuery); + // it's possible that the sender was already added by another thread + // since the insert might fail we're setting a savepoint + savePoint("sender_sp1", db); + QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", msg.sender(), db); + if(addSenderQuery.lastError().isValid()) + rollbackSavePoint("sender_sp1", db); + else + releaseSavePoint("sender_sp1", db); + + logMessageQuery = db.exec(logMessageQuery.lastQuery()); if(!watchQuery(logMessageQuery)) { + qDebug() << "==================== Sender Query:"; + watchQuery(addSenderQuery); + qDebug() << "==================== /Sender Query"; db.rollback(); - return MsgId(); + return false; } } logMessageQuery.first(); MsgId msgId = logMessageQuery.value(0).toInt(); db.commit(); + if(msgId.isValid()) { + msg.setMsgId(msgId); + return true; + } else { + return false; + } +} + +bool PostgreSqlStorage::logMessages(MessageList &msgs) { + QSqlDatabase db = logDb(); + if(!db.transaction()) { + qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!"; + qWarning() << " -" << qPrintable(db.lastError().text()); + return false; + } + + if(!prepareQuery("insert_sender", queryString("insert_sender"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_sender"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + QSet senders; + for(int i = 0; i < msgs.count(); i++) { + const QString &sender = msgs.at(i).sender(); + if(senders.contains(sender)) + continue; + senders << sender; + + savePoint("sender_sp", db); + QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", sender, db); + if(addSenderQuery.lastError().isValid()) + rollbackSavePoint("sender_sp", db); + else + releaseSavePoint("sender_sp", db); + } + + // yes we loop twice over the same list. This avoids alternating queries. + if(!prepareQuery("insert_message", queryString("insert_message"), db)) { + qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message"); + qWarning() << " Error:" << db.lastError().text(); + db.rollback(); + return false; + } + bool error = false; + for(int i = 0; i < msgs.count(); i++) { + Message &msg = msgs[i]; + QVariantList params; + params << msg.timestamp().toTime_t() + << msg.bufferInfo().bufferId().toInt() + << msg.type() + << (int)msg.flags() + << msg.sender() + << msg.contents(); + QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db); + if(!watchQuery(logMessageQuery)) { + db.rollback(); + error = true; + break; + } else { + logMessageQuery.first(); + msg.setMsgId(logMessageQuery.value(0).toInt()); + } + } + + if(error) { + // we had a rollback in the db so we need to reset all msgIds + for(int i = 0; i < msgs.count(); i++) { + msgs[i].setMsgId(MsgId()); + } + return false; + } - Q_ASSERT(msgId.isValid()); - return msgId; + db.commit(); + return true; } QList PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) { @@ -1269,3 +1355,81 @@ bool PostgreSqlStorage::beginReadOnlyTransaction(QSqlDatabase &db) { QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY"); return !query.lastError().isValid(); } + +bool PostgreSqlStorage::prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db) { + if(_preparedQueries.contains(db.connectionName()) && _preparedQueries[db.connectionName()].contains(handle)) + return true; // already prepared + + QMutexLocker locker(&_queryHashMutex); + + static unsigned int stmtCount = 0; + QString queryId = QLatin1String("quassel_") + QString::number(++stmtCount, 16); + // qDebug() << "prepare:" << QString("PREPARE %1 AS %2").arg(queryId).arg(query); + db.exec(QString("PREPARE %1 AS %2").arg(queryId).arg(query)); + if(db.lastError().isValid()) { + return false; + } else { + _preparedQueries[db.connectionName()][handle] = queryId; + return true; + } +} + +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariantList ¶ms, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName(); + return QSqlQuery(); + } + + QSqlDriver *driver = db.driver(); + + QStringList paramStrings; + QSqlField field; + for(int i = 0; i < params.count(); i++) { + const QVariant &value = params.at(i); + field.setType(value.type()); + if(value.isNull()) + field.clear(); + else + field.setValue(value); + + paramStrings << driver->formatValue(field); + } + + const QString &queryId = _preparedQueries[db.connectionName()][handle]; + if(params.isEmpty()) { + return db.exec(QString("EXECUTE %1").arg(queryId)); + } else { + // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", ")); + return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", "))); + } +} + +QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariant ¶m, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName(); + return QSqlQuery(); + } + + QSqlField field; + field.setType(param.type()); + if(param.isNull()) + field.clear(); + else + field.setValue(param); + + const QString &queryId = _preparedQueries[db.connectionName()][handle]; + QString paramString = db.driver()->formatValue(field); + + // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString); + return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString)); +} + +void PostgreSqlStorage::deallocateQuery(const QString &handle, const QSqlDatabase &db) { + if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) { + return; + } + QMutexLocker locker(&_queryHashMutex); + QString queryId = _preparedQueries[db.connectionName()].take(handle); + db.exec(QString("DEALLOCATE %1").arg(queryId)); +} + diff --git a/src/core/postgresqlstorage.h b/src/core/postgresqlstorage.h index 536fa61f..f699ca11 100644 --- a/src/core/postgresqlstorage.h +++ b/src/core/postgresqlstorage.h @@ -40,7 +40,7 @@ public slots: QString displayName() const; QString description() const; QVariantMap setupKeys() const; - + // TODO: Add functions for configuring the backlog handling, i.e. defining auto-cleanup settings etc /* User handling */ @@ -78,7 +78,7 @@ public slots: virtual void setAwayMessage(UserId user, NetworkId networkId, const QString &awayMsg); virtual QString userModes(UserId user, NetworkId networkId); virtual void setUserModes(UserId user, NetworkId networkId, const QString &userModes); - + /* Buffer handling */ virtual BufferInfo bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer = "", bool create = true); virtual BufferInfo getBufferInfo(UserId user, const BufferId &bufferId); @@ -91,8 +91,8 @@ public slots: virtual QHash bufferLastSeenMsgIds(UserId user); /* Message handling */ - - virtual MsgId logMessage(Message msg); + virtual bool logMessage(Message &msg); + virtual bool logMessages(MessageList &msgs); virtual QList requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1); virtual QList requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1); @@ -108,8 +108,18 @@ protected: virtual bool updateSchemaVersion(int newVersion); virtual bool setupSchemaVersion(int version); void safeExec(QSqlQuery &query); + bool beginReadOnlyTransaction(QSqlDatabase &db); + bool prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db); + QSqlQuery executePreparedQuery(const QString &handle, const QVariantList ¶ms, const QSqlDatabase &db); + QSqlQuery executePreparedQuery(const QString &handle, const QVariant ¶m, const QSqlDatabase &db); + void deallocateQuery(const QString &handle, const QSqlDatabase &db); + + inline void savePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("SAVEPOINT %1").arg(handle)); } + inline void rollbackSavePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("ROLLBACK TO SAVEPOINT %1").arg(handle)); } + inline void releaseSavePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("RELEASE SAVEPOINT %1").arg(handle)); } + private: void bindNetworkInfo(QSqlQuery &query, const NetworkInfo &info); void bindServerInfo(QSqlQuery &query, const Network::Server &server); @@ -119,8 +129,13 @@ private: QString _databaseName; QString _userName; QString _password; - + static int _maxRetryCount; + + typedef QHash QueryHash; + QHash _preparedQueries; // one query hash per db connection + QMutex _queryHashMutex; + }; inline void PostgreSqlStorage::safeExec(QSqlQuery &query) { query.exec(); } diff --git a/src/core/sqlitestorage.cpp b/src/core/sqlitestorage.cpp index 003d57a6..82ab2ab2 100644 --- a/src/core/sqlitestorage.cpp +++ b/src/core/sqlitestorage.cpp @@ -933,7 +933,7 @@ QHash SqliteStorage::bufferLastSeenMsgIds(UserId user) { return lastSeenHash; } -MsgId SqliteStorage::logMessage(Message msg) { +bool SqliteStorage::logMessage(Message &msg) { QSqlQuery logMessageQuery(logDb()); logMessageQuery.prepare(queryString("insert_message")); @@ -954,15 +954,28 @@ MsgId SqliteStorage::logMessage(Message msg) { safeExec(addSenderQuery); safeExec(logMessageQuery); if(!watchQuery(logMessageQuery)) - return 0; + return false; } else { watchQuery(logMessageQuery); } } MsgId msgId = logMessageQuery.lastInsertId().toInt(); - Q_ASSERT(msgId.isValid()); - return msgId; + if(msgId.isValid()) { + msg.setMsgId(msgId); + return true; + } else { + return false; + } +} + +bool SqliteStorage::logMessages(MessageList &msgs) { + // FIXME: optimize! + for(int i = 0; i < msgs.count(); i++) { + if(!logMessage(msgs[i])) + return false; + } + return true; } QList SqliteStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) { diff --git a/src/core/sqlitestorage.h b/src/core/sqlitestorage.h index feb2c5aa..20f9da7a 100644 --- a/src/core/sqlitestorage.h +++ b/src/core/sqlitestorage.h @@ -92,8 +92,8 @@ public slots: virtual QHash bufferLastSeenMsgIds(UserId user); /* Message handling */ - - virtual MsgId logMessage(Message msg); + virtual bool logMessage(Message &msg); + virtual bool logMessages(MessageList &msgs); virtual QList requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1); virtual QList requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1); diff --git a/src/core/storage.h b/src/core/storage.h index 17fdd70a..5012c24a 100644 --- a/src/core/storage.h +++ b/src/core/storage.h @@ -341,11 +341,17 @@ public slots: /* Message handling */ - //! Store a Message in the backlog. + //! Store a Message in the storage backend and set its unique Id. /** \param msg The message object to be stored - * \return The globally unique id for the stored message + * \return true on success */ - virtual MsgId logMessage(Message msg) = 0; + virtual bool logMessage(Message &msg) = 0; + + //! Store a list of Messages in the storage backend and set their unique Id. + /** \param msgs The list message objects to be stored + * \return true on success + */ + virtual bool logMessages(MessageList &msgs) = 0; //! Request a certain number messages stored in a given buffer. /** \param buffer The buffer we request messages from