Further improvements to the postgres backend:
authorMarcus Eggenberger <egs@quassel-irc.org>
Sun, 22 Feb 2009 15:21:37 +0000 (16:21 +0100)
committerMarcus Eggenberger <egs@quassel-irc.org>
Tue, 3 Mar 2009 19:57:04 +0000 (20:57 +0100)
 - prohibit Qt's postgres driver from deallocating frequently used queries
 - grouping message logging: all messages available on the IRC socket
   are now stored at once. this can be further improved by breaking
   protocol and sending a that messagegroup as one chunk to the client.

14 files changed:
src/core/SQL/PostgreSQL/14/insert_message.sql
src/core/SQL/PostgreSQL/14/insert_sender.sql
src/core/basichandler.cpp
src/core/basichandler.h
src/core/core.h
src/core/corenetwork.cpp
src/core/corenetwork.h
src/core/coresession.cpp
src/core/coresession.h
src/core/postgresqlstorage.cpp
src/core/postgresqlstorage.h
src/core/sqlitestorage.cpp
src/core/sqlitestorage.h
src/core/storage.h

index 867f066..d35022c 100644 (file)
@@ -1,3 +1,3 @@
 INSERT INTO backlog (time, bufferid, type, flags, senderid, message)
 INSERT INTO backlog (time, bufferid, type, flags, senderid, message)
-VALUES (:time, :bufferid, :type, :flags, (SELECT senderid FROM sender WHERE sender = :sender), :message)
+VALUES ($1, $2, $3, $4, (SELECT senderid FROM sender WHERE sender = $5), $6)
 RETURNING messageid
 RETURNING messageid
index 0e84f63..215e603 100644 (file)
@@ -1,2 +1,2 @@
 INSERT INTO sender (sender)
 INSERT INTO sender (sender)
-VALUES (:sender)
\ No newline at end of file
+VALUES ($1)
\ No newline at end of file
index d9cb124..a053b52 100644 (file)
@@ -30,8 +30,8 @@ BasicHandler::BasicHandler(CoreNetwork *parent)
     _network(parent),
     initDone(false)
 {
     _network(parent),
     initDone(false)
 {
-  connect(this, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)),
-         network(), SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)));
+  connect(this, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)),
+         network(), SLOT(displayMsg(Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)));
 
   connect(this, SIGNAL(putCmd(QString, const QList<QByteArray> &, const QByteArray &)),
          network(), SLOT(putCmd(QString, const QList<QByteArray> &, const QByteArray &)));
 
   connect(this, SIGNAL(putCmd(QString, const QList<QByteArray> &, const QByteArray &)),
          network(), SLOT(putCmd(QString, const QList<QByteArray> &, const QByteArray &)));
@@ -174,7 +174,7 @@ void BasicHandler::putCmd(const QString &cmd, const QByteArray &param, const QBy
   emit putCmd(cmd, list, prefix);
 }
 
   emit putCmd(cmd, list, prefix);
 }
 
