Monitoring at eBay with Druid

At eBay, we switched one of our monitoring tech stacks from a legacy homegrown architecture to a Druid-based real-time monitoring system. In this article, we discuss how we made the transition to the new stack and the benefits it offers.

eBay supports millions of users every day for ecommerce transactions. That user growth has come with an explosion of data produced by the many applications that support different products. Logs are the heart of an application: they record what the application is doing. As applications grow, their logs become very difficult to visualize. We have a centralized log store that processes all of the logs, but harnessing useful information directly from raw logs is difficult and not feasible in real time. The monitoring team at eBay approached the problem differently: extracting useful events from the logs and processing those events through a data pipeline is a better way to solve it.

The number of events correlates directly with the amount of log data generated at the system’s current traffic level. Some applications generate hundreds to thousands of events, while others generate millions. Our goal is to monitor how individual applications are performing, based on the events extracted from their logs, and to alert users when abnormal behavior occurs, such as too many errors or anomalies in the system.

Application events consist of error status codes, URL transactions, command executions, and the build ID of the application artifact running on different hosts. These events serve various purposes.
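
To make this concrete, a single monitoring event might look like the following sketch. The field names and values here are hypothetical, not the exact production schema.

```python
# A hypothetical monitoring event extracted from application logs.
# Field names and values are illustrative only, not the actual eBay schema.
sample_event = {
    "timestamp": "2019-05-15T12:00:00Z",    # when the event occurred
    "application": "checkout-service",       # application that emitted it
    "operation": "placeOrder",               # URL transaction / command executed
    "status_code": "500",                    # error status code
    "build_id": "checkout-service-1.42.7",   # build/artifact running on the host
    "host": "host-1234.example.com",         # host where the event occurred
}
```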

These events are of interest to the individual app developers and site reliability engineering (SRE) teams to monitor the performance of an application in real time. They are able to visualize how many errors are happening in the system, slice and dice those errors by the command executions and builds causing them, and then set up alerting based on error thresholds that can impact application performance.

This information provides critical insights when the application teams have to deploy new artifacts of the application in production. They are able to do a sampled rollout of code on a small percentage of hosts, visualize real-time dashboards to determine how the new code behaves with respect to errors generated, and then compare real-time data with historical data to gain confidence in the release.

Legacy architecture

[Figure: Legacy architecture diagram]

The legacy architecture was designed years ago, when the number of events generated by the entire site was on the order of tens of millions every day. This scaled well at the time and for a few years afterward.

Over time, the legacy architecture showed several shortcomings:

  1. Cube generation relied on custom-written code for each interval. Generating data for the current time took a few minutes, which was not acceptable for real-time monitoring, and this delay grew as the amount of data increased.
  2. Horizontal scaling of the custom cube generation became less and less effective as data volume grew.
  3. Cube creation was slow or failed outright for very high-cardinality dimensions (a few hundred thousand to a few million combinations).

New architecture 

[Figure: New architecture diagram]

In the new architecture, the Tibco dependency has been removed. Kafka serves as a layer that temporarily persists messages for consumption, and Tranquility consumes data from Kafka and pushes it into Druid.

Key points of the new architecture:

  • Minimal end-to-end latency from event generation to availability at egress (at most 10 seconds, even for very large applications).
  • Druid processes data at multiple granularities, such as 1 minute, 15 minutes, and 1 hour, and data is reindexed at a 1-day interval (a sample ingestion spec is sketched after this list).
  • Kubernetes deployment enables us to delete the cluster and recreate it within a few minutes for upgrades or maintenance, and makes rolling updates across hundreds of nodes very easy.
  • Druid efficiently handles high-cardinality data. Even millions of dimension values are possible without incurring additional delay, as long as indexing tasks are scaled horizontally, which can be done with zero downtime.

(Tibco is an enterprise message bus used for data transport. Tranquility is part of the druid-io project and provides APIs for sending data streams to Druid.)
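
As a rough illustration of the minute-level rollup described above, a Druid dataSchema for a pipeline like this might resemble the sketch below, expressed here as a Python dict. The datasource, dimension, and metric names are placeholders, the surrounding Tranquility/Kafka configuration is omitted, and the exact spec depends on the Druid and Tranquility versions in use.

```python
# Minimal sketch of a Druid dataSchema for minute-level rollup of
# monitoring events. All names are placeholders; the production spec
# (and the specs for coarser granularities and the 1-day reindex) differ.
data_schema = {
    "dataSource": "monitoring-events",
    "parser": {
        "type": "string",
        "parseSpec": {
            "format": "json",
            "timestampSpec": {"column": "timestamp", "format": "iso"},
            "dimensionsSpec": {
                # 11 fixed dimensions in production; only a few shown here
                "dimensions": ["application", "operation", "status_code",
                               "build_id", "host"],
            },
        },
    },
    "metricsSpec": [
        {"type": "longSum", "name": "count", "fieldName": "count"},
        {"type": "longSum", "name": "latency_ms", "fieldName": "latency_ms"},
    ],
    "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "hour",   # how segments are chunked
        "queryGranularity": "minute",   # events rolled up to the minute
    },
}
```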

Event processing

Events represent things happening in a system and are sporadic in nature. A few apps generate only a handful of events a day, while others generate millions of events in a minute. Different types of events may be generated depending on the purpose they serve; here we focus on monitoring events.

In our use case, the data has a fixed set of dimension keys (11 dimensions), a timestamp, and two metrics to be computed: count and latency. Count is the number of events that occurred on a host at the timestamp the data was collected, and latency is the sum of latency across all of those transactions. Hundreds to millions of events may be generated by thousands of hosts across apps, each event carrying a different set of dimension values. The cardinality of each dimension can vary from ten to a few thousand values per application.
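
As a toy illustration of how such events collapse under minute-level rollup, the sketch below aggregates events that share the same minute and dimension values, summing count and latency. The dimension names are hypothetical and only a subset of the 11 real dimensions is shown.

```python
from collections import defaultdict

# Toy illustration of minute-level rollup: events sharing the same minute
# and dimension values collapse into one row, with count and latency summed.
def rollup(events, dimensions=("application", "operation", "status_code")):
    rows = defaultdict(lambda: {"count": 0, "latency_ms": 0})
    for event in events:
        minute = event["timestamp"][:16]                  # truncate ISO timestamp to the minute
        key = (minute,) + tuple(event[d] for d in dimensions)
        rows[key]["count"] += event["count"]
        rows[key]["latency_ms"] += event["latency_ms"]
    return dict(rows)
```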

The developers and SRE teams are interested in events like these to find out the number of errors happening on the site for a particular application, or across multiple applications when there is a large impact. Collecting a few million events per minute into a central store in real time and processing them comes with challenges around accuracy, speed, reliability, and resiliency.

Scale

Monitoring events are generated across the entire fleet at a rate of 8 million events/sec on average, rising to 10 million events/sec at peak traffic, from more than 5,000 applications. Monitoring events require slicing and dicing across various dimensions, such as application name, application type, operation name, error status, build running the application, host, and so on. All the data must be aggregated and made available within a near-real-time service-level agreement. There are 11 fixed dimensions, and the dimension value cardinality across all dimensions is between 1.4 and 2 million unique combinations.

Our Druid clusters are deployed across multiple availability zones for high availability, and two replicas per shard are maintained in each data center, giving a total of four replicas across the two data centers. Each data center has a few hundred middle managers, two overlord + coordinator nodes, 15 broker nodes, and 80 historical nodes.
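
Replication at this level is typically expressed through Druid coordinator load rules. A minimal sketch, with a placeholder tier name, might look like the following; the actual retention and replication rules in production may differ.

```python
# Sketch of a Druid coordinator load rule keeping two replicas of every
# segment in the default tier of a data center's cluster. The tier name is
# a placeholder.
load_rules = [
    {"type": "loadForever", "tieredReplicants": {"_default_tier": 2}},
]
```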

Peak data traffic is shown in the following screenshot.

[Figure: Peak data traffic]

Egress design

Data egress is designed to keep data availability high at all times. A layer in front of the Druid brokers queries Druid to determine the health of each data center. We expect both data centers to remain healthy and highly available at all times; in case of data loss in either data center, egress switches to the cluster with better data quality.

Every minute, we fetch the event counts from each cluster to determine whether both clusters hold similar data (allowing a deviation of less than 0.5% in event counts between the clusters). If the deviation is too large, we pick the cluster with the higher event count. These calculations run every minute, and we continuously update each cluster’s health to determine the best cluster to serve data for a given time window. We also mark a cluster down when data loss is detected, so that no queries are routed to the bad cluster’s broker nodes.
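
A minimal sketch of this per-minute selection logic is shown below, given the event counts fetched from each cluster’s brokers. The cluster names and threshold constant are placeholders for illustration.

```python
# Minimal sketch of the per-minute cluster health check described above.
DEVIATION_THRESHOLD = 0.005  # clusters within 0.5% of each other are considered "similar"

def pick_cluster(count_a, count_b, current="A"):
    """Return which cluster ("A" or "B") should serve queries this minute."""
    if max(count_a, count_b) == 0:
        return current                               # no data yet; keep the current cluster
    deviation = abs(count_a - count_b) / max(count_a, count_b)
    if deviation < DEVIATION_THRESHOLD:
        return current                               # both clusters look healthy
    return "A" if count_a > count_b else "B"         # prefer the cluster with more complete data
```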

[Figure: Egress design]

We support the various granularities available in earlier versions of Druid (1 minute, 15 minutes, 1 hour, 1 day), selected depending on the length of time for which data is queried. This granularity selection happens automatically. When required, a finer granularity can be forced over a longer period of time, at the cost of response time due to the larger volume of data queried.
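
A rough sketch of how such automatic selection might work is shown below; the thresholds are illustrative, not the actual values used in production.

```python
# Rough sketch of automatic granularity selection based on the length of the
# queried interval. Thresholds are illustrative only.
def choose_granularity(interval_hours, forced=None):
    if forced:
        return forced                 # caller can force finer data at the cost of response time
    if interval_hours <= 6:
        return "minute"
    if interval_hours <= 24:
        return "fifteen_minute"
    if interval_hours <= 7 * 24:
        return "hour"
    return "day"
```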

Conclusion

For site monitoring and event tracing use cases that involve high-cardinality data, aggregation in real time or near real time is critical for a large ecosystem like eBay to make data-driven decisions. The insights that an analytics store like Druid can provide are extremely valuable from a monitoring perspective, and many teams and developers rely on them to maintain system availability and reliability for eBay’s customers.

References

All references to Druid in this blog post refer to the open-source version of Druid. Please see the following links:

  • http://druid.io/
  • https://github.com/apache/incubator-druid/

Acknowledgements 

  • Mohan Garadi—Software Engineer
  • Premendra Singh—Software Engineer
  • Mahesh Somani—Architect
  • Saurabh Mehta—Engineering Manager
  • Amber Vaidya—Product Manager
  • Andy Santosa—Engineering Director
  • Rami El-Charif—VP Infrastructure Engineering
  • Our Infrastructure and Partner teams - Tess.IO, Capacity, SREs, Operations