Sony Arouje

a programmer's log

Posts Tagged ‘Elasticsearch’

nginx and Elasticsearch in Windows


The system I was working on might need more than one Elasticsearch node, configured as master or load-balancer nodes. I wanted logic that connects to a different node if one fails. That means checking the status of a node before sending any index or search request and, if the check fails, taking the next node from the configured list and checking that one. This is a viable approach, but I was concerned about the performance impact of running the check on every request.
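The check-then-fall-back idea described above could be sketched roughly as follows. `NodeFailover`, `FirstHealthy` and the `isAlive` callback are hypothetical names used only for illustration. In a real client the callback would issue a cheap HTTP request to the node, which is exactly the per-request overhead I was worried about.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of any Elasticsearch client library.
public static class NodeFailover
{
    // Returns the first node in the configured list that passes the
    // health check, or null when every node fails it.
    public static string FirstHealthy(IEnumerable<string> nodes, Func<string, bool> isAlive)
    {
        foreach (string node in nodes)
        {
            if (isAlive(node))
                return node;
        }
        return null;
    }
}
```

A caller would pass its configured node list, for example the two addresses used later in this post, together with whatever status check it trusts.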

I started searching for a system that can expose a proxy IP and group the Elasticsearch instances under it. This way the client only needs to know the proxy IP, not the IP and port of each Elasticsearch node, and node failures are handled by the proxy. After some exploration I decided to try nginx, which has built-in load balancing with very simple configuration. I downloaded the mainline version of nginx for Windows and installed it.

To modify the nginx configuration, open conf\nginx.conf in any text editor and add your settings. My configuration is shown below.

worker_processes  1;

events {
    worker_connections  1024;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    upstream elasticcluster {
        server 192.168.11.105:9200 weight=1 fail_timeout=1;
        server 192.168.11.105:9201 weight=1 fail_timeout=1;
    }

    sendfile        on;
    keepalive_timeout  65;

    server {
        listen       8080;
        server_name  127.0.0.1;

        location / {
            proxy_pass  http://elasticcluster;
            proxy_connect_timeout 1;
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

In the upstream block I group all my Elasticsearch nodes under the name elasticcluster. In the server’s proxy_pass I specify the upstream name, e.g. http://elasticcluster. That’s it: I can now access my Elasticsearch nodes via 127.0.0.1:8080, and nginx routes the traffic to one of them in a round-robin manner. As you can see, the nginx IP and port are configured via the server_name and listen settings.

Now all my search and index requests go to the nginx address, and nginx routes the traffic to one of the Elasticsearch nodes running on my dev machine. I tested failover by stopping one of my Elasticsearch nodes, and nginx routed the traffic to the other live nodes.

Issue with localhost

Initially I configured nginx and Elasticsearch to use different ports on localhost. This caused a huge delay when nginx routed traffic to the Elasticsearch nodes. I found a post on the nginx forum and realized that the issue was with localhost. So I configured all my Elasticsearch instances to use the IP address instead of localhost. I also specified 127.0.0.1 as server_name in nginx.conf; you can give the machine’s IP address instead of 127.0.0.1.

Leave your comments below if you find it helpful or have any questions. Thanks for reading.

Happy coding…

Written by Sony Arouje

March 3, 2014 at 6:39 pm

Indexing and Searching with ElasticSearch


For the last couple of days I have been experimenting with Elasticsearch and different client libraries for .NET. In this post I will detail the implementation of indexing and searching with Elasticsearch in .NET. For detailed documentation of Elasticsearch, visit the official site or Joel Abrahamsson’s post.

I use PlainElastic.Net as my Elasticsearch client. PlainElastic.Net is a very simple, lightweight library for Elasticsearch. It uses plain JSON for indexing and querying, which gives me more freedom and tooling to create the JSON from user inputs.
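As an illustration of building the query JSON from user input, here is a minimal sketch. It is not the code from our system: `QueryBuilder` is a hypothetical name, and it uses System.Text.Json (which did not exist when this post was written) instead of JavaScriptSerializer, so that quotes and other special characters in the user's text are escaped properly. The body it produces is a standard Elasticsearch query_string query.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration only.
public static class QueryBuilder
{
    // Builds a body of the form {"query":{"query_string":{"query":"<text>"}}}.
    // The serializer takes care of escaping the user's text.
    public static string QueryString(string userText)
    {
        var body = new Dictionary<string, object>
        {
            ["query"] = new Dictionary<string, object>
            {
                ["query_string"] = new Dictionary<string, object>
                {
                    ["query"] = userText
                }
            }
        };
        return JsonSerializer.Serialize(body);
    }
}
```

The resulting string can be handed to the client as the plain JSON query.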

To make it more flexible, our system gets data from the database using views. A data reader class converts this data to dynamic objects, which are then converted to JSON and passed to PlainElastic.Net for indexing. The dynamic object makes life easier, as we can reuse this class with different views without worrying about strong types. Below is the dynamic class I created.

    using System;
    using System.Collections.Generic;
    using System.Dynamic;

    public class ElasticEntity : DynamicObject
    {
        private Dictionary<string, object> _members = new Dictionary<string, object>();

        public static ElasticEntity CreateFrom(Dictionary<string, object> members)
        {
            ElasticEntity entity = new ElasticEntity();
            entity.SetMembers(members);
            return entity;
        }

        public string GetValue(string property)
        {
            if (_members.ContainsKey(property))
            {
                object tmp= _members[property];
                if (tmp == null)
                    return string.Empty;
                else
                    return Convert.ToString(tmp);
            }
            return string.Empty;
        }

        public Dictionary<string, object> GetDictionary()
        {
            return _members;
        }

        internal void SetMembers(Dictionary<string, object> members)
        {
            this._members = members;
        }

        public void SetPropertyAndValue(string property, object value)
        {
            if (!_members.ContainsKey(property))
                _members.Add(property, value);
            else
                _members[property] = value;
        }
  
        public override bool TrySetMember(SetMemberBinder binder, object value)
        {
            if (!_members.ContainsKey(binder.Name))
                _members.Add(binder.Name, value);
            else
                _members[binder.Name] = value;

            return true;
        }
  
        public override bool TryGetMember(GetMemberBinder binder, out object result)
        {
            if (_members.ContainsKey(binder.Name))
            {
                result = _members[binder.Name];
                return true;
            }
            else
            {
                return base.TryGetMember(binder, out result);
            }
        }
  
        public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, out object result)
        {
            if (_members.ContainsKey(binder.Name)
                      && _members[binder.Name] is Delegate)
            {
                result = (_members[binder.Name] as Delegate).DynamicInvoke(args);
                return true;
            }
            else
            {
                return base.TryInvokeMember(binder, args, out result);
            }
        }

        public override IEnumerable<string> GetDynamicMemberNames()
        {
            return _members.Keys;
        }
    }

 

Indexing

Our views fetch the data and return it as an IDataReader. While indexing, the index helper iterates through the reader and loads each record into a dynamic ElasticEntity, as shown below.

        private ElasticEntity GetRecordAsElasticEntity(IDataReader reader)
        {
            ElasticEntity entity = new ElasticEntity();
            for (int i = 0; i < reader.FieldCount; i++)
            {
                entity.SetPropertyAndValue(reader.GetName(i), reader.GetValue(i));
            }

            return entity;
        }
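The helper above handles a single record; the loop that walks the reader is not shown in this post. A minimal sketch of that loop might look like the following. `ReaderRows` is a hypothetical name, and plain dictionaries stand in for ElasticEntity to keep the example self-contained. Mapping database NULLs to null is my assumption here, not something the original helper does.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical helper for illustration only.
public static class ReaderRows
{
    // Reads every remaining row of the reader into a
    // column-name -> value dictionary.
    public static List<Dictionary<string, object>> ReadAll(IDataReader reader)
    {
        var rows = new List<Dictionary<string, object>>();
        while (reader.Read())
        {
            var row = new Dictionary<string, object>();
            for (int i = 0; i < reader.FieldCount; i++)
            {
                // Store database NULLs as null rather than DBNull.Value.
                row[reader.GetName(i)] = reader.IsDBNull(i) ? null : reader.GetValue(i);
            }
            rows.Add(row);
        }
        return rows;
    }
}
```

Each dictionary can then be wrapped with ElasticEntity.CreateFrom before serialization.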

Before indexing, the ElasticEntity is serialized to JSON using the solution provided on Stack Overflow; it uses JavaScriptSerializer and is very fast. The same approach is used to deserialize the JSON result from Elasticsearch while searching. For deserializing the Elasticsearch result I initially used Json.NET, but it was very slow compared to JavaScriptSerializer.

Below is my Elasticsearch Indexer. It’s a very simple class that uses PlainElastic.Net.

    public class PlainElasticIndexer:Interfaces.IElasticIndexer
    {
        private ElasticConnection _connection;

        private string _indexName;
        private string _type;
        private string _defaultHost;
        private int _port;

        /// <summary>
        /// Default host to localhost and port to 9200. If the host and port is different
        /// then use the parameterized constructor and specify the details.
        /// </summary>
        private PlainElasticIndexer()
        {
            _connection = new ElasticConnection("localhost", 9200);
        }

        public PlainElasticIndexer(string host, int port, string indexName, string type)
        {
            this._defaultHost = host;
            this._port = port;
            _indexName = indexName;
            _type = type;
            _connection = new ElasticConnection(_defaultHost, _port);
        }

        /// <summary>
        /// Default host to localhost and port to 9200. If the host and port is different
        /// then use the parameterized constructor and specify the details.
        /// </summary>
        public PlainElasticIndexer(string indexName, string type):this()
        {
            _indexName = indexName;
            _type = type;
        }


        /// <summary>
        /// Add or update a document in the index. If the ID exists, the document is replaced with the provided json.
        /// </summary>
        /// <param name="json"></param>
        /// <param name="id"></param>
        public void Write(string json, string id)
        {
            string command = Commands.Index(_indexName, _type, id);
            string response = _connection.Put(command, json);
        }
    }

 

Search

Searching also uses a very simple class as shown below.

    public class PlainElasticSearcher:Interfaces.IElasticSearcher
    {
        ElasticConnection _connection;
        private string _indexName;
        private string _type;
        private string _defaultHost;
        private int _port;

        /// <summary>
        /// Default host to localhost and port to 9200. If the host and port is different
        /// then use the parameterized constructor and specify the details.
        /// </summary>
        private PlainElasticSearcher()
        {
            _connection = new ElasticConnection("localhost", 9200);
        }

        public PlainElasticSearcher(string host, int port, string indexName, string type)
        {
            this._defaultHost = host;
            this._port = port;
            _indexName = indexName;
            _type = type;
            _connection = new ElasticConnection(_defaultHost, _port);
        }

        /// <summary>
        /// Default host to localhost and port to 9200. If the host and port is different
        /// then use the other parameterized constructor and specify the details.
        /// </summary>
        public PlainElasticSearcher(string indexName, string type) : this()
        {
            _indexName = indexName;
            _type = type;
        }

        public string Search(string jsonQuery)
        {
            string command = new SearchCommand(_indexName, _type).WithParameter("search_type", "query_and_fetch")
                                    .WithParameter("size","100");
            string result = _connection.Post(command, jsonQuery);
            return result;
        }
    }

 

The Search function returns the result as plain JSON. The caller converts the JSON to ElasticEntity using the function shown below.

        public static IList<ElasticEntity> ToElasticEntity(string json)
        {
            IList<ElasticEntity> results = new List<ElasticEntity>();
            JavaScriptSerializer jss = new JavaScriptSerializer();
            jss.RegisterConverters(new JavaScriptConverter[] { new DynamicJsonConverter() });

            var jObj = JObject.Parse(json); 
            foreach (var child in jObj["hits"]["hits"])
            {
                var tmp = child["_source"].ToString();
                dynamic dynamicDict = jss.Deserialize(tmp, typeof(object)) as dynamic;
                ElasticEntity elasticEntity = ElasticEntity.CreateFrom(dynamicDict);
                results.Add(elasticEntity);
            }

            return results;
        }
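For readers who have not seen an Elasticsearch response, the function above relies on the matching documents sitting under hits.hits, each with its original JSON in _source. The sketch below shows just that navigation step. It swaps in System.Text.Json for the Json.NET JObject used above, and `HitReader` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration only.
public static class HitReader
{
    // Returns the raw JSON of every hits.hits[*]._source object
    // found in a search response.
    public static List<string> SourceDocuments(string responseJson)
    {
        var sources = new List<string>();
        using (JsonDocument doc = JsonDocument.Parse(responseJson))
        {
            JsonElement hits = doc.RootElement.GetProperty("hits").GetProperty("hits");
            foreach (JsonElement hit in hits.EnumerateArray())
            {
                sources.Add(hit.GetProperty("_source").GetRawText());
            }
        }
        return sources;
    }
}
```

Each returned string is one indexed document, ready to be deserialized into an entity.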

I added another class to convert the ElasticEntity to a typed object. This helps the caller convert the ElasticEntity to domain objects or DTOs.

    /// <summary>
    /// This class maps ElasticEntity to any domain-specific strongly typed class. For example,
    /// if the requesting class wants the search result as a Customer class, this mapper will
    /// create an instance of Customer, get the values from ElasticEntity and set
    /// them on the properties of Customer. One rule you should follow is that
    /// the columns in the index and the properties in the domain class must match.
    /// We can say it's a convention-based mapping.
    /// </summary>
    public class EntityMapper
    {
        private string _pathToClientEntityAssembly;
        private Assembly _loadedAssembly = null;

        public EntityMapper(string pathToClientEntityAssembly)
        {
            _pathToClientEntityAssembly = pathToClientEntityAssembly;
        }

        internal T Map<T>(ElasticEntity entity) where T:class
        {
            T instance = this.GetInstance<T>();
            Type type = instance.GetType();
            PropertyInfo[] properties = type.GetProperties();

            foreach (PropertyInfo property in properties)
            {
                object value = entity.GetValue(property.Name);
                property.SetValue(instance, Convert.ChangeType(value, property.PropertyType), null);
            }
            return instance;
        }

        private T GetInstance<T>()
        {
            if(string.IsNullOrWhiteSpace(_pathToClientEntityAssembly))
                throw new NullReferenceException(string.Format("Unable to create {0}, path to the " +
                                        "assembly is not specified.", typeof(T).ToString()));

            if(_loadedAssembly==null)
                _loadedAssembly = Assembly.LoadFile(_pathToClientEntityAssembly);

            object instance = _loadedAssembly.CreateInstance(typeof(T).ToString());
            return (T)instance;
        }

    }
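The convention itself (property name equals column name) can be shown in a smaller, self-contained form. This is a sketch under assumptions, not the mapper above: `ConventionMapper` and the sample `Customer` class are hypothetical, it creates the instance with new() rather than loading an assembly, and it skips empty values and read-only properties, which the class above does not do. It only handles simple property types such as int and string.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;

// Hypothetical sample domain class.
public class Customer
{
    public int Id { get; set; }
    public string Name { get; set; }
}

// Hypothetical helper for illustration only.
public static class ConventionMapper
{
    // Copies values into same-named writable public properties of a new T.
    // Missing or empty values leave the property at its default.
    public static T Map<T>(IDictionary<string, string> values) where T : class, new()
    {
        var instance = new T();
        foreach (PropertyInfo property in typeof(T).GetProperties())
        {
            if (!property.CanWrite)
                continue;

            string raw;
            if (!values.TryGetValue(property.Name, out raw) || string.IsNullOrEmpty(raw))
                continue;

            object converted = Convert.ChangeType(raw, property.PropertyType, CultureInfo.InvariantCulture);
            property.SetValue(instance, converted, null);
        }
        return instance;
    }
}
```

Skipping empty strings matters because converting an empty string to a numeric type throws a FormatException.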

The client can call the search function as shown below.

var searcher = new SearcherFacade(new PlainElasticSearcher(indexName, "Customer"), assmPath);
IList<Customer> resultAsCustomer = searcher.Search(jsonQry).Results<Customer>();

Here indexName is the index to search, "Customer" is the document type, and assmPath is the path to the assembly where the Customer entity resides. If the caller needs the result as ElasticEntity, set assmPath to null (or use the single-parameter constructor) and make the call as shown below.

IList<ElasticEntity> searchResult = searcher.Search(jsonQry).Results();

 

The code below shows the implementation of SearcherFacade.

    public class SearcherFacade
    {
        private IElasticSearcher _elasticSearcher;
        private string _pathToEntityAssembly;
        private IList<ElasticEntity> _searchResults;
        /// <summary>
        /// Use this constructor if the client will be dealing with dynamic ElasticEntity.
        /// </summary>
        /// <param name="elasticSearcher"></param>
        public SearcherFacade(IElasticSearcher elasticSearcher)
        {
            _elasticSearcher = elasticSearcher;
        }

        /// <summary>
        /// Use this constructor if the client wants to convert the ElasticEntity to domain specific class.
        /// </summary>
        /// <param name="elasticSearcher"></param>
        /// <param name="pathToEntityAssembly">path to the assembly in which to locate the domain-specific class</param>
        public SearcherFacade(IElasticSearcher elasticSearcher, string pathToEntityAssembly):this(elasticSearcher)
        {
            _pathToEntityAssembly = pathToEntityAssembly;
        }

        public SearcherFacade Search(string jsonQuery)
        {
            string jsonResult = _elasticSearcher.Search(jsonQuery);
            _searchResults=  DeserializeJson.ToElasticEntity(jsonResult);
            return this;
        }

        public IList<ElasticEntity> Results()
        {
            return _searchResults;
        }

        /// <summary>
        /// Converts dynamic ElasticEntity to strongly typed Domain Entity.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <returns></returns>
        public IList<T> Results<T>() where T:class
        {
            if (string.IsNullOrWhiteSpace(_pathToEntityAssembly))
                throw new NullReferenceException(string.Format("Please provide the path and assembly " +
                          "name in which class {0} resides. Set the fully qualified assembly path " +
                          "via the constructor that takes two parameters.", typeof(T).ToString()));

            EntityMapper mapper = new EntityMapper(_pathToEntityAssembly);
            IList<T> convertedResults = new List<T>();

            foreach (ElasticEntity entity in _searchResults)
            {
                T instance = mapper.Map<T>(entity);
                convertedResults.Add(instance);
            }
            return convertedResults;
        }
    }

 

This post gives a very simplified, basic view of the Elasticsearch layer I created. We have more functionality tailored to our needs. I hope this post helps someone build a system using Elasticsearch.

 

Happy Coding…

Written by Sony Arouje

February 27, 2014 at 11:35 am
