This article describes the design of a robust, real-time data collection and processing system that enables data-driven decision making across the board. It covers our approach to collecting, sanitizing and processing data in real time from various IoT sensors and generating the necessary events.
Being data-driven is a core leadership principle at RIVIGO. We believe that what gets measured gets done. To enable data-driven decision making across the board, a robust data collection and processing system is a baseline requirement.
Vehicles are one of our most critical sources of data, enabling several critical decisions and optimizations. Our trucks are fitted with multiple IoT sensors at the time of onboarding. Our pilots (truck drivers) also carry a smart application on their phones. Millions of data signals are recorded every day from IoT sensors such as GPS, fuel, temperature, gyro and vitals. The Chronos and Athena layers are designed to process this data in real time, ensuring high availability, live data correction, exception handling and regular archiving. The pipeline is also flexible enough to handle new data streams as they are needed.
The RIVIGO data pipeline consists of two major layers, Chronos and Athena:
Figure 1: System Architecture Diagram
Chronos – Data Collection Layer
Sensor vendors work with different API models. Some use a push model, whereas others provide a pull mechanism to fetch data from their sensors. Chronos provides both interfaces, ensuring compatibility with all industry vendors. In addition, GPS and gyro data from the Pilot app works on a queuing model. As this is our own application, the Pilot app pushes events to a Kafka queue, from which Chronos consumes them later. The queuing model is the most reliable way to handle these events, as the queues have no deployment-related or other inherent downtime.
We use three storage mechanisms for this raw data. After formatting, cleaning and sanity checks, high-frequency data (every second) is written to S3 through Kafka streams, while low-frequency data (every minute) is stored in MongoDB. The most recent data for each source is kept in Redis (cache) for fast retrieval.
Athena – Data Aggregation / Event Generation Layer
This layer consumes raw vehicle movement data via Kafka and generates events such as node in-out and vehicle stoppage. These events are then consumed by various microservices as needed. Athena also consolidates the raw data into meaningful legs, each a contiguous stretch of a single type, i.e., running or stopped.
The picture below shows the speed and fuel of a vehicle over time.
Figure 2: GPS Data Stream
These contiguous legs of type Running and Stopped carry consolidated and meaningful information about a vehicle's or pilot's movement. They let other microservices, such as vehicle tracking or data science products, work with less and more consolidated information than the bulky raw data.
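The consolidation into legs can be illustrated with a minimal sketch. It assumes readings arrive as time-ordered (timestamp, speed) pairs and that a fixed speed threshold separates running from stopped; the threshold value, the Leg fields and the build_legs name are all hypothetical and are not taken from Athena itself.

```python
from dataclasses import dataclass
from itertools import groupby

# Assumed threshold: readings at or below this speed count as "stopped".
STOP_SPEED_KMPH = 2.0


@dataclass
class Leg:
    kind: str    # "running" or "stopped"
    start: int   # timestamp of the first reading in the leg (epoch seconds)
    end: int     # timestamp of the last reading in the leg
    points: int  # number of raw readings folded into the leg


def build_legs(readings):
    """Collapse time-ordered (timestamp, speed_kmph) readings into legs."""
    def kind(reading):
        return "running" if reading[1] > STOP_SPEED_KMPH else "stopped"

    legs = []
    # groupby merges only adjacent readings of the same kind, which is
    # exactly what "contiguous" means here.
    for leg_kind, group in groupby(readings, key=kind):
        group = list(group)
        legs.append(Leg(leg_kind, group[0][0], group[-1][0], len(group)))
    return legs


readings = [(0, 0.0), (60, 0.5), (120, 35.0), (180, 42.0), (240, 40.0), (300, 0.0)]
legs = build_legs(readings)
```

Six raw readings collapse into three legs here, which is the reduction downstream services benefit from.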
Being the base for other products, this pipeline needed to be highly available. Other applications/microservices consume data via REST APIs exposed by application servers. To ensure availability of these servers, we implemented the following things:
In our context, there are scenarios where getting accurate data can be a challenge. For example, when an accurate mobile GPS fix is unavailable, sensors may report a random GPS point. Similarly, when a vehicle starts to move suddenly, fuel sensors may show a fluctuating reading due to the random motion of the liquid. To remove these anomalies, we compare a set of past data points with the current data points to determine the type of anomaly, and then process it through either noise removal or smoothing.
Figure 3: Data correction
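As an illustration of comparing past points with the current one, the sketch below applies a trailing median filter to a fuel series. The median filter is one common smoothing technique chosen here for illustration; the article does not state which method the pipeline uses, and the window size and the smooth name are assumptions.

```python
from statistics import median


def smooth(values, window=5):
    """Replace each reading with the median of itself and the readings
    just before it (at most `window` readings in total)."""
    out = []
    for i in range(len(values)):
        recent = values[max(0, i - window + 1): i + 1]
        out.append(median(recent))
    return out


# A fuel series (litres) with one slosh spike at index 3.
fuel = [200, 201, 199, 260, 200, 198, 199]
smoothed = smooth(fuel, window=3)
```

An isolated spike never becomes the median of its window, so it disappears, while a genuine and lasting change in level passes through after a short delay.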
Sometimes sensors can start to send out completely random data as well. This is generally a case of sensor failure, in which we see a huge gap between the median value of past data and that of current data. When we detect such unexpected variance for a long enough duration, we alert the corresponding team.
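A minimal sketch of that check follows, assuming the "long enough duration" is expressed as a minimum number of recent readings and the "huge gap" as a fixed threshold. Both parameters and the function name are hypothetical.

```python
from statistics import median


def sensor_looks_failed(baseline, recent, max_gap, min_points):
    """Return True when the recent window is long enough and its median
    lies further than max_gap from the median of the baseline window."""
    if len(recent) < min_points:
        # Too little evidence yet; a short burst is treated as noise.
        return False
    return abs(median(recent) - median(baseline)) > max_gap


baseline = [200, 201, 199, 200]
healthy = [199, 200, 202, 201, 200]
erratic = [20, 950, 3, 700, 15, 880]
```

A caller would raise an alert when the function returns True for a sensor, for example for the erratic series above with max_gap=50 and min_points=5.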
Deduplication: Another potential anomaly is receiving data out of FIFO order, or receiving multiple events for the same motion. Using the timestamp and sensor ID together as a key makes it easy to discard duplicate events. Out-of-order arrival, however, does create some challenges for us.
Late Data Handling: We calculate the distance between any two consecutive points using the haversine distance formula. Whenever a late entry is received, we modify the data already processed up to that point. Let’s consider an example to illustrate this.
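Keying on time and sensor ID can be sketched as below. The event field names (sensor_id, timestamp) are assumptions, and the unbounded in-memory set is for illustration only; a long-running service would need to bound or expire it.

```python
def deduplicate(events):
    """Keep the first event seen for each (sensor_id, timestamp) key."""
    seen = set()
    unique = []
    for event in events:
        key = (event["sensor_id"], event["timestamp"])
        if key in seen:
            continue  # same sensor, same instant: a repeat of a known event
        seen.add(key)
        unique.append(event)
    return unique


events = [
    {"sensor_id": "gps-1", "timestamp": 100, "speed": 40},
    {"sensor_id": "gps-1", "timestamp": 100, "speed": 40},  # duplicate
    {"sensor_id": "fuel-1", "timestamp": 100, "litres": 200},
    {"sensor_id": "gps-1", "timestamp": 101, "speed": 41},
]
unique_events = deduplicate(events)
```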
Figure 4: Distance Calculation
In figure 4, P2 arrived late. Before its arrival, the distance between P1 and P3 was calculated directly using the haversine distance formula. Once P2 is received, we can compute a more accurate distance: d13 is replaced by d12 + d23. The following picture demonstrates this in more depth.
Figure 5: Late Entries Data Correction
In figure 5, the calculated distance changes when the late entry is received. Note that distance is only one of the factors that has been shown here for illustration. There are many more variables which are modified in such cases to get more accurate data and decisions.
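The distance correction for a late entry can be sketched as follows. The haversine function is the standard formula with a mean Earth radius of 6371 km; the Track class, its fields and the sample coordinates are hypothetical and only show the d13 to d12 + d23 replacement, not the other variables the pipeline updates.

```python
import bisect
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(a, b):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (a[0], a[1], b[0], b[1]))
    h = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(h))


class Track:
    """Running distance over points kept sorted by timestamp."""

    def __init__(self):
        self.points = []     # sorted list of (timestamp, lat, lon)
        self.total_km = 0.0

    def add(self, ts, lat, lon):
        i = bisect.bisect_left(self.points, (ts, lat, lon))
        prev = self.points[i - 1] if i > 0 else None
        nxt = self.points[i] if i < len(self.points) else None
        if prev and nxt:
            # Late entry between two known points: drop the direct hop.
            self.total_km -= haversine_km(prev[1:], nxt[1:])
        if prev:
            self.total_km += haversine_km(prev[1:], (lat, lon))
        if nxt:
            self.total_km += haversine_km((lat, lon), nxt[1:])
        self.points.insert(i, (ts, lat, lon))


p1, p2, p3 = (0, 28.0, 77.0), (60, 28.1, 77.1), (120, 28.0, 77.2)
track = Track()
track.add(*p1)
track.add(*p3)
before = track.total_km   # d13, computed without P2
track.add(*p2)            # P2 arrives late
after = track.total_km    # d12 + d23
```

Only the hop that the late point splits is recomputed, so the correction touches two segments rather than the whole track.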
When combined, all the sensors in our vehicles and the pilot app generate around 400 GB of data every day. We generate events from raw data which are used for all data science applications. Hence this raw data gets archived in yearly or monthly buckets, as needed. We use AWS Athena to query this data.
Since the data is huge, we compress it before archiving. The majority of it is high-frequency data generated every second. We compress it into a format that can be retrieved at hour-level granularity, which saves around 70-75% of the space.
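One way to picture hour-level retrievable compression is to group readings by hour and compress each hour as a single blob. The sketch below uses gzip over JSON purely as an assumption; the article does not name the codec or file format, and the ts field and function names are hypothetical.

```python
import gzip
import json
from collections import defaultdict


def compress_by_hour(readings):
    """Group readings by the hour they fall in and gzip each hour as one blob.

    Returns a dict mapping hour start (epoch seconds) to compressed bytes.
    """
    buckets = defaultdict(list)
    for reading in readings:
        hour_start = reading["ts"] - reading["ts"] % 3600
        buckets[hour_start].append(reading)
    return {
        hour: gzip.compress(json.dumps(rows, separators=(",", ":")).encode("utf-8"))
        for hour, rows in buckets.items()
    }


def read_hour(blobs, hour_start):
    """Decompress and return the readings of a single hour."""
    return json.loads(gzip.decompress(blobs[hour_start]).decode("utf-8"))


# Two hours of per-second readings from one sensor.
readings = [{"ts": t, "sensor_id": "fuel-1", "litres": 200} for t in range(7200)]
blobs = compress_by_hour(readings)
second_hour = read_hour(blobs, 3600)
```

Fetching one hour means decompressing one blob, which is the trade-off between compression ratio and retrieval granularity.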
Chronos receives around 30 million requests every day, and almost every application in our suite depends on this data. Latency is therefore very important. To keep it low, we store the most recent data for pilots and vehicles in a cache for faster retrieval. Using both real-time and batch processing, we also pre-aggregate raw data to avoid any unnecessary delay in the APIs.
This data collection and processing system is the foundation of several analytics and machine learning applications at RIVIGO. As the business scales and more vehicles, pilots and, with them, sensors are added, the scalability of this system will be critical. Our data pipeline has been developed with future scalability in mind and can already handle 10x more load on the systems.
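The "most recent data in cache" idea can be sketched with a plain in-memory dictionary standing in for the cache. The class name, the ts field and the rule of ignoring older readings are assumptions made for illustration, not a description of the production setup.

```python
class LatestReadingCache:
    """In-memory stand-in for a cache holding the newest reading per source."""

    def __init__(self):
        self._latest = {}

    def update(self, source_id, reading):
        current = self._latest.get(source_id)
        # Skip readings older than the cached one, so a late arrival
        # never overwrites newer data.
        if current is None or reading["ts"] >= current["ts"]:
            self._latest[source_id] = reading

    def get(self, source_id):
        return self._latest.get(source_id)


cache = LatestReadingCache()
cache.update("truck-42", {"ts": 100, "lat": 28.0, "lon": 77.0})
cache.update("truck-42", {"ts": 160, "lat": 28.1, "lon": 77.1})
cache.update("truck-42", {"ts": 130, "lat": 28.05, "lon": 77.05})  # late, ignored
```

A read for the latest position then becomes a single key lookup instead of a query over the raw data.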