Sony Arouje

a programmer's log

Asynchronous Messaging in iTraveller

with 2 comments

Download iTraveller

I decided to use Lucene.Net to implement Search functionality in iTraveller. But the main design constrain was how to implement the indexing process without affecting the performance of any other functionality. Below are my use cases

  • Should start indexing asynchronously, no other process like Insert/Update should wait to complete indexing.
  • Other process like Insert/Update send a message to Indexing process and continue without waiting to finish the indexing process.
  • Lucene.Net will apply lock before start indexing. So only one process can access the Lucene.Net indexing API at a time.
  • Should have a single entry point for other process to invoke indexing.
  • Before indexing remove the record (if exist) from Index file.
  • Gather all the data related to photo like Photo Title, Description, Comments, etc. Index the data using Lucene.Net

In the above points, the first  one is the important thing I wanted to achieve to make iTraveller running smoothly. It’s obvious that I have to implement threading, and started my search for a thread safe queuing approach. At last I decided to use ‘Queue’ in System.Collections.Generic namespace. Let’s go through how I implemented message queue in iTraveller.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;
using ITraveller.Domain;
using ITraveller.Utils.Logging;
namespace ITraveller.Indexer
{
    public class IndexQueue
    {
        private static Queue<Photo> _queue;
        private static ManualResetEvent _enqueuedEvent;
        private static Thread _workerThread;
        private static Photo _currentPhoto;
        private static ILogger _logger;
        static IndexQueue()
        {
            _queue = new Queue<Photo>();
            _logger = Logger.DefaultLogger;
            _enqueuedEvent = new ManualResetEvent(false);
            _workerThread = new Thread(new ThreadStart(PerformIndex));
            _workerThread.Start();
        }
        public static void RestartThread()
        {
            if (_workerThread.ThreadState == ThreadState.Stopped)
            {
                _workerThread.Abort();
                _workerThread = new Thread(new ThreadStart(PerformIndex));
                _workerThread.Start();
            }
        }
        public static void Add(Photo photo)
        {
            try
            {
                if (_queue == null)
                    _queue = new Queue<Photo>();
                lock (_queue)
                {
                    _queue.Enqueue(photo);
                    _enqueuedEvent.Set();
                }
                RestartThread();
            }
            catch (Exception ex)
            {
                _logger.LogInfo("Error while Adding to index Queue");
                _logger.LogError(ex);
            }
        }
        public static void Add(List<Photo> photos)
        {
            if (_queue == null)
                _queue = new Queue<Photo>();
            for (int i = 0; i < photos.Count; i++)
            {
                try
                {
                    lock (_queue)
                    {
                        _queue.Enqueue(photos[i]);
                        _enqueuedEvent.Set();
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogInfo("Error while Adding to index Queue in loop");
                    _logger.LogError(ex);
                }
            }
            RestartThread();
        }
        private static bool Dequeue()
        {
            lock (_queue)
            {
                if (_queue.Count > 0)
                {
                    _enqueuedEvent.Reset();
                    _currentPhoto = _queue.Dequeue();
                }
                else
                {
                    return false;
                }

                return true;
            }
        }

        private static void PerformIndex()
        {
            IndexWriter writer=null;
            try
            {
                writer = new IndexWriter("./SearchIndex");
                while (Dequeue())
                {
                    IndexData indexData = new IndexData();
                    writer.DeleteFromIndex("ImageId:" + _currentPhoto.ImageID.ToString());
                    writer.WriteToIndex(indexData.GetIndexingData(_currentPhoto));
                }
            }
            catch (Exception ex)
            {
                _logger.LogInfo("Error while indexing data in worker thread");
                _logger.LogError(ex);
            }
            finally
            {
                if(writer!=null)
                    writer.Dispose();
            }
        }
    }
}

Other process can add Photo entity for indexing just by calling the static Add function and pass the required Photo( s ) object as parameter. Once the photo got added to queue, the Add function will call RestartThread function. The RestartThread function will check whether the worker thread Stopped, if Stopped then Recreate and Start the thread.

The function PerformIndex will get called when the worker thread starts. The PerformIndex function will dequeue the photo one by one from the Queue. Once the photo is dequeued successfully then one helper class will create and format the data for indexing and then  pass it to Lucene.Net class for further processing. The dequeue process will remove the photo object from the queue as well.

With the above approach I achieved a complete asynchronous messaging system. Any suggestion or better approach is always welcome.

Written by Sony Arouje

February 21, 2011 at 6:30 pm

2 Responses

Subscribe to comments with RSS.

  1. Bravo, this rather good thought

    Гордей

    March 9, 2011 at 3:14 pm


Leave a Reply to Гордей Cancel reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: