Apache Solr is "the popular, blazing fast open source enterprise
search platform" built on top of Lucene. In order to do a search (and
find results) there is the initial requirement of data ingestion usually
from disparate sources like content management systems, relational
databases, legacy systems, you name it... Then there is also the
challenge of keeping the index up to date by adding new data, updating
existing records, removing obsolete data. The new sources of data could
be the same as the initial ones, but could also be sources like twitter, AWS or rest endpoints.
Solr can understand different file formats and provides fair amount of options for data indexing:
- Direct HTTP and remote streaming - allows you to interact with Solr over HTTP by posting a file for direct indexing or the path to the file for remote streaming.
- DataImportHandler - is a module that enables both full and incremental delta imports from relational databases or file system.
- SolrJ - a java client to access Solr using Apache Commons HTTP Client.
All started few months ago, during basecamp days at Sourcesense, where me and my colleague Alex were experimenting with different projects to implement a pipeline for indexing data into Solr. As expected we discovered Camel and after few days of pairing, we were ready with the initial version of the Solr component which got committed to Camel and extended further by Ben Oday. At the moment it is full featured Solr connector, that uses SolrJ behind the scene and lets you to: configure all parameters of SolrServer and StreamingUpdateSolrServer; supports the operations: insert, add_bean, delete_by_id, delete_by_query, commit, rolback, optimize; index files, SolrInputDocument instances, beans with annotations or individual message headers.
Creating a Camel route to index all the data from a relational database table and local file system is simple:
public void configure() {The above route will first clear the index by deleting all the documents followed by a commit. Then it will start polling files from src/data folder, read each file and send it to Solr endpoint. Assuming that the files are in a format Solr can understand, they will be indexed and committed. The third route will retrieve all the products from database (in memory), split them into individual records, map each record to Solr fields, and digest :)
from("timer://clear?repeatCount=1")
.to("direct:clearIndex");
from("file:src/data?noop=true")
.to("direct:insert");
from("timer://database?repeatCount=1")
.to("sql:select * from products?dataSourceRef=productDataSource")
.split(body())
.process(new SqlToSolrMapper())
.to("direct:insert");
from("direct:insert")
.setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
.to(SOLR_URL)
.setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
.to(SOLR_URL);
from("direct:clearIndex")
.setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
.setBody(constant("*:*"))
.to(SOLR_URL)
.setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
.to(SOLR_URL);
}
Luckily, in 2012, the life of software developer is not that
1. Get the backup files from amazon S3 and index. If a document is approved, commit it as soon as possible, otherwise commit every 10 minutes.
How can Camel help you with this requirement? Camel supports most popular amazon APIs including S3. Using aws-s3 component, it is possible to read files from a S3 bucket, then apply a filter for approved documents, in order to send them into a separate route for instant commit.
<route>2. Retrieve customer data from database every 5 seconds by reading10 records at a time. Also look for deltas. Enrich the address data with latitute/longitute by calling XXX external service to facilitate spatial search in Solr.
<from uri="aws-s3://MyBucket?delay=5000&maxMessagesPerPoll=5"/>
<choice>
<when>
<xpath>/add/doc[@status='approved']</xpath>
<to uri="direct:indexAndCommit"/>
</when>
<otherwise>
<to uri="direct:index"/>
</otherwise>
</choice>
</route>
<route>
<from uri="timer://commit?fixedRate=true&period=600s"/>
<from uri="direct:commit"/>
</route>
<route id="fromDB">The above route reads from Customer table 10 records at a time, and for each one will call google's maps API to get latitude and longitude using the customer address field. The coordinates are extracted from response using XPath and merged back into Customer object. Simple, isn't it.
<from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&maximumResults=10&delay=5000"/>
<enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
<to uri="direct:index"/>
</route>
<route>
<from uri="direct:coordinateEnricher"/>
<setHeader headerName="CamelHttpQuery">
<simple>address='${body.address}'&sensor=false</simple>
</setHeader>
<to uri="http://maps.google.com/maps/api/geocode/xml"/>
<setHeader headerName="lat">
<xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
</setHeader>
<setHeader headerName="lng">
<xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
</setHeader>
</route>
3. Index the content under this/that/path in our content management system and also monitor for updates.
<route>Camel has a jcr connector, which allows you to create content in any java content repository. There is also an improvement submitted in CAMEL-5155 which will allow reading content from JCR v.2 supporting repositories soon.
<from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
<to uri="direct:index"/>
</route>
If you are lucky and your CMS supports CMIS you can use my camel-cmis connector from github for the same purpose.
4. Listen for tweets about our product/company, do sentiment analysis, and index only positive tweets.
<route id="fromTwitter">This route is going to listen for tweets using twitter's real time api, url encode the tweet and call tweetsentiments api for sentiment analysis. In addition it will apply throttling, so only one request at most is made every 1500 milliseconds, because there is restriction on the number of calls per second. Then the route is applying a filter to ignore all the negative tweets, before indexing.
<from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
<setHeader headerName="CamelHttpQuery">
<language language="beanshell">
"q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
</language>
</setHeader>
<throttle timePeriodMillis="1500">
<constant>1</constant>
<to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
<setHeader headerName="sentiment">
<xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
</setHeader>
<filter>
<simple>${in.header.sentiment} > 0</simple>
<to uri="direct:index"/>
</filter>
</throttle>
</route>
As you can see Camel can interact with many disparate systems (including Solr) easily, and even if you have a very custom application, writing a connector for it would not be difficult. But this is only one side of the story. At the other side, there is a full list of Enterprise Integration Patterns implemented by Camel which are needed for any serious data ingestion pipeline: Routers, Translator, Filter, Splitter, Aggregator, Content Enricher, Load Balancer... Last but not least: Exception Handling, Logging, Monitoring, DSLs... In two words: Camel Rocks!
PS: The full source code of the examples can be found on my github account.