
Indexing data in Solr from disparate sources using Camel

Apache Solr is "the popular, blazing fast open source enterprise search platform" built on top of Lucene. Before you can search (and find results), data has to be ingested, usually from disparate sources such as content management systems, relational databases, legacy systems, you name it... Then there is the challenge of keeping the index up to date by adding new data, updating existing records and removing obsolete data. The new sources of data could be the same as the initial ones, but could also be sources like Twitter, AWS or REST endpoints.

Solr understands different file formats and provides a fair number of options for indexing data:
  1. Direct HTTP and remote streaming - allows you to interact with Solr over HTTP by posting a file for direct indexing or the path to the file for remote streaming.
  2. DataImportHandler - a module that enables both full and incremental (delta) imports from relational databases or the file system.
  3. SolrJ - a Java client for accessing Solr using Apache Commons HTTP Client.
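
To make the first option a bit more concrete, here is a rough sketch of posting a document straight to Solr's XML update handler. It assumes Java 11+ (for java.net.http), a Solr instance at the hypothetical base URL http://localhost:8983/solr (newer Solr versions also need a core name in the path), and made-up field names id and name. It only builds the request; the commented-out line shows where it would actually be sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class DirectHttpIndexing {

    // Builds a POST to Solr's XML update handler; commit=true asks Solr to commit right away.
    static HttpRequest buildUpdateRequest(String solrBaseUrl, String addXml) {
        return HttpRequest.newBuilder()
                .uri(URI.create(solrBaseUrl + "/update?commit=true"))
                .header("Content-Type", "text/xml; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(addXml))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical document; the field names must match your own Solr schema.
        String doc = "<add><doc>"
                + "<field name=\"id\">1</field>"
                + "<field name=\"name\">Sample product</field>"
                + "</doc></add>";
        HttpRequest request = buildUpdateRequest("http://localhost:8983/solr", doc);
        System.out.println(request.method() + " " + request.uri());
        // To actually send it (requires a running Solr):
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

This works for a handful of documents, but everything around it (scheduling, retries, transformations) is left to you.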
But in real life, indexing data from different sources with millions of documents, dozens of transformations, filtering, content enrichment, replication and parallel processing requires much more than that. One way to cope with such a challenge is to reinvent the wheel: write a few custom applications, then combine them with some scripts or cron jobs. Another approach is to use a tool that is flexible, designed to be configurable and pluggable, and that helps you scale and distribute the load with ease. Such a tool is Apache Camel, which now also has a Solr connector.

It all started a few months ago, during basecamp days at Sourcesense, where my colleague Alex and I were experimenting with different projects to implement a pipeline for indexing data into Solr. As expected, we discovered Camel, and after a few days of pairing we were ready with the initial version of the Solr component, which got committed to Camel and extended further by Ben Oday. At the moment it is a full-featured Solr connector that uses SolrJ behind the scenes and lets you: configure all parameters of SolrServer and StreamingUpdateSolrServer; use the operations insert, add_bean, delete_by_id, delete_by_query, commit, rollback and optimize; index files, SolrInputDocument instances, beans with annotations or individual message headers.

Creating Camel routes to index all the data from a relational database table and the local file system is simple:
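public void configure() {
    // SOLR_URL is assumed to be a constant holding the Solr endpoint URI, defined elsewhere in this class.

    // Fire once at startup and wipe the index.
    from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");

    // Pick up files from src/data (noop=true leaves them in place) and index them.
    from("file:src/data?noop=true")
        .to("direct:insert");

    // Fire once, load all products and index each row as a separate document.
    from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");

    // Insert the message body into Solr, then commit.
    from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);

    // Delete everything matching *:* (all documents), then commit.
    from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}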
The routes above will first clear the index by deleting all the documents, followed by a commit. The second route polls files from the src/data folder, reads each file and sends it to the Solr endpoint. Assuming that the files are in a format Solr can understand, they will be indexed and committed. The third route retrieves all the products from the database (into memory), splits them into individual records, maps each record to Solr fields, and hands them to Solr to digest :)
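
The SqlToSolrMapper used in the third route is not shown here. As a minimal sketch of the mapping idea only, in plain Java without Camel or SolrJ on the classpath, it could boil down to something like the code below. The column names (ID, NAME, PRICE), the Solr field names and the class name RowToSolrFields are all hypothetical; in the real route the same logic would presumably live inside a Camel Processor and fill something the Solr component can index, such as a SolrInputDocument.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RowToSolrFields {

    // Hypothetical column-to-field mapping; adjust to the actual table and Solr schema.
    private static final Map<String, String> COLUMN_TO_FIELD = Map.of(
            "ID", "id",
            "NAME", "name",
            "PRICE", "price");

    // Copies the mapped, non-null columns of one row into a map keyed by Solr field name.
    static Map<String, Object> toSolrFields(Map<String, Object> row) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (Map.Entry<String, Object> column : row.entrySet()) {
            String field = COLUMN_TO_FIELD.get(column.getKey());
            if (field != null && column.getValue() != null) {
                fields.put(field, column.getValue());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // One row as it might come out of the split() step: column name -> value.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", 42);
        row.put("NAME", "Widget");
        row.put("INTERNAL_NOTE", "not indexed");
        System.out.println(toSolrFields(row));
    }
}
```

Columns without a mapping are simply dropped, which keeps internal data out of the index by default.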

Luckily, in 2012, the life of a software developer is not that simple and boring. Nowadays a more realistic indexing requirement would look something like this:

1. Get the backup files from Amazon S3 and index them. If a document is approved, commit it as soon as possible; otherwise commit every 10 minutes.

How can Camel help you with this requirement? Camel supports the most popular Amazon APIs, including S3. Using the aws-s3 component, it is possible to read files from an S3 bucket and then apply a filter for approved documents, in order to send them to a separate route for instant commit.
<route>
  <from uri="aws-s3://MyBucket?delay=5000&amp;maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <from uri="timer://commit?fixedRate=true&amp;period=600s"/>
  <to uri="direct:commit"/>
</route>
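
To see what the when clause actually tests, here is a small standalone sketch that evaluates the same XPath expression with the JDK's own XPath API. It only approximates how the routing decision works (Camel has its own XPath integration), and the sample documents in main are made up for illustration.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

final class ApprovedDocumentCheck {

    // Same expression as in the <when> clause of the route.
    private static final String APPROVED = "/add/doc[@status='approved']";

    // Returns true when the Solr update XML contains at least one approved <doc>.
    static boolean isApproved(String solrXml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return (Boolean) xpath.evaluate(
                    APPROVED, new InputSource(new StringReader(solrXml)), XPathConstants.BOOLEAN);
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Could not evaluate XPath on input", e);
        }
    }

    public static void main(String[] args) {
        String approved = "<add><doc status=\"approved\"><field name=\"id\">1</field></doc></add>";
        String draft = "<add><doc status=\"draft\"><field name=\"id\">2</field></doc></add>";
        System.out.println(isApproved(approved)); // would go to direct:indexAndCommit
        System.out.println(isApproved(draft));    // would go to direct:index
    }
}
```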
2. Retrieve customer data from the database every 5 seconds, reading 10 records at a time. Also look for deltas. Enrich the address data with latitude/longitude by calling XXX external service to facilitate spatial search in Solr.
<route id="fromDB">
  <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery=newCustomers&amp;maximumResults=10&amp;delay=5000"/>
  <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
  <to uri="direct:index"/>
</route>

<route>
  <from uri="direct:coordinateEnricher"/>
  <setHeader headerName="CamelHttpQuery">
    <simple>address='${body.address}'&amp;sensor=false</simple>
  </setHeader>
  <to uri="http://maps.google.com/maps/api/geocode/xml"/>
  <setHeader headerName="lat">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
  </setHeader>
  <setHeader headerName="lng">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
  </setHeader>
</route>
The above route reads 10 records at a time from the Customer table, and for each one calls Google's Maps API to get the latitude and longitude using the customer's address field. The coordinates are extracted from the response using XPath and merged back into the Customer object. Simple, isn't it?
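
The latLongAggregationStrategy bean that does the merging is not shown in the route. A rough idea of the extract-and-merge step, written as plain Java rather than as a real Camel AggregationStrategy, might look like this. The Customer class here is a simplified stand-in, and the XML in main is a trimmed, made-up response that only contains the elements the XPath expressions touch.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

final class CoordinateMerge {

    // Simplified stand-in for the Customer entity; the real class is not shown in this post.
    static final class Customer {
        final String address;
        Double lat;
        Double lng;

        Customer(String address) {
            this.address = address;
        }
    }

    // Evaluates an XPath expression against the XML and returns the result as a number.
    static double number(String xml, String expression) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return (Double) xpath.evaluate(
                    expression, new InputSource(new StringReader(xml)), XPathConstants.NUMBER);
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Could not evaluate " + expression, e);
        }
    }

    // Copies the coordinates of the first result into the customer, using the route's expressions.
    static Customer merge(Customer customer, String geocodeXml) {
        customer.lat = number(geocodeXml, "//result[1]/geometry/location/lat/text()");
        customer.lng = number(geocodeXml, "//result[1]/geometry/location/lng/text()");
        return customer;
    }

    public static void main(String[] args) {
        // Illustrative response only; a real one carries many more elements.
        String response = "<GeocodeResponse><result><geometry><location>"
                + "<lat>52.3702</lat><lng>4.8952</lng>"
                + "</location></geometry></result></GeocodeResponse>";
        Customer customer = merge(new Customer("Amsterdam"), response);
        System.out.println(customer.address + " -> " + customer.lat + ", " + customer.lng);
    }
}
```

If the response has no result element, the XPath number conversion yields NaN, so a real implementation would want to check for that before indexing.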

3. Index the content under this/that/path in our content management system and also monitor for updates.
<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&amp;deep=true&amp;synchronous=false"/>
  <to uri="direct:index"/>
</route>
Camel has a JCR connector, which allows you to create content in any Java Content Repository. There is also an improvement submitted in CAMEL-5155 which will soon allow reading content from repositories that support JCR v.2.
If you are lucky and your CMS supports CMIS, you can use my camel-cmis connector from GitHub for the same purpose.

4. Listen for tweets about our product/company, do sentiment analysis, and index only positive tweets.
<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&amp;keywords=productName&amp;consumerKey={{consumer.key}}&amp;consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>
This route listens for tweets using Twitter's real-time API, URL-encodes each tweet and calls the tweetsentiments API for sentiment analysis. In addition it applies throttling, so that at most one request is made every 1500 milliseconds, because there is a restriction on the number of calls per second. Finally, the route applies a filter to ignore all the negative tweets before indexing.
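
For readers who prefer to see the individual steps outside the XML, the three pieces of logic in this route can be sketched in plain Java as below. This is only an illustration of the idea: the class and method names are made up, and waitMillis is a simplified spacing rule, not how Camel's throttler is implemented internally.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class TweetPipelineSteps {

    // Mirrors the BeanShell expression: builds the query string for the sentiment service.
    static String sentimentQuery(String tweetText) {
        return "q=" + URLEncoder.encode(tweetText, StandardCharsets.UTF_8);
    }

    // Mirrors the <filter>: only tweets with a sentiment score above zero get indexed.
    static boolean shouldIndex(double sentiment) {
        return sentiment > 0;
    }

    // Simplified throttle: how long to wait so that calls are at least minIntervalMillis apart.
    static long waitMillis(long lastCallMillis, long nowMillis, long minIntervalMillis) {
        return Math.max(0, lastCallMillis + minIntervalMillis - nowMillis);
    }

    public static void main(String[] args) {
        System.out.println(sentimentQuery("Camel rocks & Solr too!"));
        System.out.println(shouldIndex(0.8));
        System.out.println(waitMillis(1000, 1600, 1500));
    }
}
```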

As you can see, Camel can easily interact with many disparate systems (including Solr), and even if you have a very custom application, writing a connector for it would not be difficult. But this is only one side of the story. On the other side, there is a full list of Enterprise Integration Patterns implemented by Camel which are needed for any serious data ingestion pipeline: Routers, Translator, Filter, Splitter, Aggregator, Content Enricher, Load Balancer... Last but not least: Exception Handling, Logging, Monitoring, DSLs... In two words: Camel Rocks!

PS: The full source code of the examples can be found on my GitHub account.