-void BasicHandler::displayMsg(Message::Type msgType, QString target, QString text, QString sender, Message::Flags flags) {
+void BasicHandler::displayMsg(Message::Type msgType, QString target, const QString &text, const QString &sender, Message::Flags flags) {
   if(!target.isEmpty() && network()->prefixes().contains(target[0]))
     target = target.mid(1);
 
   if(!target.isEmpty() && network()->prefixes().contains(target[0]))
     target = target.mid(1);
 
index f67fe7f..ad954b5 100644 (file)
@@ -56,12 +56,12 @@ public:
   QList<QByteArray> userEncode(const QString &userNick, const QStringList &stringlist);
 
 signals:
   QList<QByteArray> userEncode(const QString &userNick, const QStringList &stringlist);
 
 signals:
-  void displayMsg(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None);
+  void displayMsg(Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
   void putCmd(const QString &cmd, const QList<QByteArray> &params, const QByteArray &prefix = QByteArray());
   void putRawLine(const QByteArray &msg);
 
 protected:
   void putCmd(const QString &cmd, const QList<QByteArray> &params, const QByteArray &prefix = QByteArray());
   void putRawLine(const QByteArray &msg);
 
 protected:
-  void displayMsg(Message::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None);
+  void displayMsg(Message::Type, QString target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
   void putCmd(const QString &cmd, const QByteArray &param, const QByteArray &prefix = QByteArray());
 
   virtual void handle(const QString &member, QGenericArgument val0 = QGenericArgument(0),
   void putCmd(const QString &cmd, const QByteArray &param, const QByteArray &prefix = QByteArray());
 
   virtual void handle(const QString &member, QGenericArgument val0 = QGenericArgument(0),
index 6d743e8..3e19474 100644 (file)
@@ -256,16 +256,26 @@ class Core : public QObject {
     return instance()->_storage->getBufferInfo(user, bufferId);
   }
 
     return instance()->_storage->getBufferInfo(user, bufferId);
   }
 
-  //! Store a Message in the backlog.
+  //! Store a Message in the storage backend and set it's unique Id.
   /** \note This method is threadsafe.
    *
   /** \note This method is threadsafe.
    *
-   *  \param msg  The message object to be stored
-   *  \return The globally unique id for the stored message
+   *  \param message The message object to be stored
+   *  \return true on success
    */
    */
-  static inline MsgId storeMessage(const Message &message) {
+  static inline bool storeMessage(Message &message) {
     return instance()->_storage->logMessage(message);
   }
 
     return instance()->_storage->logMessage(message);
   }
 
+  //! Store a list of Messages in the storage backend and set their unique Id.
+  /** \note This method is threadsafe.
+   *
+   *  \param messages The list message objects to be stored
+   *  \return true on success
+   */
+  static inline bool storeMessages(MessageList &messages) {
+    return instance()->_storage->logMessages(messages);
+  }
+
   //! Request a certain number messages stored in a given buffer.
   /** \param buffer   The buffer we request messages from
    *  \param first    if != -1 return only messages with a MsgId >= first
   //! Request a certain number messages stored in a given buffer.
   /** \param buffer   The buffer we request messages from
    *  \param first    if != -1 return only messages with a MsgId >= first
index ece012e..4da6dbd 100644 (file)
@@ -303,7 +303,7 @@ void CoreNetwork::socketError(QAbstractSocket::SocketError error) {
   _previousConnectionAttemptFailed = true;
   qWarning() << qPrintable(tr("Could not connect to %1 (%2)").arg(networkName(), socket.errorString()));
   emit connectionError(socket.errorString());
   _previousConnectionAttemptFailed = true;
   qWarning() << qPrintable(tr("Could not connect to %1 (%2)").arg(networkName(), socket.errorString()));
   emit connectionError(socket.errorString());
-  emit displayMsg(Message::Error, BufferInfo::StatusBuffer, "", tr("Connection failure: %1").arg(socket.errorString()));
+  displayMsg(Message::Error, BufferInfo::StatusBuffer, "", tr("Connection failure: %1").arg(socket.errorString()));
   emitConnectionError(socket.errorString());
   if(socket.state() < QAbstractSocket::ConnectedState) {
     socketDisconnected();
   emitConnectionError(socket.errorString());
   if(socket.state() < QAbstractSocket::ConnectedState) {
     socketDisconnected();
@@ -359,7 +359,7 @@ void CoreNetwork::socketDisconnected() {
   IrcUser *me_ = me();
   if(me_) {
     foreach(QString channel, me_->channels())
   IrcUser *me_ = me();
   if(me_) {
     foreach(QString channel, me_->channels())
-      emit displayMsg(Message::Quit, BufferInfo::ChannelBuffer, channel, _quitReason, me_->hostmask());
+      displayMsg(Message::Quit, BufferInfo::ChannelBuffer, channel, _quitReason, me_->hostmask());
   }
 
   setConnected(false);
   }
 
   setConnected(false);
index c11aee8..a0e8c17 100644 (file)
@@ -108,10 +108,14 @@ public slots:
 
   inline void resetPingTimeout() { _lastPingTime = 0; }
 
 
   inline void resetPingTimeout() { _lastPingTime = 0; }
 
+  inline void displayMsg(Message::Type msgType, BufferInfo::Type bufferType, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None) {
+    emit displayMsg(networkId(), msgType, bufferType, target, text, sender, flags);
+  }
+
 signals:
   void recvRawServerMsg(QString);
   void displayStatusMsg(QString);
 signals:
   void recvRawServerMsg(QString);
   void displayStatusMsg(QString);
-  void displayMsg(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None);
+  void displayMsg(NetworkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
   void disconnected(NetworkId networkId);
   void connectionError(const QString &errorMsg);
 
   void disconnected(NetworkId networkId);
   void connectionError(const QString &errorMsg);
 
index 7a4f622..38d96ac 100644 (file)
 #include "coreusersettings.h"
 #include "logger.h"
 
 #include "coreusersettings.h"
 #include "logger.h"
 
+class ProcessMessagesEvent : public QEvent {
+public:
+  ProcessMessagesEvent() : QEvent(QEvent::User) {}
+};
+
 CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
   : QObject(parent),
     _user(uid),
 CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
   : QObject(parent),
     _user(uid),
@@ -49,7 +54,8 @@ CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
     _bufferViewManager(new CoreBufferViewManager(_signalProxy, this)),
     _ircListHelper(new CoreIrcListHelper(this)),
     _coreInfo(this),
     _bufferViewManager(new CoreBufferViewManager(_signalProxy, this)),
     _ircListHelper(new CoreIrcListHelper(this)),
     _coreInfo(this),
-    scriptEngine(new QScriptEngine(this))
+    scriptEngine(new QScriptEngine(this)),
+    _processMessages(false)
 {
   SignalProxy *p = signalProxy();
   connect(p, SIGNAL(peerRemoved(QIODevice *)), this, SLOT(removeClient(QIODevice *)));
 {
   SignalProxy *p = signalProxy();
   connect(p, SIGNAL(peerRemoved(QIODevice *)), this, SLOT(removeClient(QIODevice *)));
@@ -196,16 +202,13 @@ void CoreSession::msgFromClient(BufferInfo bufinfo, QString msg) {
 
 // ALL messages coming pass through these functions before going to the GUI.
 // So this is the perfect place for storing the backlog and log stuff.
 
 // ALL messages coming pass through these functions before going to the GUI.
 // So this is the perfect place for storing the backlog and log stuff.
-void CoreSession::recvMessageFromServer(Message::Type type, BufferInfo::Type bufferType,
-                                        QString target, QString text, QString sender, Message::Flags flags) {
-  CoreNetwork *net = qobject_cast<CoreNetwork*>(this->sender());
-  Q_ASSERT(net);
-
-  BufferInfo bufferInfo = Core::bufferInfo(user(), net->networkId(), bufferType, target);
-  Message msg(bufferInfo, type, text, sender, flags);
-  msg.setMsgId(Core::storeMessage(msg));
-  Q_ASSERT(msg.msgId() != 0);
-  emit displayMsg(msg);
+void CoreSession::recvMessageFromServer(NetworkId networkId, Message::Type type, BufferInfo::Type bufferType,
+                                        const QString &target, const QString &text, const QString &sender, Message::Flags flags) {
+  _messageQueue << RawMessage(networkId, type, bufferType, target, text, sender, flags);
+  if(!_processMessages) {
+    _processMessages = true;
+    QCoreApplication::postEvent(this, new ProcessMessagesEvent());
+  }
 }
 
 void CoreSession::recvStatusMsgFromServer(QString msg) {
 }
 
 void CoreSession::recvStatusMsgFromServer(QString msg) {
@@ -218,6 +221,46 @@ QList<BufferInfo> CoreSession::buffers() const {
   return Core::requestBuffers(user());
 }
 
   return Core::requestBuffers(user());
 }
 
+void CoreSession::customEvent(QEvent *event) {
+  if(event->type() != QEvent::User)
+    return;
+
+  processMessages();
+  event->accept();
+}
+
+void CoreSession::processMessages() {
+  qDebug() << "processing" << _messageQueue.count() << "messages..";
+  if(_messageQueue.count() == 1) {
+    const RawMessage &rawMsg = _messageQueue.first();
+    BufferInfo bufferInfo = Core::bufferInfo(user(), rawMsg.networkId, rawMsg.bufferType, rawMsg.target);
+    Message msg(bufferInfo, rawMsg.type, rawMsg.text, rawMsg.sender, rawMsg.flags);
+    Core::storeMessage(msg);
+    emit displayMsg(msg);
+  } else {
+    QHash<NetworkId, QHash<QString, BufferInfo> > bufferInfoCache;
+    MessageList messages;
+    BufferInfo bufferInfo;
+    for(int i = 0; i < _messageQueue.count(); i++) {
+      const RawMessage &rawMsg = _messageQueue.at(i);
+      if(bufferInfoCache.contains(rawMsg.networkId) && bufferInfoCache[rawMsg.networkId].contains(rawMsg.target)) {
+       bufferInfo = bufferInfoCache[rawMsg.networkId][rawMsg.target];
+      } else {
+       bufferInfo = Core::bufferInfo(user(), rawMsg.networkId, rawMsg.bufferType, rawMsg.target);
+       bufferInfoCache[rawMsg.networkId][rawMsg.target] = bufferInfo;
+      }
+      messages << Message(bufferInfo, rawMsg.type, rawMsg.text, rawMsg.sender, rawMsg.flags);
+    }
+
+    Core::storeMessages(messages);
+    // FIXME: extend protocol to a displayMessages(MessageList)
+    for(int i = 0; i < messages.count(); i++) {
+      emit displayMsg(messages[i]);
+    }
+  }
+  _processMessages = false;
+  _messageQueue.clear();
+}
 
 QVariant CoreSession::sessionState() {
   QVariantMap v;
 
 QVariant CoreSession::sessionState() {
   QVariantMap v;
@@ -322,8 +365,8 @@ void CoreSession::createNetwork(const NetworkInfo &info_, const QStringList &per
   id = info.networkId.toInt();
   if(!_networks.contains(id)) {
     CoreNetwork *net = new CoreNetwork(id, this);
   id = info.networkId.toInt();
   if(!_networks.contains(id)) {
     CoreNetwork *net = new CoreNetwork(id, this);
-    connect(net, SIGNAL(displayMsg(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)),
-           this, SLOT(recvMessageFromServer(Message::Type, BufferInfo::Type, QString, QString, QString, Message::Flags)));
+    connect(net, SIGNAL(displayMsg(NetworkId, Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)),
+           this, SLOT(recvMessageFromServer(NetworkId, Message::Type, BufferInfo::Type, const QString &, const QString &, const QString &, Message::Flags)));
     connect(net, SIGNAL(displayStatusMsg(QString)), this, SLOT(recvStatusMsgFromServer(QString)));
 
     net->setNetworkInfo(info);
     connect(net, SIGNAL(displayStatusMsg(QString)), this, SLOT(recvStatusMsgFromServer(QString)));
 
     net->setNetworkInfo(info);
index 6e718be..48d83f8 100644 (file)
@@ -27,6 +27,7 @@
 #include "corecoreinfo.h"
 #include "corealiasmanager.h"
 #include "message.h"
 #include "corecoreinfo.h"
 #include "corealiasmanager.h"
 #include "message.h"
+#include "storage.h"
 
 class CoreBacklogManager;
 class CoreBufferSyncer;
 
 class CoreBacklogManager;
 class CoreBufferSyncer;
@@ -134,7 +135,7 @@ private slots:
   void removeClient(QIODevice *dev);
 
   void recvStatusMsgFromServer(QString msg);
   void removeClient(QIODevice *dev);
 
   void recvStatusMsgFromServer(QString msg);
-  void recvMessageFromServer(Message::Type, BufferInfo::Type, QString target, QString text, QString sender = "", Message::Flags flags = Message::None);
+  void recvMessageFromServer(NetworkId networkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
 
   void destroyNetwork(NetworkId);
 
 
   void destroyNetwork(NetworkId);
 
@@ -145,9 +146,13 @@ private slots:
 
   void updateIdentityBySender();
 
 
   void updateIdentityBySender();
 
+protected:
+  virtual void customEvent(QEvent *event);
+
 private:
   void loadSettings();
   void initScriptEngine();
 private:
   void loadSettings();
   void initScriptEngine();
+  void processMessages();
 
   UserId _user;
 
 
   UserId _user;
 
@@ -166,6 +171,19 @@ private:
 
   QScriptEngine *scriptEngine;
 
 
   QScriptEngine *scriptEngine;
 
+  struct RawMessage {
+    NetworkId networkId;
+    Message::Type type;
+    BufferInfo::Type bufferType;
+    QString target;
+    QString text;
+    QString sender;
+    Message::Flags flags;
+    RawMessage(NetworkId networkId, Message::Type type, BufferInfo::Type bufferType, const QString &target, const QString &text, const QString &sender, Message::Flags flags)
+      : networkId(networkId), type(type), bufferType(bufferType), target(target), text(text), sender(sender), flags(flags) {}
+  };
+  QList<RawMessage> _messageQueue;
+  bool _processMessages;
 };
 
 #endif
 };
 
 #endif
index 2c9c5b4..96805ba 100644 (file)
@@ -1104,7 +1104,7 @@ QHash<BufferId, MsgId> PostgreSqlStorage::bufferLastSeenMsgIds(UserId user) {
   return lastSeenHash;
 }
 
   return lastSeenHash;
 }
 
-MsgId PostgreSqlStorage::logMessage(Message msg) {
+bool PostgreSqlStorage::logMessage(Message &msg) {
   QSqlDatabase db = logDb();
   if(!db.transaction()) {
     qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
   QSqlDatabase db = logDb();
   if(!db.transaction()) {
     qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
@@ -1112,39 +1112,125 @@ MsgId PostgreSqlStorage::logMessage(Message msg) {
     return false;
   }
 
     return false;
   }
 
-  QSqlQuery logMessageQuery(db);
-  logMessageQuery.prepare(queryString("insert_message"));
-  logMessageQuery.bindValue(":time", msg.timestamp().toTime_t());
-  logMessageQuery.bindValue(":bufferid", msg.bufferInfo().bufferId().toInt());
-  logMessageQuery.bindValue(":type", msg.type());
-  logMessageQuery.bindValue(":flags", (int)msg.flags());
-  logMessageQuery.bindValue(":sender", msg.sender());
-  logMessageQuery.bindValue(":message", msg.contents());
-  safeExec(logMessageQuery);
+  if(!prepareQuery("insert_message", queryString("insert_message"), db)) {
+    qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message");
+    qWarning() << "  Error:" << db.lastError().text();
+    db.rollback();
+    return false;
+  }
+
+  QVariantList params;
+  params << msg.timestamp().toTime_t()
+        << msg.bufferInfo().bufferId().toInt()
+        << msg.type()
+        << (int)msg.flags()
+        << msg.sender()
+        << msg.contents();
+  QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db);
 
   if(logMessageQuery.lastError().isValid()) {
     // first we need to reset the transaction
     db.rollback();
     db.transaction();
 
 
   if(logMessageQuery.lastError().isValid()) {
     // first we need to reset the transaction
     db.rollback();
     db.transaction();
 
-    QSqlQuery addSenderQuery(db);
-    addSenderQuery.prepare(queryString("insert_sender"));
-    addSenderQuery.bindValue(":sender", msg.sender());
-    safeExec(addSenderQuery);
-    safeExec(logMessageQuery);
 
 
+    // it's possible that the sender was already added by another thread
+    // since the insert might fail we're setting a savepoint
+    savePoint("sender_sp1", db);
+    QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", msg.sender(), db);
+    if(addSenderQuery.lastError().isValid())
+      rollbackSavePoint("sender_sp1", db);
+    else
+      releaseSavePoint("sender_sp1", db);
+
+    logMessageQuery = db.exec(logMessageQuery.lastQuery());
     if(!watchQuery(logMessageQuery)) {
     if(!watchQuery(logMessageQuery)) {
+      qDebug() << "==================== Sender Query:";
+      watchQuery(addSenderQuery);
+      qDebug() << "==================== /Sender Query";
       db.rollback();
       db.rollback();
-      return MsgId();
+      return false;
     }
   }
 
   logMessageQuery.first();
   MsgId msgId = logMessageQuery.value(0).toInt();
   db.commit();
     }
   }
 
   logMessageQuery.first();
   MsgId msgId = logMessageQuery.value(0).toInt();
   db.commit();
+  if(msgId.isValid()) {
+    msg.setMsgId(msgId);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool PostgreSqlStorage::logMessages(MessageList &msgs) {
+  QSqlDatabase db = logDb();
+  if(!db.transaction()) {
+    qWarning() << "PostgreSqlStorage::logMessage(): cannot start transaction!";
+    qWarning() << " -" << qPrintable(db.lastError().text());
+    return false;
+  }
+
+  if(!prepareQuery("insert_sender", queryString("insert_sender"), db)) {
+    qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_sender");
+    qWarning() << "  Error:" << db.lastError().text();
+    db.rollback();
+    return false;
+  }
+  QSet<QString> senders;
+  for(int i = 0; i < msgs.count(); i++) {
+    const QString &sender = msgs.at(i).sender();
+    if(senders.contains(sender))
+      continue;
+    senders << sender;
+
+    savePoint("sender_sp", db);
+    QSqlQuery addSenderQuery = executePreparedQuery("insert_sender", sender, db);
+    if(addSenderQuery.lastError().isValid())
+      rollbackSavePoint("sender_sp", db);
+    else
+      releaseSavePoint("sender_sp", db);
+  }
+
+  // yes we loop twice over the same list. This avoids alternating queries.
+  if(!prepareQuery("insert_message", queryString("insert_message"), db)) {
+    qWarning() << "PostgreSqlStorage::logMessages(): unable to prepare query:" << queryString("insert_message");
+    qWarning() << "  Error:" << db.lastError().text();
+    db.rollback();
+    return false;
+  }
+  bool error = false;
+  for(int i = 0; i < msgs.count(); i++) {
+    Message &msg = msgs[i];
+    QVariantList params;
+    params << msg.timestamp().toTime_t()
+          << msg.bufferInfo().bufferId().toInt()
+          << msg.type()
+          << (int)msg.flags()
+          << msg.sender()
+          << msg.contents();
+    QSqlQuery logMessageQuery = executePreparedQuery("insert_message", params, db);
+    if(!watchQuery(logMessageQuery)) {
+      db.rollback();
+      error = true;
+      break;
+    } else {
+      logMessageQuery.first();
+      msg.setMsgId(logMessageQuery.value(0).toInt());
+    }
+  }
+
+  if(error) {
+    // we had a rollback in the db so we need to reset all msgIds
+    for(int i = 0; i < msgs.count(); i++) {
+      msgs[i].setMsgId(MsgId());
+    }
+    return false;
+  }
 
 
-  Q_ASSERT(msgId.isValid());
-  return msgId;
+  db.commit();
+  return true;
 }
 
 QList<Message> PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) {
 }
 
 QList<Message> PostgreSqlStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) {
@@ -1269,3 +1355,81 @@ bool PostgreSqlStorage::beginReadOnlyTransaction(QSqlDatabase &db) {
   QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY");
   return !query.lastError().isValid();
 }
   QSqlQuery query = db.exec("BEGIN TRANSACTION READ ONLY");
   return !query.lastError().isValid();
 }
+
+bool PostgreSqlStorage::prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db) {
+  if(_preparedQueries.contains(db.connectionName()) && _preparedQueries[db.connectionName()].contains(handle))
+    return true; // already prepared
+
+  QMutexLocker locker(&_queryHashMutex);
+
+  static unsigned int stmtCount = 0;
+  QString queryId = QLatin1String("quassel_") + QString::number(++stmtCount, 16);
+  // qDebug() << "prepare:" << QString("PREPARE %1 AS %2").arg(queryId).arg(query);
+  db.exec(QString("PREPARE %1 AS %2").arg(queryId).arg(query));
+  if(db.lastError().isValid()) {
+    return false;
+  } else {
+    _preparedQueries[db.connectionName()][handle] = queryId;
+    return true;
+  }
+}
+
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariantList &params, const QSqlDatabase &db) {
+  if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+    qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName();
+    return QSqlQuery();
+  }
+
+  QSqlDriver *driver = db.driver();
+
+  QStringList paramStrings;
+  QSqlField field;
+  for(int i = 0; i < params.count(); i++) {
+    const QVariant &value = params.at(i);
+    field.setType(value.type());
+    if(value.isNull())
+      field.clear();
+    else
+      field.setValue(value);
+
+    paramStrings << driver->formatValue(field);
+  }
+
+  const QString &queryId = _preparedQueries[db.connectionName()][handle];
+  if(params.isEmpty()) {
+    return db.exec(QString("EXECUTE %1").arg(queryId));
+  } else {
+    // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", "));
+    return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramStrings.join(", ")));
+  }
+}
+
+QSqlQuery PostgreSqlStorage::executePreparedQuery(const QString &handle, const QVariant &param, const QSqlDatabase &db) {
+  if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+    qWarning() << "PostgreSqlStorage::executePreparedQuery() no prepared Query with handle" << handle << "on Database" << db.connectionName();
+    return QSqlQuery();
+  }
+
+  QSqlField field;
+  field.setType(param.type());
+  if(param.isNull())
+    field.clear();
+  else
+    field.setValue(param);
+
+  const QString &queryId = _preparedQueries[db.connectionName()][handle];
+  QString paramString = db.driver()->formatValue(field);
+
+  // qDebug() << "preparedExec:" << QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString);
+  return db.exec(QString("EXECUTE %1 (%2)").arg(queryId).arg(paramString));
+}
+
+void PostgreSqlStorage::deallocateQuery(const QString &handle, const QSqlDatabase &db) {
+  if(!_preparedQueries.contains(db.connectionName()) || !_preparedQueries[db.connectionName()].contains(handle)) {
+    return;
+  }
+  QMutexLocker locker(&_queryHashMutex);
+  QString queryId = _preparedQueries[db.connectionName()].take(handle);
+  db.exec(QString("DEALLOCATE %1").arg(queryId));
+}
+
index 536fa61..f699ca1 100644 (file)
@@ -40,7 +40,7 @@ public slots:
   QString displayName() const;
   QString description() const;
   QVariantMap setupKeys() const;
   QString displayName() const;
   QString description() const;
   QVariantMap setupKeys() const;
-  
+
   // TODO: Add functions for configuring the backlog handling, i.e. defining auto-cleanup settings etc
 
   /* User handling */
   // TODO: Add functions for configuring the backlog handling, i.e. defining auto-cleanup settings etc
 
   /* User handling */
@@ -78,7 +78,7 @@ public slots:
   virtual void setAwayMessage(UserId user, NetworkId networkId, const QString &awayMsg);
   virtual QString userModes(UserId user, NetworkId networkId);
   virtual void setUserModes(UserId user, NetworkId networkId, const QString &userModes);
   virtual void setAwayMessage(UserId user, NetworkId networkId, const QString &awayMsg);
   virtual QString userModes(UserId user, NetworkId networkId);
   virtual void setUserModes(UserId user, NetworkId networkId, const QString &userModes);
-  
+
   /* Buffer handling */
   virtual BufferInfo bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer = "", bool create = true);
   virtual BufferInfo getBufferInfo(UserId user, const BufferId &bufferId);
   /* Buffer handling */
   virtual BufferInfo bufferInfo(UserId user, const NetworkId &networkId, BufferInfo::Type type, const QString &buffer = "", bool create = true);
   virtual BufferInfo getBufferInfo(UserId user, const BufferId &bufferId);
@@ -91,8 +91,8 @@ public slots:
   virtual QHash<BufferId, MsgId> bufferLastSeenMsgIds(UserId user);
 
   /* Message handling */
   virtual QHash<BufferId, MsgId> bufferLastSeenMsgIds(UserId user);
 
   /* Message handling */
-
-  virtual MsgId logMessage(Message msg);
+  virtual bool logMessage(Message &msg);
+  virtual bool logMessages(MessageList &msgs);
   virtual QList<Message> requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1);
   virtual QList<Message> requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1);
 
   virtual QList<Message> requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1);
   virtual QList<Message> requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1);
 
