embedded-druid: Leveraging Druid Capabilities in Stand-alone Applications

Co-authors: Ramachandran Ramesh, Mahesh Somani, and Sankar Venkatraman

The eBay Cloud Platform team is happy to announce the open-source project embedded-druid, which aims to provide Druid capabilities for reasonably small amounts of data without the complexity of a multi-node setup. That is, embedded-druid is Druid running in a single JVM process.

Background

Druid is an open-source data store designed for real-time exploratory analytics on large data sets. Combining a column-oriented storage layout, a distributed shared-nothing architecture, and an advanced indexing structure, Druid allows for the arbitrary exploration of billion-row tables with sub-second latencies. It also supports fast aggregations and OLAP queries. Druid is a proven technology for executing OLAP-style queries on Big Data with sub-second response times.

Motivation

Given its distributed, shared-nothing architecture for handling large amounts of data, Druid has multiple components: real-time nodes, historical nodes, broker nodes, a coordinator node, deep storage, MySQL, ZooKeeper, etc. If the input data size is small (say, up to hundreds of millions of rows), then the overhead involved in deploying Druid might be excessive; one might prefer a lighter-weight database system such as Apache Derby or PostgreSQL if the reporting requirement is very simple (such as grouping by some dimension or retrieving top-N values). There are numerous use cases where the input is not “Big Data” but rather medium or small data that still requires OLAP-like capabilities, such as grouping by multiple dimensions and computing percentiles and other aggregation functions.

For example, at eBay we generate operational metrics reports for applications that run on multiple machines across data centers. These reports contain total request counts, average request durations, and the like across different dimensions, such as the type of request, data center, request status, and dependency. Each application owner might view different slices of this information – the top hosts with errors, the slowest requests by request type or data center, or requests by error code, to name just a few. Given the dynamic nature of such queries, if Druid capabilities can be leveraged without the deployment complexity, then the lives of developers, debuggers, and analysts can be made much easier.

embedded-druid in action

Let us assume a simple data set that represents the number of characters added to particular Wikipedia pages. We have the following columns representing the dimensions of our data:

Timestamp, Page, Username, Gender, City

Here is the metric we are interested in:

CharsAdded

The following table shows sample data for the above schema:

Timestamp   Page   Username   Gender   City   CharsAdded
1234567     JB     Abc        Male     SF     1290
1234576     JB     Xyz        Female   SJ     3421
1234687     AB     Qwe        Male     LA     2345
1234789     AB     Def        Female   LV     1234

We want to generate different reports, such as “How many edits were made on page AB grouped by gender?” or “What is the histogram for edits made on page JB in SF?” The following sections walk through how to generate such reports using embedded-druid.

Creating the loader

Currently, embedded-druid supports loading CSV files, for which the implementation class CSVLoader is available. One needs to first provide a list of all columns available in the CSV file (including metrics), a list of dimensions, and a column specifying the timestamp (if available). For example, for the Wikipedia schema mentioned above, the CSV file might have data in this format:

Timestamp, Page, Username, Gender, City, metric, value
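For instance, the sample rows from the table above could be represented in this long format, with the metric name carried in the metric column and its value in the value column:

1234567,JB,Abc,Male,SF,CharsAdded,1290
1234576,JB,Xyz,Female,SJ,CharsAdded,3421
1234687,AB,Qwe,Male,LA,CharsAdded,2345
1234789,AB,Def,Female,LV,CharsAdded,1234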

The following code creates the Loader object required to load this data in memory:

// All columns present in the CSV file, including the metric name ("metric") and metric value ("value") columns
List<String> columns = Arrays.asList("Timestamp", "Page", "Username", "Gender", "City", "metric", "value");

// "value" is the only metric column; every other column is treated as a dimension
List<String> metrics = Arrays.asList("value");
List<String> dimensions = new ArrayList<String>(columns);
dimensions.removeAll(metrics);

Loader loader = new CSVLoader(reader, columns, dimensions, "Timestamp");
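
In this snippet, reader is any java.io.Reader over the CSV input and is assumed to have been created beforehand; a minimal sketch (the file name is hypothetical):

// Hypothetical file name; any java.io.Reader over the CSV data works
Reader reader = new BufferedReader(new FileReader("wikipedia_edits.csv"));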

Creating Druid segment/index files

Druid generates segment and index files for the given input. Once the Loader object is created, one needs to specify how the segment and index files should be built for query purposes. This specification includes the available dimensions and the aggregator functions to be used while querying. For example, if one is interested in querying values such as total count, max, min, total sum, and percentiles, then the following AggregatorFactory objects need to be created:

DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensions, null, null);
AggregatorFactory[] metricsAgg = new AggregatorFactory[] {
    new LongSumAggregatorFactory("agg_count", "count"),
    new MaxAggregatorFactory("agg_max", "max"),
    new MinAggregatorFactory("agg_min", "min"),
    new DoubleSumAggregatorFactory("agg_sum", "sum"),
    new ApproximateHistogramAggregatorFactory("agg_histogram", "value", null, null, null, null)
};

To create segment and index files locally, one needs to create a QueryableIndex object as follows:

IncrementalIndexSchema indexSchema = new IncrementalIndexSchema(0, QueryGranularity.ALL, dimensionsSpec, metricsAgg);
QueryableIndex index = IndexHelper.getQueryableIndex(loader, indexSchema);

By default, segment files are created at the location System.getProperty("druid.segment.dir"). If this property is not set, then the files will be created at the temporary location System.getProperty("java.io.tmpdir") + File.separator + "druid-tmp-index-". Therefore, if one wants to create segment files at a specific location, the "druid.segment.dir" property needs to be set first.
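
For example, a minimal sketch that points segment creation at a directory of one's choice (the path shown is hypothetical):

// Hypothetical location; must be set before IndexHelper.getQueryableIndex(...) is called
System.setProperty("druid.segment.dir", "/data/druid-segments");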

Querying data

Once the segment files are created, one can execute different kinds of queries using the QueryableIndex object. For example, to execute a GroupByQuery for the above-mentioned schema, the code looks like this:

List<DimFilter> filters = new ArrayList<DimFilter>();
filters.add(DimFilters.dimEquals("Page", "JB"));
filters.add(DimFilters.dimEquals("Gender", "Male"));
filters.add(DimFilters.dimEquals("metric", "CharsAdded"));

GroupByQuery query = GroupByQuery.builder()
        .setDataSource("test")
        .setQuerySegmentSpec(QuerySegmentSpecs.create(new Interval(0, new DateTime().getMillis())))
        .setGranularity(QueryGranularity.NONE)
        .addDimension("City")
        .addAggregator(new LongSumAggregatorFactory("agg_count", "agg_count"))
        .addAggregator(new MaxAggregatorFactory("agg_max", "agg_max"))
        .addAggregator(new MinAggregatorFactory("agg_min", "agg_min"))
        .addAggregator(new DoubleSumAggregatorFactory("agg_sum", "agg_sum"))
        .addAggregator(new ApproximateHistogramFoldingAggregatorFactory("agg_histogram", "agg_histogram", 20, 5, null, null))
        .addPostAggregator(new QuantilesPostAggregator("agg_quantiles", "agg_histogram", new float[] {0.25f, 0.5f, 0.75f, 0.95f, 0.99f}))
        .setFilter(DimFilters.and(filters))
        .build();

Sequence<Row> sequence = QueryHelper.run(query, index);
ArrayList<Row> results = Sequences.toList(sequence, Lists.<Row>newArrayList());
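
The resulting rows can then be consumed like any ordinary Java collection; for example, printing each aggregated row:

for (Row row : results) {
    // Each row carries the grouped City value plus the aggregated metrics
    // (agg_count, agg_max, agg_min, agg_sum, agg_quantiles) configured above
    System.out.println(row);
}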

Similarly, here is the code snippet for executing a TopNQuery:

List<DimFilter> filters = new ArrayList<DimFilter>();
filters.add(DimFilters.dimEquals("Page", "JB"));
filters.add(DimFilters.dimEquals("Gender", "Male"));
filters.add(DimFilters.dimEquals("metric", "CharsAdded"));

TopNQuery query =
        new TopNQueryBuilder()
            .threshold(5)
            .metric("agg_count")
            .dataSource("test")
            .intervals(QuerySegmentSpecs.create(new Interval(0, new DateTime().getMillis())))
            .granularity(QueryGranularity.NONE)
            .dimension("City")
            .aggregators(
                Arrays.<AggregatorFactory>asList(
                    new LongSumAggregatorFactory("agg_count", "agg_count"),
                    new MaxAggregatorFactory("agg_max", "agg_max"),
                    new MinAggregatorFactory("agg_min", "agg_min"),
                    new DoubleSumAggregatorFactory("agg_sum", "agg_sum")))
            .filters(DimFilters.and(filters))
            .build();

Sequence<Result> sequence = QueryHelper.run(query, index);
ArrayList<Result> results = Sequences.toList(sequence, Lists.<Result>newArrayList());
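
As before, the results can be consumed directly; a minimal sketch using Result's getTimestamp() and getValue() accessors, where each Result pairs a timestamp with the top five City values ranked by agg_count:

for (Result result : results) {
    // getValue() holds the ranked top-N entries for this result's timestamp
    System.out.println(result.getTimestamp() + " -> " + result.getValue());
}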

Future work

We are planning to extend this work by providing (and/or integrating) REST APIs for ingestion and for querying Druid data. For visualization, we also plan to integrate with an easy-to-use UI like Grafana. These enhancements will help users analyze data quickly and surface meaningful information promptly.