RevX processes terabytes of data daily, which poses unique challenges around the volume and speed at which data can be processed. At RevX, we experimented with two data processing patterns. In this blog, we will talk about the pros and cons of both patterns based on our experience of dealing with large volumes of real-time data.
Before going into the details, let’s briefly discuss the high-level architecture of a real-time bidding (RTB) system. An RTB platform like RevX integrates with programmatic exchanges such as Google DoubleClick Exchange (AdX), MoPub, PubMatic, Rubicon, AppNexus, and more. These programmatic exchanges expect a successful bid response from RevX in less than 100ms. RevX’s bidder takes 5-10ms for internal processing, leaving 90-95ms for the network latency between bidder and exchange endpoints. This almost always requires the bidder to be deployed in the same region where the exchanges are located.
Typically an RTB exchange serves traffic from different data centers (DCs) which are close to the end user. For example, Google AdX sends bid requests from DCs located in Asia, Europe, and North America, and hence RevX also has data centers in Asia, Europe, and North America to serve this traffic efficiently. Every DC generates a massive amount of data which is shipped to a central DC for processing. This further adds to the complexity of processing large volumes of data, because the centralized data processing system expects data to arrive not only in time but also in the right sequence.
With shipping delays in mind, the most critical question that any big data platform team needs to answer is “When to start processing the data?”
There are generally two choices:
1. Start processing after all the data has arrived
2. Start processing as soon as data arrives
Start Processing After All The Data Has Arrived
This is generally the most preferred approach. One of the reasons is the availability of powerful open source workflow management systems like Oozie. Oozie has a concept of dependency management wherein data processing starts only when all the dependencies are met. For example, if we have 10 bidding servers spread across multiple DCs, data processing for a specific time bucket will start only when all the files from all bidding servers for that slot have arrived. Oozie also has a useful timeout feature wherein it waits for a configured time before it gracefully fails a data processing job. This worked well in the beginning, but as the RevX platform evolved, we started seeing the following challenges:
1. As the RTB traffic grew, RevX evolved from a fixed number of bidding servers to an auto-scaling infrastructure. With the number of servers generating logs changing based on the traffic, the dependency management module broke down. There was a need to communicate to Oozie the number of servers it should expect data from for every hourly slot. We overcame this by implementing a new system called the “Success File Writer”. We configured the Oozie workflow to process the data of any given hour only when it finds a success flag written for that hourly data bucket. The problem was now transferred to the “Success File Writer”, which needed to reliably figure out when all files from all servers had been successfully shipped. However, this created other dependencies, which in turn created new and unique issues (more on that in another blog).
2. Another challenge with this approach is the delay it introduces in making processed data available for reporting and analytics. The data processing system waits until all files are available. The RevX bidder generates thousands of files for an hour’s worth of data. In order to produce accurate reports, the system doesn’t process the data even if a single file is missing.
3. This approach requires manual intervention in scenarios like hardware issues, shipping delays exceeding the configured timeout in Oozie, etc. Usually, there is no foolproof way to select the correct timeouts. We generally configure the timeout as per the business needs, but the time spent in manual intervention when jobs fail due to timeouts was an unnecessary overhead.
4. Lastly, a major challenge with this approach is the time and energy spent fixing data after Hadoop cluster issues. When the Hadoop cluster fails, one needs to manually re-run the jobs one by one for each lost time bucket to fix the historical data.
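The completeness check that the “Success File Writer” has to perform can be sketched as follows. This is a minimal illustration, not RevX’s actual implementation; the server names, file tuples, and the `bucket_complete` helper are all hypothetical. It also highlights the core fragility: with auto-scaling, the set of expected servers is no longer fixed, which is exactly why the static dependency check broke.

```python
# Sketch of the completeness check behind a "Success File Writer":
# a success flag for an hourly bucket is written only when every expected
# server has shipped its files. All names here are illustrative.
expected_servers = {"bidder-1", "bidder-2", "bidder-3"}

def bucket_complete(arrived_files):
    """arrived_files: iterable of (server, filename) pairs for one hourly bucket."""
    seen = {server for server, _ in arrived_files}
    return expected_servers.issubset(seen)

# Two of three servers shipped: the workflow keeps waiting.
partial = [("bidder-1", "p0.log"), ("bidder-2", "p0.log")]
print(bucket_complete(partial))   # False

# Once bidder-3's files land, the success flag can be written.
full = partial + [("bidder-3", "p0.log")]
print(bucket_complete(full))      # True
```

Under auto-scaling, `expected_servers` would itself have to be computed per hourly slot, which is where this design starts to accumulate moving parts.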
The learnings and challenges from this experiment made us explore the other pattern: process the data as it comes.
Process The Data As It Comes
After learning from the shortcomings of the first approach, we migrated to a more real-time-driven processing model. This required building an accounting system for all the log files shipped to the central repository. The file accounting system is fundamental to this pattern. At RevX, all log files are moved to a central S3 repository on AWS, so we built a file accounting component on AWS Lambda. This subsystem adds each file’s metadata (such as time, name, server, and size) to a SQL database.
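The accounting component could look roughly like the sketch below. The schema, the S3 key layout (`<server>/<hour_slot>/<filename>`), and the handler shape are all assumptions for illustration; in production this would run as an AWS Lambda triggered by S3 events and write to a real SQL database, while here an in-memory sqlite3 table stands in.

```python
# Sketch of a file accounting component (hypothetical schema).
# sqlite3 stands in for the SQL database used in production.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE log_files (
        name      TEXT PRIMARY KEY,   -- S3 object key
        server    TEXT,               -- bidding server that produced the file
        hour_slot TEXT,               -- hourly bucket, e.g. '2024-01-01T05'
        size      INTEGER,            -- object size in bytes
        arrived   TEXT,               -- arrival timestamp
        status    TEXT DEFAULT 'NEW'  -- NEW / PROCESSING / DONE / FAILED
    )
""")

def record_file(event):
    """Lambda-style handler: extract metadata from an S3 event record
    and insert it into the accounting table (idempotent on file name)."""
    for rec in event["Records"]:
        obj = rec["s3"]["object"]
        # Assumed key layout: <server>/<hour_slot>/<filename>
        server, hour_slot, _ = obj["key"].split("/", 2)
        conn.execute(
            "INSERT OR IGNORE INTO log_files (name, server, hour_slot, size, arrived) "
            "VALUES (?, ?, ?, ?, ?)",
            (obj["key"], server, hour_slot, obj["size"], rec["eventTime"]),
        )
    conn.commit()

# Example: one S3 PUT event for a bidder log file
record_file({"Records": [{
    "eventTime": "2024-01-01T05:07:00Z",
    "s3": {"object": {"key": "bidder-7/2024-01-01T05/part-0001.log.gz",
                      "size": 1048576}}}]})
```

The key property is idempotence: S3 may deliver an event more than once, so inserts are keyed on the object name and duplicates are ignored.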
We built another subsystem (internally called the “Input File Generator”, or IFG) that generates input files for every data processing job. This component tracks newly arrived files in S3 by querying the SQL database and kicks off the processing job. If a data processing job fails for any reason, the IFG marks the file accordingly and submits it for processing in the next batch. We also built a monitoring tool that gives visibility into the status of the data processing pipeline via the file accounting system.
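The batching-and-retry loop of such an IFG can be sketched as below. Again, the table, statuses, and helper names are illustrative assumptions (with sqlite3 standing in for the SQL database), not RevX’s actual code; the point is that newly arrived files and earlier failures are picked up uniformly in the next batch.

```python
# Sketch of an "Input File Generator" (IFG) batching loop over a
# hypothetical accounting table; sqlite3 stands in for the SQL database.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_files (name TEXT PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO log_files VALUES (?, ?)", [
    ("a.log", "NEW"), ("b.log", "NEW"), ("c.log", "FAILED"), ("d.log", "DONE"),
])

def next_batch():
    """Pick up newly arrived files plus earlier failures for reprocessing."""
    rows = conn.execute(
        "SELECT name FROM log_files WHERE status IN ('NEW', 'FAILED') "
        "ORDER BY name"
    ).fetchall()
    names = [r[0] for r in rows]
    # Mark the batch as in-flight so a concurrent run won't pick it up again.
    conn.executemany(
        "UPDATE log_files SET status = 'PROCESSING' WHERE name = ?",
        [(n,) for n in names],
    )
    conn.commit()
    return names

def mark(names, ok):
    """Record the outcome; failures go back into the next batch."""
    status = "DONE" if ok else "FAILED"
    conn.executemany(
        "UPDATE log_files SET status = ? WHERE name = ?",
        [(status, n) for n in names],
    )
    conn.commit()

batch = next_batch()          # ['a.log', 'b.log', 'c.log']
mark(batch[:2], ok=True)      # a, b processed successfully
mark(batch[2:], ok=False)     # c failed again; retried in the next batch
```

Because failed files simply fall back into the next batch, late-arriving or reprocessed data needs no special-case handling, which is what makes this pattern resilient.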
After making these changes, the failure rate and the manual intervention needed to reprocess data went down massively.
We have seen in the market that most engineering teams dealing with large amounts of data take the first approach of “start processing after all data has arrived” due to the availability of workflow management frameworks like Oozie. However, as the organization grows and the problems become more complex, teams start realizing the pitfalls and initiate a migration towards the second approach of “processing the data as it comes”.
Reflecting on our experience, there are pros and cons to each approach depending on the state of the organization, platform architecture, and business needs:
1. The first approach works well if the platform has a fixed number of servers generating data. If the platform auto-scales on elastic compute infrastructure such as AWS, the first approach might not work reliably.
2. If the latency in shipping data to a central repository is predictable, the first approach can work. But if the latency is not predictable, you will need a far more sophisticated file accounting system and infrastructure to handle data that arrives in a variable pattern.
3. The second approach works well if the business use case demands processed, aggregated data as soon as possible. For a non-critical reporting system, the first approach might be sufficient.
4. At RevX, there are multiple parallel data processing pipelines, meaning data is processed and aggregated into multiple destination DBs simultaneously. In order to reduce discrepancies in aggregated data across pipelines, a more robust system is necessary that can process data as soon as it arrives and gracefully handle delayed data.
5. The first approach requires a reliable monitoring and retry system. The second approach mandates building a reliable file-level accounting system, which makes the overall data processing more predictable.
6. The first pattern works really well for workflows with multiple pipelines of jobs where the output of one is the input of another. At RevX, we use this pattern for such workflows, but we avoid running them at short intervals like every hour and instead prefer longer intervals like daily.
Big data processing remains a technically complex field despite the availability of systems like Hadoop and Oozie. The architecture one needs to build to sequence, process, and aggregate data depends largely on how the data is generated (fixed vs. variable), the level of discrepancy acceptable to the business, and the real-time requirements of the processing pipeline.