Realtime Search for Hadoop

In the previous part of this article series we focused on the efficient storage of log data in Hadoop. We described how to store the data in Hadoop’s MapFiles, and we tweaked the configuration settings for increased data storage capacity and greater retrieval speed. Today, we discuss how to perform near real-time searches on up to 36.6 billion log messages by cleverly combining Hadoop with Lucene and Solr.

Search requirements

Let’s start with a quick recap of the data we have stored in Hadoop. Our system will store up to 20 TB of log messages. A single log message has an average size of around 600 bytes. This means that we will have up to 36.6 billion distinct log messages stored in Hadoop. Our search requirements state that 95% of all search queries should display the results in less than 10 seconds. As will be shown, our solution is actually much faster than required (2-3 seconds per query).

However, these requirements could not be met with a pure Hadoop solution. In Hadoop we can use Map/Reduce jobs to retrieve data. A Map/Reduce job that has to read all the data in our cluster typically runs for about 2 hours. This is way above our response time requirements!

Introducing Lucene

The only way to be able to search in real time is to build a search index on the stored data. We have evaluated Lucene for this purpose. Lucene seems to be a very good partner for Hadoop. It is implemented in Java, which allows for very good integration with our own Java-based application. Lucene is also highly scalable and has a powerful query syntax.

Now let’s look at how our solution works. Lucene is able to distinguish multiple indexed fields in a single document. As we have learned in part 2 of this series, our log data can be split up into distinct fields like the timestamp of the message, the log level, the message itself, etc. An initial proof-of-concept implementation showed that this feature is necessary if we want to be able to search for date ranges or ranges of different log levels.

A Lucene index consists of documents. Each document has a number of fields. The contents of a field can consist of one or more terms. The number of unique terms is one criterion for the memory requirements of an index.

One of the most important findings during the evaluation of Lucene was that the memory requirements for our index were a limiting factor. Lucene is able to index multiple terabytes of data. Usually, Lucene is used to build up a full-text index of rather large files. In these scenarios, 1 TB of data often consists of maybe 10-20 million documents or even fewer. This is not the case with our data structure. For us, 1 TB of data consists of around 1.8 billion documents. Remember, our documents actually are log messages with a size of about 600 bytes. And we want to store 36.6 billion of them!

In our tests we have learned that we need about 1 GB heap memory for every 150 million documents in the Lucene index. These memory requirements depend heavily on the number of indexed fields, the type of the indexed fields and whether the contents of a field have to be stored in Lucene or not.

Reducing memory requirements of the Lucene index

A key factor in using Lucene is to reduce the memory requirements of the index as much as possible. We analyzed our customer’s typical search queries and identified the fields which need to be indexed in order to run 95% of all search queries in real time. It turned out that we only need 6 fields in the Lucene index (a code sketch follows the list):

  • timestamp of the log message
  • numeric id of the application which created this log message
  • numeric log level
  • name of application server the application is running on
  • host name of the server the application is running on
  • path in the Hadoop file system where the complete log message can be read
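To make this concrete, here is a minimal sketch of how such a document could be built. The article does not publish the actual schema, so the field names, types and the use of a recent Lucene API are illustrative assumptions; the timestamp is shown as a single field here and is split into two fields further below.

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;

public final class LogDocumentFactory {

    /** Builds a Lucene document for one log message; field names are placeholders. */
    public static Document build(long timestampMillis, long applicationId, int logLevel,
                                 String appServer, String host, String hdfsKey) {
        Document doc = new Document();
        doc.add(new LongPoint("timestamp", timestampMillis)); // indexed, used for range queries
        doc.add(new LongPoint("appId", applicationId));       // numeric application id
        doc.add(new IntPoint("logLevel", logLevel));          // numeric log level
        doc.add(new StringField("appServer", appServer, Field.Store.NO));
        doc.add(new StringField("host", host, Field.Store.NO));
        // Primary key: HDFS path of the MapFile plus the index of the message inside it.
        doc.add(new StoredField("hdfsKey", hdfsKey));
        return doc;
    }
}
```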

The next step was to optimize the memory requirements for each field. The timestamp in particular proved to be a very memory-intensive field. Our timestamps have an accuracy of milliseconds. This leads to a huge number of unique values inside the timestamp field, which quickly eat up memory. On the other hand, timestamps in search queries are only specified with an accuracy of minutes. We only need the higher accuracy to sort the search results.

After evaluating different ways to reduce the memory requirements for the timestamp field, we settled on a solution where we split up this field into 2 separate fields in the Lucene index. One field stores the timestamp with an accuracy of minutes and is indexed. The other field stores the timestamp with full accuracy and is only stored in Lucene, not indexed. With this solution we have reduced the number of unique terms that Lucene needs to handle and therefore reduced the memory requirements by a great deal. Another benefit of this approach is increased performance when searching for date ranges. The downside is that we need to sort the result set ourselves using the detailed timestamp field after getting the search results from Lucene.
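A rough sketch of this truncation step, assuming the timestamp arrives as milliseconds since the epoch; the field names are again only placeholders:

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.StoredField;

public final class TimestampFields {

    /** Adds the two timestamp fields to a document: one indexed per minute, one stored per millisecond. */
    public static void addTimestamps(Document doc, long timestampMillis) {
        // Indexed field: truncated to minute precision to keep the number of unique terms low.
        long minutePrecision = timestampMillis - (timestampMillis % 60_000L);
        doc.add(new LongPoint("timestampMinute", minutePrecision));

        // Stored-only field: full millisecond precision, used later to sort the result set ourselves.
        doc.add(new StoredField("timestampFull", timestampMillis));
    }
}
```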

Each document needs a primary key field, which specifies how the document can be retrieved. In our case, the primary key field contains the full path inside the HDFS to the MapFile which contains the log message, followed by the index of the log message inside this MapFile. This enables us to directly access the referenced log message.
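Retrieving the referenced message then boils down to opening the MapFile named in the primary key and looking up the stored index. A sketch along these lines, using a recent Hadoop API; the key and value types (LongWritable index, Text message) are assumptions, not necessarily the Writables used in the original system:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public final class LogMessageReader {

    /** Reads a single log message from a MapFile in HDFS, addressed by the Lucene primary key. */
    public static String read(Configuration conf, String mapFilePath, long messageIndex)
            throws IOException {
        MapFile.Reader reader = new MapFile.Reader(new Path(mapFilePath), conf);
        try {
            Text message = new Text();
            Text found = (Text) reader.get(new LongWritable(messageIndex), message);
            return found != null ? found.toString() : null; // null if the index is not in this file
        } finally {
            reader.close();
        }
    }
}
```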

Working with Index Shards and Solr

With 1 GB heap memory required per 150 million documents, we would need 240 GB of heap memory to build an index for all documents in Hadoop. This is way too much for a single index! Using a single index also has disadvantages when you look at high-availability requirements. If there is a problem with the index, you lose the real-time query functionality.

We already have a number of data nodes running in the Hadoop cluster, so we can split up the Lucene index into smaller parts which can be served on each data node. We assign 6 GB of heap memory on each data node to Lucene, so that each data node is able to run the index for up to 1 billion documents.

When we realized that we had to split up the Lucene index into multiple so-called shards, we moved from Lucene to Solr. Solr is a search platform based on Lucene. It provides a Web-based interface to access the index. This means we can use a simple HTTP/REST request to index documents, perform queries and even move an index from one data node to another.

Figure: Handling search requests. An incoming search query (red arrow) is analyzed for the queried fields. If all fields are indexed, parallel search queries are sent to all index shards (yellow arrow). The responses are cumulated, sorted and then returned to the user. If the search fields are not indexed, a new Map/Reduce search job is created and submitted to the Hadoop jobtracker (green arrow).

Each data node is running a single Solr server which can host multiple Lucene indexes. New log messages are indexed into different shards, so that each index has approximately the same number of documents. This evens out the load on each shard and enables scalability. When a new data node is integrated into the cluster, the index shard on this data node will primarily be used for indexing new documents.
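The comments below mention that new messages are distributed over the indexing shards with a round-robin algorithm; a minimal sketch of such a selector (shard URLs and the batching granularity are assumptions):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public final class ShardRoundRobin {

    private final List<String> shardUrls;          // Solr URLs of the shards open for indexing
    private final AtomicLong counter = new AtomicLong();

    public ShardRoundRobin(List<String> shardUrls) {
        this.shardUrls = shardUrls;
    }

    /** Returns the shard that should receive the next batch of log messages. */
    public String nextShard() {
        int index = (int) (counter.getAndIncrement() % shardUrls.size());
        return shardUrls.get(index);
    }
}
```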

For performance reasons, the index data files are stored on the local file system of each data node. Each time an index has been modified, it will be backed up into the Hadoop file system. Now we are able to quickly redeploy this index onto another data node, in case the data node which originally hosted this index has failed.
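A minimal sketch of that backup step, assuming the index directory has already been packed into a single archive (the comments below mention that backups are tarred and compressed before being copied to HDFS):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class IndexBackup {

    /** Copies a packed index archive from the local file system of a data node into HDFS. */
    public static void backup(Configuration conf, String localArchive, String hdfsBackupDir)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path source = new Path(localArchive);
        Path target = new Path(hdfsBackupDir, source.getName());
        // Overwrite any older backup of the same shard; keep the local copy in place.
        fs.copyFromLocalFile(false, true, source, target);
    }
}
```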

When performing a query, all index shards are queried in parallel. This ensures fast response times. When a user formulates a query, we first analyze whether this query can be run against the Lucene indexes. This is not the case if the user specifies search fields which are not indexed. In that case the query will be run as a Map/Reduce job. If the query can be run against the Lucene indexes, it will be forwarded to all data nodes in parallel. The results of these subqueries are cumulated and sorted. Then the log messages are read from the HDFS using the primary keys inside the Lucene index results.
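A condensed sketch of this fan-out using a recent SolrJ API; shard URLs, field names and the query string are placeholders, and a failing shard simply contributes no results (see the next paragraph):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrDocument;

public final class ShardQuery {

    /** Queries all index shards in parallel and merges the hits, sorted by the stored timestamp. */
    public static List<SolrDocument> search(List<String> shardUrls, String queryString) {
        ExecutorService pool = Executors.newFixedThreadPool(shardUrls.size());
        List<Future<List<SolrDocument>>> futures = new ArrayList<>();
        for (String url : shardUrls) {
            futures.add(pool.submit(() -> {
                // A client per request keeps the sketch short; a real system would reuse clients.
                try (HttpSolrClient client = new HttpSolrClient.Builder(url).build()) {
                    return new ArrayList<>(client.query(new SolrQuery(queryString)).getResults());
                }
            }));
        }
        List<SolrDocument> merged = new ArrayList<>();
        for (Future<List<SolrDocument>> future : futures) {
            try {
                merged.addAll(future.get());
            } catch (Exception e) {
                // A failing shard only makes the result incomplete; the other shards still count.
            }
        }
        pool.shutdown();
        // Client-side sort on the stored full-precision timestamp (field name is an assumption).
        merged.sort(Comparator.comparingLong((SolrDocument d) -> (Long) d.getFieldValue("timestampFull")));
        return merged;
    }
}
```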

If a query to a single shard fails, the search results may be incomplete, but the queries to the other shards are not affected. This greatly enhances the availability of the system. Typical query times are about 2-3 seconds with this approach, which is considerably faster than our customer’s requirements demand.

The downside of this approach is that we need a sophisticated mechanism to handle the large number of index shards. For example, we need to be able to identify which shard is running on which data node and which shards are currently not deployed on any data node. We also need to identify failures of index shards and move indexes from one data node to another. We will discuss these topics in the next part of this series.


26 Responses to “Realtime Search for Hadoop”

  1. Nice post, thanks for sharing.

    So it sounds like you are using Lucene/Solr really just for filtering? Can you share a few typical queries you run against this index?

    How come you didn’t use the Trie date fields instead?

    So in the end how many shards and how many servers have you got and can you share how large in terms of disk space each shard is?

    I can’t wait to read the post on shard management topic. When are you going to publish that?

  2. Peter Dikant says:

    Thanks for the feedback!

    Yes, Solr is only used to query the data. Most of the queries search for a specific application-id combined with a time range. I would say that this is our typical use case.

    Our dates are stored in a TrieLongField. We still needed to reduce the resolution of the timestamps to keep the index size small. The Trie fields do a very good job of speeding up range queries; we always use Trie fields for storing numerical data.

    For full capacity we need 40 servers with a total of 80 index shards. A single shard needs up to 20 GB disk space. Currently we are running the cluster with 30 servers and 60 index shards.

    I hope the next article will be finished at the beginning of July.

  3. Paul says:

    Just curious if you gave ElasticSearch consideration? I’m about to do some eval and it is one of my candidates, as it automates the shard management and replication across a cluster. I’m a little concerned about it being production ready due to its age vs. Solr.

    Will be interested to read about the shard mgmt with Solr.

    Thanks!

  4. Peter Dikant says:

    ElasticSearch seems to be really impressive. We did not look at ElasticSearch, because we started the design phase in mid 2009. Our implementation began at the end of 2009, and by the time ElasticSearch released their first version, we had already started integration tests with our customer. So for us ElasticSearch came too late.

    What we did evaluate was Katta, but at the time of our evaluation we found some bugs, which we reported and which have been fixed in a following version. Nevertheless we felt more comfortable building our own tailored solution.

    If I were to start a new project now, I would definitely give ElasticSearch a closer look. It seems to have most of the features we needed to implement around Solr to make it work in our cluster. It might also be a good idea to take another look at Katta. There have been 4 new versions since our evaluation.

  5. Mike Pilkington says:

    This is great stuff. Thank you for sharing your experiences. I’m searching for log management solutions and this is very interesting.

    Question for you…do all servers need to be in the same data center for high-bandwidth/low-latency connectivity? If it’s not a requirement, I assume the primary effect of geographically separating nodes would be increased search times? My thought is to have log collection servers geographically dispersed. Part of the reasoning for geographic separation is for privacy concerns/regulations (e.g. complying with EU regulations).

    Thanks again!

    • Peter Dikant says:

      Well, our Hadoop cluster is sitting in one room. Good connectivity is key to good performance, because the nodes exchange quite a lot of data between them. Technically it is possible to separate them, but we experienced a big performance drop when one of our network cards dropped from 1 GBit to 10 MBit. So I would not recommend spanning the cluster over more than one data center.

  6. Balazs Vamos says:

    Dear Peter,

    What about the limitations of Lucene? numDocs() returns an int, which means that the maximum number of documents cannot be larger than ~2 billion.

    • Peter Dikant says:

      We cannot hit these limitations, because we index a maximum of 250 million documents per Solr instance. When we accumulate the results from all instances, we use a long to count the total number of hits.

  7. Thanks for article.

    But there is a question: how are you feeding Solr with new log information (delta-import strategy)?
    1) pulling new logs by querying timestamps from existing log-files (basically offset based) and pushing them to solr?
    2) working log-event driven, i.e. if application is doing log, it send another log-event to a centralized log-event-server (e.g. like graylog2 with GELF messages)?

    • Peter Dikant says:

      We are using a daily batch import to update the Solr indexes in the Hadoop cluster. All log messages which arrived during the day are indexed into 3 Solr instances using a round-robin algorithm. Once the batch is complete, the indexes are backed up to HDFS.

      We have an additional Solr instance which stores an index of the log messages that arrive during the day. Here the messages are indexed immediately after they arrive at our application. This index is truncated when the messages are moved into the cluster.

    • I see.

      How do you push new incoming messages which arrive during the day? How do you direct the log messages to Solr?

      Currently I am working on an integration with graylog2-server:
      1) Custom log4j-appender pushes GELF messages to graylog2 server.
      2) On graylog2-server backend mongo-database is holding round-robin style the last xxx-messages.
      3) cron-job asynchronously (every 20 seconds) reads out data from mongodb and pushes simple .csv file to solr. For full import it pushes all messages. For delta-import a timestamp is saved which cron picks up for next delta-import to find the next delta-import “boundary”.

  8. Richard Park says:

    Hi, this is a very informative article. I had a question about the above paragraph:

    >>> For performance reasons, the index data files are stored on the local file system of each data node. Each time an index has been modified, it will be backed up into the Hadoop file system. Now we are able to quickly redeploy this index onto another data node, in case the data node which originally hosted this index has failed.

    So if I understand correctly, every index file is stored in two places: on a data node’s local file system and in HDFS. When you copy the files to HDFS, do you store them in a subdirectory so you know which data node they belong to?

    So if a data node fails, I assume you quickly copy its subdirectory of index files to the failover data node.

    This is an unrelated question but did you consider using libraries such as Cascading to help with writing MapReduce programs? I have also heard of people using Hadoop Streaming with other languages such as Ruby and Python.

    • Peter Dikant says:

      Correct. Each index is stored in two places. We use a single directory for all index backups. We tar and compress the backups before we copy them to the HDFS.

      If a data node fails, we copy the tarred index to another data node and untar it there. This whole process takes only a couple of minutes which is feasible for us.

      Our MapReduce jobs are written in Java. We feel very comfortable with this choice so there was no need to consider other language wrappers or libraries.

  9. tiru says:

    Hi
    I am very new to Lucene and Hadoop and trying to configure Lucene with a Hadoop cluster for a search index. Can anyone help me with that?

  10. tiru says:

    Hi All,
    How can I configure Lucene with Hadoop?
    It would be great if anyone could provide steps to configure Lucene with Hadoop HDFS.

    cheers
    tiru

    • Peter Dikant says:

      Lucene cannot work with HDFS directly. The index files need to be located on the local HDD. We store only index backups in the HDFS.

  11. Hadoop Learner says:

    You mentioned that Lucene cannot directly store indexes on HDFS. In your case, is Lucene reading log files from HDFS to index them? Can it read from HDFS directly?

    • Peter Dikant says:

      No, Lucene cannot read files from HDFS directly.

      So what we do is run a Map/Reduce job that creates CSV files from the data in HDFS. These CSV files are imported into Solr with the UpdateCSV method. This is a really fast solution for indexing data stored in HDFS.
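      Roughly, pushing one of those CSV files to Solr via SolrJ could look like the following sketch; the handler path, content type and commit settings are illustrative assumptions, not necessarily the exact setup described above:

      ```java
      import java.io.File;

      import org.apache.solr.client.solrj.impl.HttpSolrClient;
      import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
      import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;

      public final class CsvImport {

          /** Sends one CSV file produced by the Map/Reduce job to Solr's CSV update handler. */
          public static void importCsv(String solrUrl, File csvFile) throws Exception {
              try (HttpSolrClient client = new HttpSolrClient.Builder(solrUrl).build()) {
                  ContentStreamUpdateRequest request = new ContentStreamUpdateRequest("/update/csv");
                  request.addFile(csvFile, "text/csv");
                  request.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
                  client.request(request);
              }
          }
      }
      ```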

  12. Ben Hsu says:

    Peter, thank you for sharing your experiences with us.

    We want to use your idea of storing date fields with minute granularity. We believe it will help our memory problems. Can you share how you implemented it?

    Thank you

    • Peter Dikant says:

      Sorry for the late answer. Somehow the post notification went amiss.

      We are using two date fields. One is indexed and stores the timestamp as a long integer. The other is only stored and not indexed and uses the full precision.

      Implementation is really straightforward. When searching, we have a GUI where a user can only enter timestamps with the indexed precision. After retrieving the results from Solr, we sort them against the full-precision timestamp. Of course, this only works with limited result sets. We don’t deliver more than 50k results, so this works quite well.

  13. Sumit says:

    Very informative post. Thank you for taking the trouble to write it up and share with rest of us.

    I have a basic question about posting random data to Solr, like in your case log files. Since the first few fields in any log statement are important fields (timestamp, severity, etc.) which are separated by spaces, how do we tell Solr where these fields end and where the free text begins (which itself has many space characters in it)?

  14. Peter Dikant says:

    We receive the messages via a REST call. One REST call usually contains a couple of hundred log messages. These messages are stored on the hard disk and then indexed via the SolrJ addBeans() method. All messages of a single REST call are indexed with a single addBeans() call. We have limited the maximum number of log messages per REST call to 10000, so that we have a nice package size for indexing in Solr.
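    A compact illustration of that indexing path, assuming a simple annotated bean; the field names and types are placeholders, not the actual schema:

    ```java
    import java.util.List;

    import org.apache.solr.client.solrj.beans.Field;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;

    public final class LogMessageIndexer {

        /** Bean mirroring the indexed fields; names and types are illustrative only. */
        public static class LogMessageBean {
            @Field public long timestampMinute;
            @Field public long timestampFull;
            @Field public long appId;
            @Field public int logLevel;
            @Field public String appServer;
            @Field public String host;
            @Field public String hdfsKey;
        }

        /** Indexes one batch of log messages (one REST call's worth) with a single addBeans() call. */
        public static void index(String solrUrl, List<LogMessageBean> batch) throws Exception {
            try (HttpSolrClient client = new HttpSolrClient.Builder(solrUrl).build()) {
                client.addBeans(batch);
                client.commit();
            }
        }
    }
    ```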

  15. Alig says:

    Thank you for this nice article.
    Have you ever thought about using HBase as storage for the log entries?
    Random access to HBase using a row key retrieved from Solr should be quite fast.

  16. Peter Dikant says:

    Yes, in the beginning of the project we evaluated HBase. At that time we had some trouble getting it to work reliably. Another issue was that HBase had heavy requirements on the main memory of the data nodes. We already need as much memory for Solr as possible, so keeping the Hadoop part slim was a high priority for us.

    Today, HBase has matured. I think we would use HBase now, if we had to start the project again.

  17. dizh says:

    Thanks for sharing this article, now I have a few questions:

    1) How often are the logs on Hadoop imported into CSV and then sent to Solr? If this happens too late, it can hardly be called near real-time search.
    2) How do you combine the results from the Solr shards?

    Thank you