RIVIGO Reporting Platform

8 August 2019
Architecture

This article talks about how the engineers at RIVIGO Labs built a serverless, cost-effective reporting platform that helps convert data into valuable insights.



Is Data the new Gold?


Yes, it is, but gold has to pass through fire to realize its value. Similarly, data must be refined and converted into valuable insights to qualify for the tag of “New Gold”.

Rivigo, at its core, is a data-driven organization, and we generate lots of data. It is important for us to convert this data into gold to drive business decisions. Here is how we have built an end-to-end reporting platform to convert our data into gold. The platform is built completely on AWS, mostly using a serverless architecture to minimize costs, and we call it lean reporting.


Services needed for Reporting Platform


The following is the overall architecture of the reporting platform we built. We will talk about each service in detail in the subsequent sections.


Ingestion Service

The first step is to bring all the data into the data lake. We have built our data lake on Amazon S3. We needed a service that supports different sources and sinks to load data for batch processing, so we built an in-house service called “Vaayu” which is highly configurable in terms of data sources and sinks. It can be configured as follows:

source {
  type = "jdbc"
  url = "jdbc:mysql://server-name.com:3306/dbname"
  dbtable = "dbname.tablename"
  ssm-path = "/test/db/prod/read/"
  aws-region = "ap-south-1"
  user = "???"
  password = "???"
  fetchsize = "20000"
}
sink {
  type = "s3"
  bucket = "bucket-name"
  folder = "ingestion/vaayu/dbname_tablename/"
  format = "parquet"
  save_mode = "error"
}
spark {
  app.name = "ingestion"
}
schema {
  epoch_cols = ["last_updated_date", "created_date"]
  numeric_cols = ["latitude", "longitude", "rating"]
  bool_cols = ["is_active"]
}

This service runs on EMR, ingests data from different sources and dumps it into time/date-partitioned directories on S3. We use Parquet, a columnar storage format, as the file format. Parquet enables our downstream jobs to optimize reads thanks to its columnar nature. There are other options like ORC which are equally good and well supported.
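To make the ingestion concrete, here is a minimal PySpark sketch of what a Vaayu run for a JDBC source effectively does, mirroring the configuration above; the connection details and partition column are hypothetical, and in the real service credentials are resolved from SSM rather than hard-coded.

# Hypothetical, simplified sketch of a Vaayu-style ingestion run; not the actual service code.
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("ingestion").getOrCreate()

# Read the source table over JDBC, mirroring the "source" block of the config.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://server-name.com:3306/dbname")
    .option("dbtable", "dbname.tablename")
    .option("user", "db_user")          # in practice resolved via the configured SSM path
    .option("password", "db_password")  # in practice resolved via the configured SSM path
    .option("fetchsize", "20000")
    .load()
)

# Write to a date-partitioned Parquet directory on S3, mirroring the "sink" block.
ingestion_date = datetime.utcnow().strftime("%Y-%m-%d")
(
    df.withColumn("dt", lit(ingestion_date))
    .write.mode("error")
    .partitionBy("dt")
    .parquet("s3://bucket-name/ingestion/vaayu/dbname_tablename/")
)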


ETL Jobs

Every data source we ingest through “Vaayu” needs to be enriched and annotated with other data sources before it becomes really useful.

We have chosen Spark for ETL processing on the data. We built a multi-module project which supports writing ETL in either Scala or Python, as Spark supports both programming interfaces. All common functions like time/partition handling, Spark configuration, job config parsing, and common annotations and computations are provided by the framework, so a new ETL job for a report can be added quickly.
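To give a feel for the framework, a report-specific ETL job written in PySpark against it might look roughly like the sketch below; the table names, join keys, S3 paths and aggregations are purely illustrative.

# Illustrative sketch of a report ETL job; names, paths and metrics are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("trip_report_etl").getOrCreate()

# Read the ingested, date-partitioned Parquet data produced by Vaayu.
trips = spark.read.parquet("s3://bucket-name/ingestion/vaayu/dbname_trips/")
clients = spark.read.parquet("s3://bucket-name/ingestion/vaayu/dbname_clients/")

# Enrich one source with another and compute report-level aggregates.
report = (
    trips.join(clients, on="client_id", how="left")
    .groupBy("client_id", "client_name", "dt")
    .agg(
        F.count("*").alias("trip_count"),
        F.avg("rating").alias("avg_rating"),
    )
)

# Write the processed output back to S3, ready to be registered in the meta store.
report.write.mode("overwrite").partitionBy("dt").parquet(
    "s3://bucket-name/processed/trip_report/"
)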

Each job has its own configuration file which defines all job-level configs.

Jobs can run either in live mode (processing current data) or replay mode (reprocessing data for an older time range).

Example: Command to run a job in live mode:

spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --class com.rivigo.dsetl.app.MainDriver \
  <path/to/dsetl/jar> \
  --app-name <application_name> \
  --day-lag 1

Example: Command to run a job in replay mode:

spark-submit \
  --deploy-mode cluster \
  --master yarn \
  --class com.rivigo.dsetl.app.MainDriver \
  <path/to/dsetl/jar> \
  --app-name <application_name> \
  --start-time <2019-07-08> \
  --end-time <2019-07-08>
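Internally, the driver derives the processing window from these arguments. A simplified, purely illustrative sketch of that logic (only the argument names are taken from the commands above) could look like this:

# Illustrative sketch of how the driver could resolve the processing window.
import argparse
from datetime import date, timedelta

parser = argparse.ArgumentParser()
parser.add_argument("--app-name", required=True)
parser.add_argument("--day-lag", type=int, default=1)   # live mode
parser.add_argument("--start-time", default=None)       # replay mode
parser.add_argument("--end-time", default=None)         # replay mode
args = parser.parse_args()

if args.start_time and args.end_time:
    # Replay mode: process the explicitly requested date range.
    start_date, end_date = args.start_time, args.end_time
else:
    # Live mode: process data `day-lag` days behind the current date.
    run_date = date.today() - timedelta(days=args.day_lag)
    start_date = end_date = run_date.isoformat()

print("Running %s for partitions %s to %s" % (args.app_name, start_date, end_date))

In live mode, the job keeps up with the latest partitions at a fixed lag; replay mode lets us backfill an older date range without changing the job code.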

Debugging Spark jobs on EMR is not as intuitive as on other data platforms; that’s where this blog really helped us. Once the proxy setup is done as mentioned in the blog, the Spark History Server UI and the YARN Resource Manager UI can be accessed for debugging and performance optimization of Spark jobs.

Meta Store

A meta store is needed to maintain metadata about databases and table definitions in a central catalog. The processed data output by the ETL jobs has to be made available for querying using SQL-like interfaces. For that, we need to store metadata about the processed data, such as the path of the files on S3, the schema of the tables, the format of the data (Parquet here) and the corresponding database and table name mapping. Hive is most commonly used for this purpose; it provides the Hive metastore as well as HQL. We have used the Data Catalog provided with the AWS Glue service. Registering data into Glue can be done either via a Glue crawler or using the Glue Catalog API. We are using the API, as we have already integrated it as part of tejaspy for our Serverless Feature Store (details available here).
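For reference, registering a processed Parquet table in the Glue Data Catalog through the API can be done with a boto3 call along the following lines; the database, table, columns and location here are hypothetical.

# Illustrative registration of a Parquet table in the Glue Data Catalog via the API.
import boto3

glue = boto3.client("glue", region_name="ap-south-1")

glue.create_table(
    DatabaseName="reporting",              # hypothetical database name
    TableInput={
        "Name": "trip_report",             # hypothetical table name
        "TableType": "EXTERNAL_TABLE",
        "PartitionKeys": [{"Name": "dt", "Type": "string"}],
        "StorageDescriptor": {
            "Columns": [
                {"Name": "client_id", "Type": "bigint"},
                {"Name": "trip_count", "Type": "bigint"},
                {"Name": "avg_rating", "Type": "double"},
            ],
            "Location": "s3://bucket-name/processed/trip_report/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
        "Parameters": {"classification": "parquet"},
    },
)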


Query Engine

Once the data is made available in the Glue catalog, it can be queried using the Athena service. Athena is completely serverless and, if used effectively, gives significant cost benefits.

Practically, in every business the data schema will change over time, and that is one of the benefits we get out of using Parquet with Athena: it supports schema evolution. Athena is schema-on-read and works well with Parquet.

There are some restrictions on the kinds of schema updates Athena can support, hence these have to be planned carefully. Details of those can be found here.

Our data partitioning strategy is designed based on the expected query patterns. Using the column on which the data is mostly queried as the partition key can save a lot of Athena cost. General documentation on how to optimize Athena queries can be found here.
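To illustrate how partition pruning keeps the scan (and hence the bill) small, a query submitted via the Athena API can filter on the partition column; the query, table and bucket below are hypothetical.

# Illustrative Athena query via boto3; the predicate on the partition column ("dt")
# makes Athena read only the matching S3 partitions instead of the full table.
import boto3

athena = boto3.client("athena", region_name="ap-south-1")

response = athena.start_query_execution(
    QueryString="""
        SELECT client_id, SUM(trip_count) AS trips
        FROM reporting.trip_report
        WHERE dt = '2019-07-08'   -- partition predicate limits the data scanned
        GROUP BY client_id
    """,
    ResultConfiguration={"OutputLocation": "s3://bucket-name/athena/results/"},
)
print(response["QueryExecutionId"])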


Visualization

We have explored multiple open source/cloud visualisation tools and picked Zeppelin and Superset for our platform needs. Both are open source, and we have configured them to work with Athena. For Superset, there is a driver, PyAthenaJDBC, which can be used to interact with Athena databases and tables. The URI to register an Athena database in Superset looks like this:

awsathena+jdbc://athena.ap-south-1.amazonaws.com/<dbname>?AwsCredentialsProviderClass=com.simba.athena.amazonaws.auth.profile.ProfileCredentialsProvider&s3_staging_dir=s3://<bucket name>/athena/superset

Similarly, for Zeppelin, the AWS-provided JDBC driver can be used to interact with Athena:

jdbc:awsathena://athena.ap-south-1.amazonaws.com:443;S3OutputLocation=s3://bucketname/<output_stg_dir>;Schema=<dbname>;AwsCredentialsProviderClass=com.simba.athena.amazonaws.auth.profile.ProfileCredentialsProvider;AwsCredentialsProviderArguments="<Instance Profile Name of Zeppelin Instance>"

While Superset is a useful tool for a Business Analyst and provides good features for reporting and exploratory analysis, a Data Scientist may be more comfortable using Zeppelin, which gives a notebook-like interface and supports all big data backends. The best part with Zeppelin is that data explored via the Athena interpreter can be used in the Python or spark-shell interpreter using the Zeppelin Context.
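For quick ad-hoc exploration from a Python notebook paragraph, the same Athena tables can also be queried directly; the sketch below assumes the pyathena package is installed and reuses the hypothetical table from the earlier examples.

# Illustrative ad-hoc Athena query from a Python notebook paragraph (assumes pyathena).
from pyathena import connect

conn = connect(
    s3_staging_dir="s3://bucket-name/athena/notebook/",  # staging location for query results
    region_name="ap-south-1",
)

cursor = conn.cursor()
cursor.execute("SELECT dt, COUNT(*) AS row_count FROM reporting.trip_report GROUP BY dt ORDER BY dt")
for row in cursor.fetchall():
    print(row)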

Both Superset and Zeppelin are hosted and managed by us on EC2 instances.


Scheduler

To maintain the workflow across these multiple jobs, we have used Airflow. Airflow supports multiple operators for AWS which can be leveraged to schedule workflows and apply sensors to trigger dependent jobs.
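A trimmed-down example DAG using Airflow's EMR operators and step sensor is sketched below; the cluster configuration, step arguments and import paths (which differ across Airflow versions) are assumptions rather than our exact production setup.

# Illustrative Airflow DAG: create an EMR cluster, submit a Spark step, wait for it.
# Import paths are for Airflow 1.10-style contrib modules and vary by version.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

SPARK_STEP = [{
    "Name": "trip_report_etl",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--deploy-mode", "cluster",
                 "--class", "com.rivigo.dsetl.app.MainDriver",
                 "s3://bucket-name/jars/dsetl.jar",
                 "--app-name", "trip_report", "--day-lag", "1"],
    },
}]

JOB_FLOW_OVERRIDES = {
    "Name": "reporting-etl",
    "ReleaseLabel": "emr-5.24.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,  # let the cluster go away once steps finish
    },
    "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
}

with DAG("reporting_etl", start_date=datetime(2019, 8, 1),
         schedule_interval="@daily", catchup=False) as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )

    add_step = EmrAddStepsOperator(
        task_id="add_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        steps=SPARK_STEP,
    )

    watch_step = EmrStepSensor(
        task_id="watch_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_step', key='return_value')[0] }}",
    )

    create_cluster >> add_step >> watch_step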

We have hosted Airflow on an EC2 instance; it is a really thin server backed by RDS.


Cost

We have scheduled the ingestion and analytical jobs on EMR, triggered from the Airflow instance. We create the EMR cluster just before a job starts and use “EMR steps” when we need to run multiple jobs on the same cluster.

Cost is controlled by using the below configurations in the “aws emr create-cluster” command:

 --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
 --auto-terminate

Detailed EMR pricing is available here.

Athena charges $5 per TB of data scanned, and our partitioning strategy and columnar format have helped us a lot in minimising cost.

As we are using the Glue Catalog via the API and not the crawler, the cost of Glue is $1 per 100K objects stored (the first 1 million objects are free).

Data stored on S3 is charged at $0.025/GB per month. For example, a CSV file of 1.6 GB becomes ~200 MB in Parquet, so the monthly storage cost is about 8 times lower, and Athena query run times will be much shorter with the columnar format, along with the reduced data scan cost. If we keep the CSV gzipped, its size will still be roughly twice the Parquet size, with none of the Athena query optimisations possible.
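Putting those numbers together for the 1.6 GB example gives a rough back-of-the-envelope comparison (illustrative only, ignoring request costs and the column pruning that Parquet additionally allows):

# Rough back-of-the-envelope comparison using the figures quoted above.
S3_PRICE_PER_GB_MONTH = 0.025   # USD per GB-month
ATHENA_PRICE_PER_TB = 5.0       # USD per TB scanned

csv_gb = 1.6
parquet_gb = 0.2                # ~200 MB in Parquet
gzip_gb = 2 * parquet_gb        # gzipped CSV is still roughly twice the Parquet size

for label, size_gb in [("csv", csv_gb), ("csv.gz", gzip_gb), ("parquet", parquet_gb)]:
    storage = size_gb * S3_PRICE_PER_GB_MONTH
    full_scan = (size_gb / 1024.0) * ATHENA_PRICE_PER_TB
    print("%-8s storage $%.4f/month, full-table scan $%.5f" % (label, storage, full_scan))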

Hence this reporting architecture is quite lean in terms of cost, maintenance and the effort of adding new reports.


Saswata Dutta, Jishant Singh, Jyoti Arora, Chirag Maheshwari and Arun Singh have also contributed to this article.