@@ -108,8 +108,18 @@ protected:
   virtual bool updateSchemaVersion(int newVersion);
   virtual bool setupSchemaVersion(int version);
   void safeExec(QSqlQuery &query);
   virtual bool updateSchemaVersion(int newVersion);
   virtual bool setupSchemaVersion(int version);
   void safeExec(QSqlQuery &query);
+
   bool beginReadOnlyTransaction(QSqlDatabase &db);
 
   bool beginReadOnlyTransaction(QSqlDatabase &db);
 
+  bool prepareQuery(const QString &handle, const QString &query, const QSqlDatabase &db);
+  QSqlQuery executePreparedQuery(const QString &handle, const QVariantList &params, const QSqlDatabase &db);
+  QSqlQuery executePreparedQuery(const QString &handle, const QVariant &param, const QSqlDatabase &db);
+  void deallocateQuery(const QString &handle, const QSqlDatabase &db);
+
+  inline void savePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("SAVEPOINT %1").arg(handle)); }
+  inline void rollbackSavePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("ROLLBACK TO SAVEPOINT %1").arg(handle)); }
+  inline void releaseSavePoint(const QString &handle, const QSqlDatabase &db) { db.exec(QString("RELEASE SAVEPOINT %1").arg(handle)); }
+
 private:
   void bindNetworkInfo(QSqlQuery &query, const NetworkInfo &info);
   void bindServerInfo(QSqlQuery &query, const Network::Server &server);
 private:
   void bindNetworkInfo(QSqlQuery &query, const NetworkInfo &info);
   void bindServerInfo(QSqlQuery &query, const Network::Server &server);
