Note: Familiarity with Flume, Kafka and AWS (EC2, Auto Scaling, S3, Spot Instances) will help in understanding this article.
As a big data marketing company, we run a lot of analytics on the data generated by traffic to websites. This poses considerable scale challenges when we start building real-time systems. One such challenge arose when we wanted to implement real-time logging for all our Real Time Bidding (RTB) bid servers. Our bid servers mainly run a Jetty web service, which processes HTTP bid requests sent by ad exchanges and generates log entries for the requests it has processed. These log entries are later used for analytics.
Here are some numbers that show how our RTB infrastructure is distributed:
- We are geographically spread across 6 regions (US-East, US-West, EU-West, AP-Southeast, AP-Northeast & China).
- Each of the bigger geographies (APSE & USE) has about 20 c3.2xlarge instances serving bid traffic during peak hours. Across all geographies, there could be about 75 c3.2xlarge machines handling bid traffic during these peak hours.
- Each region is treated independently, i.e. every region has its own set of cache servers, front-end bid servers, back-end cache population servers, etc.
- Each bid server handles approx. 4000 requests per second and generates around 300 MB of logs per minute. So, at peak times, each of the bigger regions generates around 20 × 300 MB ≈ 6 GB of uncompressed data every minute, or about 100 MB/second.
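The peak-volume arithmetic above can be checked with a few lines of Python. This is only a back-of-the-envelope sketch: it assumes decimal units (1 GB = 1000 MB), and the per-request figure is derived from the two quoted numbers rather than measured.

```python
def region_log_rate(servers, mb_per_server_per_min):
    """Return (GB per minute, MB per second) of uncompressed log data.

    Uses decimal units (1 GB = 1000 MB), matching the rough estimate above.
    """
    mb_per_min = servers * mb_per_server_per_min
    return mb_per_min / 1000, mb_per_min / 60


def bytes_per_request(mb_per_min, requests_per_sec):
    """Average log bytes written per bid request (derived, not measured)."""
    return mb_per_min * 1_000_000 / (requests_per_sec * 60)


gb_min, mb_s = region_log_rate(20, 300)  # 20 bid servers at 300 MB/min each
print(f"{gb_min:.1f} GB/min, {mb_s:.0f} MB/s")
print(f"~{bytes_per_request(300, 4000):.0f} bytes of log per request")
```

At roughly 1.25 KB per request, the volume is driven by the request rate rather than by unusually large log lines.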
All these logs are written to files and uploaded to S3 to be consumed later by analytics systems. Earlier, we used the following method to make these logs available for analytics:
- Uncompressed logs were uploaded from each bid server to AWS S3 every half hour.
- As soon as a log was uploaded to S3, the bid server would send an SQS message with details about the uploaded log.
- A set of offline-logging instances was subscribed to the SQS queue. As soon as a message arrived in the queue, one of the offline-logging servers would receive and parse it.
- The offline-logging server would then download the log file mentioned in the message, compress it and upload it to S3. We preferred to do the compression on the offline-logging instances rather than on the bid servers, because compressing on the bid servers would cause spikes in their CPU utilization.
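A minimal sketch of what one offline-logging server did per SQS message in this older setup, written in Python for brevity. The JSON message format, the bucket and key names, and the `fake_s3` dictionary standing in for S3 are all hypothetical; a real worker would use an SQS and S3 client instead.

```python
import gzip
import json

# Hypothetical in-memory stand-in for S3: "bucket/key" -> object bytes.
fake_s3 = {}


def handle_notification(message_body):
    """Handle one SQS notification the way the old offline-logging servers did.

    Assumes a JSON body like {"bucket": ..., "key": ...}; the actual message
    format is not described in this post.
    """
    msg = json.loads(message_body)
    raw = fake_s3[f"{msg['bucket']}/{msg['key']}"]  # download the uncompressed log
    compressed = gzip.compress(raw)                  # CPU-heavy step, kept off bid servers
    out_key = f"{msg['bucket']}/compressed/{msg['key']}.gz"  # hypothetical layout
    fake_s3[out_key] = compressed                    # upload the compressed copy
    return out_key


# Usage: a bid server uploaded a raw log and then published a notification.
fake_s3["rtb-logs/use/bid-01/1000.log"] = b"bid request log line\n" * 1000
out = handle_notification(json.dumps({"bucket": "rtb-logs", "key": "use/bid-01/1000.log"}))
print(out, len(fake_s3[out]))
```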
This worked well because S3 acted as the intermediate layer, and each offline-logging server could independently download any newly uploaded log file, compress it and re-upload it to S3 for consumption by analytics systems. However, this architecture had one major concern. If a bid server was terminated suddenly, up to the last half hour of log data on that server would be lost. Also, our bid servers were in an Auto Scaling group, and we were considering spot instances instead of on-demand instances. Spot instances carry a high chance of sudden termination, so using them meant making the bid servers stateless: we no longer had the luxury of accumulating log data for half an hour and uploading it periodically. We had to push the logs out of the bid servers in real time!
Architecture and Design of the New Implementation
So, here is the solution we implemented:
- We decided to use Kafka as a broker for the log events.
- Each bid server pushes its log events/messages to a Kafka topic.
- The offline-logging servers subscribe to that topic as Kafka consumers, continuously reading the log entries and writing them to files.
- Periodically (currently every half hour), the consumers stop reading data and upload the generated files to S3, where they are later consumed by analytics systems.
The Java process running on the offline-logging servers is a multi-threaded, multi-layered application. Its important thread pools are:
- Consumer Thread Pool: runs one consumer per thread to read data from the Kafka broker.
- File Thread Pool: the consumer threads submit data to the File Thread Pool, which compresses it and writes it to files.
- S3 Upload Thread Pool: periodically (currently every half hour), each consumer in the first layer stops consuming data from the Kafka broker, closes all the files the File Thread Pool was writing for it, and then tells the S3 Upload Thread Pool to upload the files generated from this consumer thread to S3. Once the upload is done, the offsets in the Kafka partitions are committed, indicating that the data in the topic partitions this consumer was reading has been successfully uploaded to S3 up to the committed offset.
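The ordering that makes this safe (write, close, upload, then commit) can be sketched with Python's `concurrent.futures` thread pools. This is an illustrative sketch, not our Java implementation: plain dicts stand in for S3 and for Kafka's committed offsets, the file-name format is hypothetical, and "closing the files" is modelled as waiting for every compression task to finish.

```python
import gzip
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for S3 and for Kafka's committed offsets.
uploaded = {}    # S3 key -> object bytes
committed = {}   # partition -> next offset to read after a restart

file_pool = ThreadPoolExecutor(max_workers=4)    # File Thread Pool
upload_pool = ThreadPoolExecutor(max_workers=2)  # S3 Upload Thread Pool


def compress_chunk(lines):
    """File Thread Pool task: gzip one batch of log lines."""
    return gzip.compress("".join(lines).encode("utf-8"))


def upload(key, data):
    """S3 Upload Thread Pool task (writes to the in-memory stand-in)."""
    uploaded[key] = data


def run_roll_period(partition, records, start_offset, batch_size=1000):
    """One consumer thread's work for a single roll period (e.g. 30 minutes).

    `records` are the log lines read from `partition`, beginning at `start_offset`.
    """
    # 1. Consume and hand batches to the File Thread Pool, in offset order.
    futures = [file_pool.submit(compress_chunk, records[i:i + batch_size])
               for i in range(0, len(records), batch_size)]
    # 2. Roll: stop consuming and wait until every write has finished.
    #    Concatenated gzip members still form one valid gzip file.
    data = b"".join(f.result() for f in futures)
    # 3. Hand the closed file to the S3 Upload Thread Pool and wait for success.
    #    If upload() raises, .result() re-raises and the commit below never runs.
    key = f"partition-{partition}-offset-{start_offset}.log.gz"  # hypothetical name
    upload_pool.submit(upload, key, data).result()
    # 4. Commit only after the upload succeeded.
    committed[partition] = start_offset + len(records)
    return key
```

Committing last means a crash anywhere in steps 1 to 3 leads to re-reading data rather than losing it; the cost is possible redundant uploads, which the next section addresses.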
We faced several challenges while implementing this solution. Some of them are documented below.
Ensuring that data uploaded to S3 is complete and not redundant
Looking at the design of the offline-logging Java process, there are multiple points where a failure could leave missing or redundant data in S3. For example, suppose an offline-logging server uploads a file to S3, but the Java process crashes before it can commit the offset to Kafka. In this case, another offline-logging consumer would re-process the partition's data from the last committed offset, and we could end up with redundant data in S3. To address this, we suffixed each file name with the partition and the offset from which it was read. So, if a partition's data is re-processed from the last committed offset, the re-uploaded S3 file simply overwrites the earlier one.
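A sketch of the naming idea, assuming an S3 key built from the topic, partition and starting offset (the exact layout below is hypothetical, and a dict stands in for S3). Because a replay always restarts from the last committed offset, it produces the same key, and the upload replaces the earlier object instead of adding a second copy.

```python
# Hypothetical in-memory stand-in for S3: key -> object bytes.
fake_s3 = {}


def s3_key(topic, partition, start_offset):
    """Deterministic key: same (topic, partition, start offset) -> same key.

    Zero-padding the offset keeps keys sorted by offset in S3 listings.
    The layout itself is illustrative.
    """
    return f"{topic}/{topic}-p{partition}-o{start_offset:020d}.log.gz"


def upload_rolled_file(topic, partition, start_offset, data):
    key = s3_key(topic, partition, start_offset)
    fake_s3[key] = data  # a PUT to an existing key replaces the object
    return key


# Crash scenario: the file is uploaded, then the process dies before committing.
first = upload_rolled_file("bid-logs", 4, 5000, b"records 5000..5999")
# Another consumer re-reads partition 4 from the last committed offset (5000).
second = upload_rolled_file("bid-logs", 4, 5000, b"records 5000..6499")
```

If versioning is enabled on the bucket, S3 keeps the replaced object as a noncurrent version, but readers of the current objects still see only one copy.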
Handling Kafka (Broker) Server downtime
Sometimes we have to move to a new Kafka server, which means the Kafka broker can be down for a while. During this time, the Jetty process running on a bid server cannot submit log events to the Kafka topic. To overcome this problem, we run a Flume agent on each of our bid servers. The main Jetty process, which actually generates the log events, submits them to the Flume agent running on the same machine, which in turn forwards them to the Kafka broker using its Kafka sink. If the Kafka server is unreachable, Flume buffers the events locally (we use the FileChannel) and resubmits them to the Kafka broker once it becomes available.
Initially, we observed that Flume was not able to submit data to Kafka fast enough. One of the major reasons was that the Flume batch size and transaction capacity were small, and increasing both helped considerably. See http://blog.cloudera.com/blog/2013/01/how-to-do-apache-flume-performance-tuning-part-1/ for more information on this.
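An illustrative Flume agent configuration for this setup is shown below. The agent, source, channel and sink names, the Avro source (one possible way for Jetty to hand events to Flume), the paths, the topic and all numeric values are assumptions rather than our production settings. Property names follow the Flume 1.7+ Kafka sink; Flume 1.6 used `brokerList`, `topic` and `batchSize` instead. The channel's `transactionCapacity` has to be at least the sink's batch size.

```properties
# Illustrative Flume agent for a bid server (names, paths and values are assumptions).
agent.sources  = jettySrc
agent.channels = fileCh
agent.sinks    = kafkaSink

# Assumed transport from Jetty to Flume: an Avro source on localhost.
agent.sources.jettySrc.type = avro
agent.sources.jettySrc.bind = 127.0.0.1
agent.sources.jettySrc.port = 41414
agent.sources.jettySrc.channels = fileCh

# FileChannel persists events to local disk, so they survive broker downtime.
agent.channels.fileCh.type = file
agent.channels.fileCh.checkpointDir = /var/lib/flume/checkpoint
agent.channels.fileCh.dataDirs = /var/lib/flume/data
agent.channels.fileCh.capacity = 10000000
# Must be >= the sink batch size below.
agent.channels.fileCh.transactionCapacity = 10000

# Kafka sink: larger batches mean fewer, bigger transactions and higher throughput.
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.channel = fileCh
agent.sinks.kafkaSink.kafka.bootstrap.servers = kafka-broker:9092
agent.sinks.kafkaSink.kafka.topic = bid-logs
agent.sinks.kafkaSink.flume.batchSize = 5000
# Producer-side Snappy compression, passed through to the Kafka producer.
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
```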
Disk and Network I/O bottleneck on the Kafka server
We are using one c3.2xlarge instance to run the Kafka server. We saw that the peak bandwidth on these instances is generally around 150 MB/s. We were also using EBS volumes to store the Kafka data logs, and these peak at a similar bandwidth. Since we were generating about 100 MB of log data every second, and the same 100 MB/s also had to be consumed by the offline-logging servers, the single c3.2xlarge instance became a bottleneck: it couldn't support 100 MB/s of inbound plus 100 MB/s of outbound traffic. To work around this, we used Snappy compression when pushing data to Kafka. This reduced bandwidth by about 50%. Also see http://nehanarkhede.com/2013/03/28/compression-in-kafka-gzip-or-snappy/ for an interesting comparison between Snappy and gzip compression in Kafka.
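The capacity argument can be made concrete with a rough model. It is a sketch under simplifying assumptions: the ~150 MB/s figure is treated as one shared budget for inbound plus outbound traffic (as the reasoning above implies), there is a single consumer group, replication and disk are ignored, and the 0.5 ratio simply encodes the ~50% saving we saw with Snappy.

```python
NIC_PEAK_MB_S = 150  # observed peak bandwidth of the c3.2xlarge broker


def broker_traffic(ingest_mb_s, compression_ratio=1.0, consumer_groups=1):
    """Total MB/s the broker moves: producer traffic in + consumer traffic out.

    compression_ratio = compressed size / raw size. Consumers fetch the
    producer's compressed batches, so compression shrinks both directions.
    """
    inbound = ingest_mb_s * compression_ratio
    outbound = inbound * consumer_groups
    return inbound + outbound


def fits_on_one_broker(ingest_mb_s, compression_ratio=1.0):
    return broker_traffic(ingest_mb_s, compression_ratio) <= NIC_PEAK_MB_S


print(broker_traffic(100), fits_on_one_broker(100))            # 200.0 False
print(broker_traffic(100, 0.5), fits_on_one_broker(100, 0.5))  # 100.0 True
```

Each additional consumer group would add another full copy of the outbound traffic, which is worth keeping in mind before attaching more readers to the same topic.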
Kafka broker throughput not increasing in spite of available CPU on the Kafka server
We also observed that even when the number of bid servers was high, CPU utilization on the Kafka broker server stayed low, and a backlog was building up on the bid-server side (in the Flume agent). This was due to the low number of I/O threads on the Kafka broker. The solution was to increase num.io.threads in the Kafka broker configuration.
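For reference, this setting lives in the broker's `server.properties`. The value below is illustrative only (the right number depends on the broker's cores and disks); Kafka's default is 8.

```properties
# Kafka broker server.properties (illustrative value, not our production setting).
# Threads the broker uses to process requests, which may include disk I/O.
num.io.threads=16
```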
Rebalancing of offline-logging consumers
Another edge case we faced was related to consumer rebalancing in Kafka. Suppose an offline-logging server is consuming data from a particular Kafka topic partition. After it has read some data from this partition, a rebalance occurs and the partition is now consumed by another offline-logging server. If, before the first server begins its upload to S3, another rebalance assigns the partition back to it, it will read data from the last committed offset again and could end up with duplicate data. To address this, we wrote a rebalance notifier that fires whenever a partition rebalance occurs; when it does, the offline-logging Java process clears all the data it has written so far for that specific topic partition. So, if the partition is reassigned to the same offline-logging instance, it holds no older data for that partition. An alternative solution would have been to use low-level consumers rather than high-level consumers, but then we would lose the flexibility of elegantly scaling offline-logging instances up and down.
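A minimal Python sketch of the rebalance-notifier idea follows. It keeps un-uploaded records in memory per partition (the real process clears data already written to local files), and the callback name only mirrors the shape of Kafka's rebalance-listener callbacks; the `PartitionBuffers` class and the `fetch` driver are hypothetical and not wired to a real consumer.

```python
from collections import defaultdict


class PartitionBuffers:
    """Un-uploaded log data held by one offline-logging process, per partition."""

    def __init__(self):
        self.pending = defaultdict(list)  # partition -> [(offset, line), ...]

    def on_record(self, partition, offset, line):
        self.pending[partition].append((offset, line))

    def on_partitions_revoked(self, partitions):
        # The next owner (possibly this process again) re-reads from the last
        # committed offset, so anything still buffered here would be duplicated.
        for p in partitions:
            self.pending.pop(p, None)


def fetch(buffers, partition, log, from_offset):
    """Stand-in for a consumer fetch: read `log` from `from_offset` onward."""
    for offset in range(from_offset, len(log)):
        buffers.on_record(partition, offset, log[offset])


# Partition 7 is read, revoked, then reassigned to the same process before upload.
log = ["req-a", "req-b", "req-c"]
buffers = PartitionBuffers()
fetch(buffers, 7, log, from_offset=0)   # last committed offset is 0
buffers.on_partitions_revoked([7])      # rebalance: partition moves away
fetch(buffers, 7, log, from_offset=0)   # reassigned: re-read from committed offset
print([offset for offset, _ in buffers.pending[7]])  # [0, 1, 2]
```

Without the revoke callback, the second fetch would append offsets 0 to 2 a second time and the eventual upload would contain each record twice.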
Key takeaways
- Increasing Flume transaction capacity helps improve throughput
- Use EBS-optimized instances when you need reliable EBS bandwidth
- Using compression considerably reduced disk and network bandwidth on the Kafka server
- Increasing num.io.threads in the Kafka broker increased Kafka throughput and let it fully use the available CPU
- Disk read throughput on the Kafka server is higher during a backlog, because the data consumers need is no longer in the disk buffers (OS page cache) and has to be read from disk
- Striping EBS volumes in parallel with LVM does not help much, as EBS bandwidth generally becomes the bottleneck. On the other hand, striping ephemeral storage in parallel does increase disk throughput (we saw this when we later moved our Kafka broker to a d2.2xlarge)