Bulk-insert messages into the db (again)
authorManuel Nickschas <sputnick@quassel-irc.org>
Mon, 25 Oct 2010 09:09:43 +0000 (11:09 +0200)
committerManuel Nickschas <sputnick@quassel-irc.org>
Mon, 25 Oct 2010 09:13:50 +0000 (11:13 +0200)
This accidentally got slaughtered while moving to the new event backend. Now
messages are buffered in CoreSession until the event queue is empty, which means
that a bulk of incoming messages will completely be processed before inserting into
the database, resulting in a huge performance gain.

This buffers even more aggressive than the old version, as we process Qt events in between,
which means we'll receive and process any additional data from the socket before flushing the
message queue. The downside is that nothing will arrive in the client as long as your socket gets
flooded, but on the other hand, it will greatly reduce db I/O in such cases.

src/common/eventmanager.cpp
src/common/eventmanager.h
src/core/coresession.cpp
src/core/coresession.h

index fd3cf9b..3fe581c 100644 (file)
@@ -180,6 +180,8 @@ void EventManager::processEvents() {
   _eventQueue.removeFirst();
   if(_eventQueue.count())
     QCoreApplication::postEvent(this, new QEvent(QEvent::User));
+  else
+    emit eventQueueEmptied();
 }
 
 void EventManager::dispatchEvent(Event *event) {
index 025493b..9b6e5a0 100644 (file)
@@ -138,6 +138,9 @@ public slots:
    */
   void sendEvent(Event *event);
 
+signals:
+  void eventQueueEmptied();
+
 protected:
   virtual void customEvent(QEvent *event);
 
index 3fd539e..0173a95 100644 (file)
@@ -68,7 +68,6 @@ CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
     _ctcpParser(new CtcpParser(this)),
     _ircParser(new IrcParser(this)),
     scriptEngine(new QScriptEngine(this)),
-    _processMessages(false),
     _ignoreListManager(this)
 {
   SignalProxy *p = signalProxy();
@@ -97,6 +96,7 @@ CoreSession::CoreSession(UserId uid, bool restoreState, QObject *parent)
   loadSettings();
   initScriptEngine();
 
+  connect(eventManager(), SIGNAL(eventQueueEmptied()), SLOT(processMessages()));
   eventManager()->registerObject(ircParser(), EventManager::NormalPriority);
   eventManager()->registerObject(sessionEventProcessor(), EventManager::HighPriority); // needs to process events *before* the stringifier!
   eventManager()->registerObject(ctcpParser(), EventManager::NormalPriority);
@@ -223,6 +223,9 @@ void CoreSession::msgFromClient(BufferInfo bufinfo, QString msg) {
   CoreNetwork *net = network(bufinfo.networkId());
   if(net) {
     net->userInput(bufinfo, msg);
+    // FIXME as soon as user input is event-based
+    // until then, user input doesn't trigger a message queue flush!
+    processMessages();
   } else {
     qWarning() << "Trying to send to unconnected network:" << msg;
   }
@@ -247,10 +250,6 @@ void CoreSession::recvMessageFromServer(NetworkId networkId, Message::Type type,
     return;
 
   _messageQueue << rawMsg;
-  if(!_processMessages) {
-    _processMessages = true;
-    QCoreApplication::postEvent(this, new ProcessMessagesEvent());
-  }
 }
 
 void CoreSession::recvStatusMsgFromServer(QString msg) {
@@ -271,15 +270,10 @@ QList<BufferInfo> CoreSession::buffers() const {
   return Core::requestBuffers(user());
 }
 
-void CoreSession::customEvent(QEvent *event) {
-  if(event->type() != QEvent::User)
+void CoreSession::processMessages() {
+  if(_messageQueue.isEmpty())
     return;
 
-  processMessages();
-  event->accept();
-}
-
-void CoreSession::processMessages() {
   if(_messageQueue.count() == 1) {
     const RawMessage &rawMsg = _messageQueue.first();
     bool createBuffer = !(rawMsg.flags & Message::Redirected);
@@ -335,7 +329,6 @@ void CoreSession::processMessages() {
       emit displayMsg(messages[i]);
     }
   }
-  _processMessages = false;
   _messageQueue.clear();
 }
 
index 3dd77f3..9d8ba55 100644 (file)
@@ -155,6 +155,7 @@ private slots:
 
   void recvStatusMsgFromServer(QString msg);
   void recvMessageFromServer(NetworkId networkId, Message::Type, BufferInfo::Type, const QString &target, const QString &text, const QString &sender = "", Message::Flags flags = Message::None);
+  void processMessages();
 
   void destroyNetwork(NetworkId);
 
@@ -167,13 +168,9 @@ private slots:
 
   void saveSessionState() const;
 
-protected:
-  virtual void customEvent(QEvent *event);
-
 private:
   void loadSettings();
   void initScriptEngine();
-  void processMessages();
 
   /// Hook for converting events to the old displayMsg() handlers
   Q_INVOKABLE void processMessageEvent(MessageEvent *event);
@@ -203,7 +200,6 @@ private:
   QScriptEngine *scriptEngine;
 
   QList<RawMessage> _messageQueue;
-  bool _processMessages;
   CoreIgnoreListManager _ignoreListManager;
 };