@@ -119,8 +129,13 @@ private:
   QString _databaseName;
   QString _userName;
   QString _password;
   QString _databaseName;
   QString _userName;
   QString _password;
-  
+
   static int _maxRetryCount;
   static int _maxRetryCount;
+
+  typedef QHash<QString, QString> QueryHash;
+  QHash<QString, QueryHash> _preparedQueries; // one query hash per db connection
+  QMutex _queryHashMutex;
+
 };
 
 inline void PostgreSqlStorage::safeExec(QSqlQuery &query) { query.exec(); }
 };
 
 inline void PostgreSqlStorage::safeExec(QSqlQuery &query) { query.exec(); }
index 003d57a..82ab2ab 100644 (file)
@@ -933,7 +933,7 @@ QHash<BufferId, MsgId> SqliteStorage::bufferLastSeenMsgIds(UserId user) {
   return lastSeenHash;
 }
 
   return lastSeenHash;
 }
 
-MsgId SqliteStorage::logMessage(Message msg) {
+bool SqliteStorage::logMessage(Message &msg) {
   QSqlQuery logMessageQuery(logDb());
   logMessageQuery.prepare(queryString("insert_message"));
 
   QSqlQuery logMessageQuery(logDb());
   logMessageQuery.prepare(queryString("insert_message"));
 
@@ -954,15 +954,28 @@ MsgId SqliteStorage::logMessage(Message msg) {
       safeExec(addSenderQuery);
       safeExec(logMessageQuery);
       if(!watchQuery(logMessageQuery))
       safeExec(addSenderQuery);
       safeExec(logMessageQuery);
       if(!watchQuery(logMessageQuery))
-       return 0;
+       return false;
     } else {
       watchQuery(logMessageQuery);
     }
   }
 
   MsgId msgId = logMessageQuery.lastInsertId().toInt();
     } else {
       watchQuery(logMessageQuery);
     }
   }
 
   MsgId msgId = logMessageQuery.lastInsertId().toInt();
