Predicting the Life Journey of Consignments through Advanced Data Continuums (Part 3)

13 September 2018
Technology

Technical aspects of the data platform architecture deployed to precisely predict every life event of a consignment through intelligent algorithms, analytics and machine learning. This third part of the four-part series covers how we use time wheels to power decision making ahead of time, given certain cues.

 

Introduction

In the first part of this series, we covered the overall architecture and the details of the data ingestion mechanism that helps us accurately predict every event in the life of a consignment. In the second part, we gave an overview of the data warehousing and OLAP techniques that power our various analytical dashboards. Please go through the previous articles for the overall context and background.

In Parts 1 and 2, we talked about how events that have already happened can be visualized and how decision making is done in retrospect. In this part of the series, we go through the technical aspects of the problem where decisions need to be made ahead of time, given certain cues (past lifecycle events that have happened to the consignment). In other words, we will cover delayed processing of events, triggered either by a user action or by a scheduled timeout.

 

Time wheels for delayed execution

Time wheels were first introduced in a 1987 paper by Varghese and Lauck, who studied several approaches to the efficient management of timers and described hashed and hierarchical timing wheels.

Timers are composed of two user-facing operations (‘start’ and ‘stop’) and two internal operations (‘per-tick bookkeeping’ and ‘expiry processing’). In our use case, we used the HashedWheelTimer provided by Netty, persisted and reconciled through TimescaleDB and Redis ZSETs; this is explained in the section on workflows later in this article. This data structure can be used in all timer cases where a slight delay (say, milliseconds) is tolerable between the timeout occurring and the call-back being received. The hashed timer wheel is explained below.
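Before going into the internals, here is a minimal sketch of how the two user-facing operations look with Netty’s HashedWheelTimer. The tick resolution, delays and alert message below are illustrative placeholders, not our production settings.

```java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;

import java.util.concurrent.TimeUnit;

public class DelayAlertTimerSketch {
    public static void main(String[] args) throws InterruptedException {
        // 100 ms tick resolution with 512 buckets: one full round covers ~51.2 s;
        // longer timeouts are handled internally via round counts.
        Timer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);

        // 'start': schedule a call-back that fires if nothing cancels it within 30 s.
        Timeout timeout = timer.newTimeout(
                (Timeout t) -> System.out.println("Timeout expired: raise a delay alert"),
                30, TimeUnit.SECONDS);

        // 'stop': if the awaited lifecycle event arrives in time, cancel the pending call-back.
        boolean eventArrived = false;   // placeholder condition for this sketch
        if (eventArrived) {
            timeout.cancel();
        }

        Thread.sleep(31_000);           // keep the JVM alive long enough to see the call-back
        timer.stop();                   // releases the worker thread, returns unprocessed timeouts
    }
}
```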

A timer wheel is a circular list of buckets of timer tasks. A timer wheel of size *n* has *n* buckets, where each bucket spans *t* time units and *t* is the minimum resolution at which the timer advances. Each bucket holds the timer tasks falling in the corresponding time range, so the capacity of the wheel is an *n x t* time interval: the first bucket holds all timer tasks in the range [0, t), the next bucket holds [t, 2t) and so on, with the ith bucket holding timer tasks in the range [t * (i - 1), t * i). Time is advanced in a daemon thread, and the appropriate time feedback is injected into the wheel on each clock interrupt. On every movement of time *t*, the timer advances to the next bucket, expires all the payloads present in that bucket and fires the corresponding call-backs on the expired tasks. If a timer task whose timeout lies in the past is received, it is expired immediately and its call-back triggered. The emptied bucket then becomes available for the next round: if the current bucket covers the time [t1, t1 + t), after a tick it becomes the bucket for [t1 + n * t, t1 + (n + 1) * t).

To support a timeout greater than *n x t* in a hashed time wheel, we additionally bookkeep the number of rounds the wheel must make before the tasks in a bucket time out. For example, if up to two extra rounds are required before call-backs have to be triggered, a bucket would hold payloads of the form {0, list of timers}, {1, list of timers}, {2, list of timers}. Every time the tick visits a bucket, it times out all the timers associated with round count 0 and decrements the other counts by 1; this lets a single bucket hold timer tasks that lie further in the future than the wheel’s capacity. A timing wheel has O(1) cost for insert and delete (start-timer/stop-timer) and supports random deletion.
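As a concrete illustration of this bookkeeping (a simplification of what real implementations do internally; the numbers are arbitrary), the bucket index and round count for a new timeout can be computed like this:

```java
// Illustrative arithmetic only, not library code: where a timeout of 'delayTicks'
// ticks lands in a hashed wheel with 'wheelSize' buckets, using the convention above
// (a task expires on the visit at which its round count is already 0).
public final class HashedWheelMathSketch {

    /** Bucket the task is placed in, relative to the tick at which it was scheduled. */
    static int bucketIndex(long currentTick, long delayTicks, int wheelSize) {
        return (int) ((currentTick + delayTicks) % wheelSize);
    }

    /** Number of times the bucket is visited (and the count decremented) before expiry. */
    static long remainingRounds(long delayTicks, int wheelSize) {
        return (delayTicks - 1) / wheelSize;   // assumes delayTicks >= 1
    }

    public static void main(String[] args) {
        int wheelSize = 8;      // n buckets
        long currentTick = 3;   // the wheel is currently at bucket 3
        long delayTicks = 21;   // timeout of 21 ticks, i.e. greater than n x t

        // (3 + 21) % 8 = 0 and (21 - 1) / 8 = 2: the task sits in bucket 0 and
        // survives two visits before expiring on the third, 21 ticks from now.
        System.out.println("bucket = " + bucketIndex(currentTick, delayTicks, wheelSize));
        System.out.println("rounds = " + remainingRounds(delayTicks, wheelSize));
    }
}
```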

Where more performance or scalability is desired, hierarchical time wheels are an improvement over hashed time wheels.

 

Workflow

A lot of ground pertinent to changelog capture was covered in the first part of the series. We use Kafka Streams as our streaming framework for processing these data streams.

The modules are divided into four workflows. These have been summarized in the diagram below and described in detail in subsequent paragraphs.

The first workflow is employed when a metric has to be generated from triggers produced by human actions. Let’s understand this with an example. When the system is notified that a consignment has been loaded at the origin, it may trigger actions such as sending real-time notifications to the consignor (and partner) about the loaded consignment. It also updates, in real time, the count of consignments loaded onto a trip, which can be aggregated and used to power dashboards. In our toolkit, this is accomplished using the Kafka Streams framework.
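A minimal Kafka Streams sketch of this kind of real-time aggregation follows. The topic names, key choices and serdes here are illustrative assumptions, not our actual schema.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class LoadedConsignmentCountsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "loaded-consignment-counts");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topic: key = consignmentId, value = tripId it was loaded onto.
        KStream<String, String> loadingEvents = builder.stream(
                "consignment-loading-events", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-key by tripId and maintain a running count of consignments loaded per trip.
        KTable<String, Long> loadedPerTrip = loadingEvents
                .selectKey((consignmentId, tripId) -> tripId)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count();

        // Emit the continuously updated counts for notifiers and dashboards to consume.
        loadedPerTrip.toStream()
                .to("trip-loaded-counts", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}
```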

The second workflow is slightly more complex, because we are dealing with two persistence layers (TimescaleDB and Redis) which can go out of sync with each other due to unavoidable network mishaps, code issues and so on. Since we do not need very strong consistency guarantees between the two systems in this use case (eventual consistency suffices), and to avoid two-phase commit, we came up with the second workflow. Here, TimescaleDB is the source of truth and all signals get registered there. Signals requiring a time wheel are synced to Redis ZSETs and their call-backs scheduled. If the two systems drift out of sync, restoring them is just a scheduled job: it runs a SQL query to find all the timers that need to be scheduled, checks their existence in Redis and performs the appropriate bookkeeping actions to restore sync.
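A rough sketch of such a reconciliation pass is shown below. The table, column and key names are hypothetical, and error handling is omitted.

```java
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Illustrative reconciliation pass: TimescaleDB is the source of truth, a Redis ZSET
 * holds the scheduled call-backs. Names below are hypothetical placeholders.
 */
public class TimerReconciliationJobSketch {

    private static final String PENDING_TIMERS_SQL =
            "SELECT signal_id, extract(epoch FROM expires_at) AS expires_epoch "
          + "FROM signals WHERE needs_timer = true AND expires_at > now()";

    public static void main(String[] args) throws Exception {
        try (Connection db = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/metrics", "user", "password");
             Jedis redis = new Jedis("localhost", 6379);
             PreparedStatement ps = db.prepareStatement(PENDING_TIMERS_SQL);
             ResultSet rs = ps.executeQuery()) {

            while (rs.next()) {
                String signalId = rs.getString("signal_id");
                double expiresEpoch = rs.getDouble("expires_epoch");

                // ZSCORE tells us whether the timer is already scheduled in Redis.
                Double score = redis.zscore("timer:wheel", signalId);
                if (score == null) {
                    // Missing in Redis: restore it, scored by its expiry timestamp.
                    redis.zadd("timer:wheel", expiresEpoch, signalId);
                }
            }
        }
    }
}
```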

This workflow is employed in scenarios where time-bound actions must be performed on an action item and a human-generated trigger may never arrive. Consider a scenario where a trip has been dispatched. Given the turnaround time of a section, an alert needs to be raised if the vehicle gets delayed, so that appropriate triggers/alerts can be enabled at the next destination of the trip. These triggers can affect factors like workforce management, warehouse space planning and so on.
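For this scenario, registering the delay alert at dispatch time might look roughly like the sketch below; again, the schema and key names are hypothetical illustrations.

```java
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;

/** Illustrative registration of a delay-alert timer when a trip is dispatched. */
public class TripDelayAlertSchedulerSketch {

    public static void scheduleArrivalAlert(String tripId, Duration turnaroundTime) throws Exception {
        Instant deadline = Instant.now().plus(turnaroundTime);

        // 1. Record the signal in TimescaleDB, the source of truth (hypothetical schema).
        try (Connection db = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/metrics", "user", "password");
             PreparedStatement ps = db.prepareStatement(
                     "INSERT INTO signals (signal_id, needs_timer, expires_at) VALUES (?, true, ?)")) {
            ps.setString(1, "trip-delay:" + tripId);
            ps.setTimestamp(2, Timestamp.from(deadline));
            ps.executeUpdate();
        }

        // 2. Schedule the call-back in the Redis ZSET, scored by the deadline epoch.
        try (Jedis redis = new Jedis("localhost", 6379)) {
            redis.zadd("timer:wheel", deadline.getEpochSecond(), "trip-delay:" + tripId);
        }
    }
}
```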

The third workflow is fairly simple. Whenever a timer is breached, we receive the call-back, make the appropriate changes to certain previously generated signals and persist the new state to TimescaleDB.
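A minimal sketch of such a call-back handler, with hypothetical table and column names:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/** Illustrative call-back: when a timer breaches, mark the signal and persist the new state. */
public class TimerBreachHandlerSketch {

    public static void onTimerBreached(String signalId) throws Exception {
        try (Connection db = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/metrics", "user", "password");
             PreparedStatement ps = db.prepareStatement(
                     "UPDATE signals SET state = 'BREACHED', breached_at = now() WHERE signal_id = ?")) {
            ps.setString(1, signalId);
            ps.executeUpdate();
        }
    }
}
```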

The fourth workflow enables dashboarding using Apache Superset. This is powered by TimescaleDB and helps visualize thresholds breached in the past so that actions can be taken ahead of time.

Given below is a sample dashboard.

 

Rationale for TimescaleDB

The following are the primary reasons why we went ahead with TimescaleDB.

  • It supports GIS queries via PostGIS
  • It is built atop PostgreSQL, which is excellent for aggregates and OLAP queries
  • It provides automatic two-dimensional partitioning (sharding) of time-series data out of the box
  • It enables easy modelling of mutable time-series data
  • It integrates directly with Apache Superset via SQL for interactive visualizations
  • Our use case requires aggregates over only three months of data, which, given our data sizes, eliminates the need for horizontal clustering
  • Since it is plain SQL, it is easy for non-technical team members to draw insights as well

 

Conclusion

In this post, we covered how we use time wheels to power decision making ahead of time, given certain cues. The workflows described in this section take us another step closer to building intelligent systems that can accurately predict events in the life journey of a consignment. In the upcoming final part of this series, we will conclude with an overview of our time-series analysis.