-  Q_ASSERT(msgId.isValid());
-  return msgId;
+  if(msgId.isValid()) {
+    msg.setMsgId(msgId);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool SqliteStorage::logMessages(MessageList &msgs) {
+  // FIXME: optimize!
+  for(int i = 0; i < msgs.count(); i++) {
+    if(!logMessage(msgs[i]))
+      return false;
+  }
+  return true;
 }
 
 QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) {
 }
 
 QList<Message> SqliteStorage::requestMsgs(UserId user, BufferId bufferId, MsgId first, MsgId last, int limit) {
index feb2c5a..20f9da7 100644 (file)
@@ -92,8 +92,8 @@ public slots:
   virtual QHash<BufferId, MsgId> bufferLastSeenMsgIds(UserId user);
 
   /* Message handling */
   virtual QHash<BufferId, MsgId> bufferLastSeenMsgIds(UserId user);
 
   /* Message handling */
-
-  virtual MsgId logMessage(Message msg);
+  virtual bool logMessage(Message &msg);
+  virtual bool logMessages(MessageList &msgs);
   virtual QList<Message> requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1);
   virtual QList<Message> requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1);
 
   virtual QList<Message> requestMsgs(UserId user, BufferId bufferId, MsgId first = -1, MsgId last = -1, int limit = -1);
   virtual QList<Message> requestAllMsgs(UserId user, MsgId first = -1, MsgId last = -1, int limit = -1);
 
index 17fdd70..5012c24 100644 (file)
@@ -341,11 +341,17 @@ public slots:
   
   /* Message handling */
 
   
   /* Message handling */
 
-  //! Store a Message in the backlog.
+  //! Store a Message in the storage backend and set its unique Id.
   /** \param msg  The message object to be stored
   /** \param msg  The message object to be stored
-   *  \return The globally unique id for the stored message
+   *  \return true on success
    */
    */
-  virtual MsgId logMessage(Message msg) = 0;
+  virtual bool logMessage(Message &msg) = 0;
+
+  //! Store a list of Messages in the storage backend and set their unique Id.
+  /** \param msgs The list message objects to be stored
+   *  \return true on success
+   */
+  virtual bool logMessages(MessageList &msgs) = 0;
 
   //! Request a certain number messages stored in a given buffer.
   /** \param buffer   The buffer we request messages from
 
   //! Request a certain number messages stored in a given buffer.
   /** \param buffer   The buffer we request messages from