Using Amazon EMR Serverless, Athena, and Apache DolphinScheduler to build an on-cloud and off-cloud data synchronization solution

Apache SeaTunnel
9 min read · Jun 3, 2024


Introduction

In the data-driven landscape of today’s enterprises, the need for reliable and high-performance solutions to manage the ever-growing data demands is becoming increasingly crucial. This blog series offers insights from the perspective of a B2C fintech client that values data security and compliance. It explores how to leverage Amazon Web Services’ cloud-native services, open-source community products, and third-party tools to construct a decoupled serverless data warehouse in a hybrid deployment scenario.

Amazon EMR (Elastic MapReduce) Serverless is a fully managed, serverless big data processing service from AWS. Built on compute engines such as Apache Spark and Apache Hive, it provides a compute-storage separation architecture that improves performance while preserving architectural elasticity.

Apache DolphinScheduler provides a versatile workflow scheduler that operates decoupled from EMR clusters, ensuring efficient and reliable data orchestration and processing. Additionally, Amazon Athena allows clients to perform ad-hoc queries and analyze vast datasets using standard SQL without the complexities of infrastructure management. Open integration testing via the AWS console facilitates seamless integration and validation of these components, significantly improving engineering productivity. For fintech clients, EMR Serverless offers LOB (line of business)-level resource consumption analytics for precise monitoring and cost optimization, which is particularly valuable in the finance sector where operational agility and cost efficiency are paramount. Given the high stakes of data security and compliance in B2C fintech, the client has adopted a hybrid architecture: sensitive data is stored on premises while the scalability and flexibility of cloud computing are still leveraged.

This article focuses on the design of cloud and on-premises data synchronization solutions.

Architecture Design

Data security and compliance are paramount for fintech clients. In the specific case discussed in this blog, business data is stored in an on-premises TiDB database, while user behavior data is collected through the Sensors Data suite and stored in on-premises HDFS. TiDB is a global AWS partner, and its product offerings on AWS can be found via the AWS TiDB link. Sensors Data is an AWS partner in the Greater China region, accessible via the AWS Sensors Data link.

These local data sources are connected to the AWS Region via AWS Direct Connect. Within AWS, data travels through an S3 interface endpoint over AWS PrivateLink before reaching the S3 buckets (for example, a bucket named ODS used for demonstration). DNS resolution for the interface endpoints is handled through resolvers and hosted zones in Amazon Route 53.

Data is then processed by Amazon EMR Serverless jobs (Hive or Spark) to implement the data warehouse layering logic. Different layers of data are stored in the same or separate S3 buckets, distinguished by S3 prefixes. This data architecture is managed through the AWS Glue Data Catalog and is queryable via the Amazon Athena console.
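Once a layer's tables are registered in the Glue Data Catalog, they can also be queried programmatically through Athena, not only from the console. The following is a minimal boto3 sketch of such a query; the database, table, and result-bucket names are placeholders rather than values from the original setup:

import time
import boto3

athena = boto3.client("athena", region_name="ap-southeast-1")  # region is an assumption

# Query a layered table registered in the Glue Data Catalog (all names are placeholders).
resp = athena.start_query_execution(
    QueryString="SELECT count(*) FROM dwd_user_behavior WHERE dt = '2024-06-01'",
    QueryExecutionContext={"Database": "ods"},
    ResultConfiguration={"OutputLocation": "s3://athena-query-results-demo/"},
)
query_id = resp["QueryExecutionId"]

# Poll until the query finishes, then fetch the result rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(rows)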

Third-party BI tools integrate further with Amazon Athena through JDBC to enable data visualization and generate reports meeting diverse business needs, including regulatory requirements.

EMR Serverless jobs are orchestrated by an Apache DolphinScheduler cluster deployed across three EC2 instances in cluster mode.

The DolphinScheduler cluster is deployed independently of the EMR Serverless jobs it orchestrates, which keeps the system highly reliable: a failure on either side (an EMR job or the scheduler) does not affect the other.
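To illustrate this decoupling: a DolphinScheduler task only needs AWS SDK/CLI access to submit and monitor an EMR Serverless job; it holds no EMR runtime state itself. Below is a minimal Python sketch of what such a task body might look like, using boto3; the application ID, role ARN, script path, and region are placeholders, and this is an assumption about how the scheduling could be wired, not the client's actual task code:

import sys
import time
import boto3

# Placeholders: replace with the real EMR Serverless application ID, job role ARN and script location.
APPLICATION_ID = "00fev6mdk***"
JOB_ROLE_ARN = "arn:aws:iam::<aws account id>:role/emr-serverless-job-role"

emr = boto3.client("emr-serverless", region_name="ap-southeast-1")  # region is an assumption

# Submit the Spark job; this is the same API the AWS CLI examples later in this article wrap.
run = emr.start_job_run(
    applicationId=APPLICATION_ID,
    executionRoleArn=JOB_ROLE_ARN,
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/scripts/etl_job.py",
            "entryPointArguments": ["dwd_table_name"],
        }
    },
)

# Poll the job state so the DolphinScheduler task succeeds or fails together with the EMR job.
job_run_id = run["jobRunId"]
while True:
    state = emr.get_job_run(applicationId=APPLICATION_ID, jobRunId=job_run_id)["jobRun"]["state"]
    if state in ("SUCCESS", "FAILED", "CANCELLED"):
        break
    time.sleep(30)

sys.exit(0 if state == "SUCCESS" else 1)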

Cloud and On-Premises Data Synchronization Solution

From a networking infrastructure perspective, AWS Direct Connect is used to establish connectivity between the client's on-premises environment and the AWS Region. Within AWS, data flows through an S3 interface endpoint over AWS PrivateLink before reaching the S3 bucket (ODS for demonstration). For the detailed architecture, operational mechanisms, and deployment guides, refer to the AWS PrivateLink interface endpoints documentation.
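As a rough illustration rather than a deployment guide, an S3 interface endpoint of the kind referenced above can be created with a single SDK call; the VPC, subnet, and security-group IDs below are placeholders:

import boto3

ec2 = boto3.client("ec2", region_name="ap-southeast-1")  # region is an assumption

# Create an interface endpoint for S3 (AWS PrivateLink); all IDs are placeholders.
response = ec2.create_vpc_endpoint(
    VpcEndpointType="Interface",
    VpcId="vpc-0123456789abcdef0",
    ServiceName="com.amazonaws.ap-southeast-1.s3",
    SubnetIds=["subnet-0123456789abcdef0"],
    SecurityGroupIds=["sg-0123456789abcdef0"],
    PrivateDnsEnabled=False,  # S3 interface endpoints typically rely on endpoint-specific DNS names
)

# The endpoint-specific DNS entries returned here are what the Route 53 resolver setup
# points on-premises clients at.
print(response["VpcEndpoint"]["DnsEntries"])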

From a data transfer perspective, a software-level bi-directional data synchronization solution was designed, including three sub-scenarios:

  1. Bulk data from on-premises to AWS region.
  2. Incremental data from on-premises to AWS region.
  3. Data reverse synchronization from AWS region back to on-premises.

Each scenario has specific requirements:

  1. The data synchronization solution must support TiDB and HDFS as sources and Amazon S3 as the target.
  2. A data integrity check mechanism is required to ensure consistent data synchronization.

Table 1 describes the specific solutions that meet each sub-scenario’s requirements.

Table 1: Cloud and On-Premises Data Synchronization Solution Design

Specific solutions explained:

Bulk Data Synchronization

Using TiDB Dumpling to Sync Data from TiDB to AWS S3

To implement local TiDB data synchronization to AWS S3, refer to the Export Data to Amazon S3 Cloud Storage guide. By executing the following command, data stored in TiDB can be dumped as CSV files and stored in AWS S3 buckets.

./dumpling -u root -P 4000 -h 127.0.0.1 -r 200000 -o "s3://${Bucket}/${Folder}" --filetype csv

Bulk Data Synchronization

Synchronizing Data from Local HDFS to Amazon S3 Using AWS DataSync

The AWS DataSync agent should be installed on a server in the customer's on-premises environment. When connected to a Hadoop cluster, the agent acts as an HDFS client, communicates with the primary NameNode of the cluster, and then copies file data from the DataNodes. Refer to the operation guide for synchronizing data from Hadoop HDFS to Amazon S3 with AWS DataSync; a short SDK sketch follows.
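For reference, the HDFS-to-S3 transfer described in that guide can also be wired up through the SDK. The following is a minimal boto3 sketch under assumed values; the agent ARN, NameNode address, bucket, role, and region are placeholders, not the client's actual configuration:

import boto3

datasync = boto3.client("datasync", region_name="ap-southeast-1")  # region is an assumption

# HDFS source location: the on-premises DataSync agent talks to the NameNode.
hdfs_loc = datasync.create_location_hdfs(
    NameNodes=[{"Hostname": "namenode.internal", "Port": 8020}],
    AuthenticationType="SIMPLE",
    SimpleUser="hadoop",
    Subdirectory="/user/sensorsdata/events/",
    AgentArns=["arn:aws:datasync:ap-southeast-1:<account-id>:agent/agent-0123456789abcdef0"],
)

# S3 target location: the ODS bucket, written through a role that grants bucket access.
s3_loc = datasync.create_location_s3(
    S3BucketArn="arn:aws:s3:::ods",
    Subdirectory="/sensorsdata/events/",
    S3Config={"BucketAccessRoleArn": "arn:aws:iam::<account-id>:role/datasync-s3-access"},
)

# Create the transfer task and start an execution.
task = datasync.create_task(
    SourceLocationArn=hdfs_loc["LocationArn"],
    DestinationLocationArn=s3_loc["LocationArn"],
    Name="hdfs-to-ods-bulk-sync",
)
execution = datasync.start_task_execution(TaskArn=task["TaskArn"])
print(execution["TaskExecutionArn"])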

Incremental Data Synchronization
Using TiDB Dumpling and Self-managed Checkpoints

To achieve incremental data synchronization with the TiDB Dumpling tool, you need to manage the checkpoints of the synchronized data yourself. A recommended approach is to store the id of the last ingested record in a persistent store (such as ElastiCache for Redis or DynamoDB) and have the shell/Python job that triggers TiDB Dumpling read and update it, thereby implementing self-managed checkpoints.

The premise of this solution, of course, is that the target table has a monotonically increasing id field as its primary key.

Use the checkpoint value to filter the exported data via the --where option of TiDB Dumpling. A sample command is shown below, followed by a sketch of the checkpoint handling.

./dumpling -u root -P 4000 -h 127.0.0.1 -o /tmp/test --where "id < 100"
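The following is a minimal sketch of such self-managed checkpointing, assuming a DynamoDB table named sync_checkpoints as the checkpoint store and a dumpling binary in the working directory; these names and the region are assumptions, not part of the original setup:

import subprocess
import boto3

# Hypothetical checkpoint store: a DynamoDB table keyed by table_name.
dynamodb = boto3.resource("dynamodb", region_name="ap-southeast-1")  # region is an assumption
checkpoints = dynamodb.Table("sync_checkpoints")

TABLE_NAME = "orders"  # placeholder table name

# 1. Read the id of the last record exported in the previous run (0 on the first run).
item = checkpoints.get_item(Key={"table_name": TABLE_NAME}).get("Item")
last_id = int(item["last_id"]) if item else 0

# 2. Export only rows whose id is greater than the checkpoint.
subprocess.run(
    [
        "./dumpling", "-u", "root", "-P", "4000", "-h", "127.0.0.1",
        "-o", "s3://ods/orders/", "--filetype", "csv",
        "--where", f"id > {last_id}",
    ],
    check=True,
)

# 3. Query the current maximum id from TiDB (not shown here) and persist it
#    as the new checkpoint for the next incremental run.
new_max_id = last_id  # placeholder: replace with the result of SELECT MAX(id) FROM orders
checkpoints.put_item(Item={"table_name": TABLE_NAME, "last_id": new_max_id})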

Incremental Data Synchronization

Using TiDB CDC Connector from TiDB to Amazon Web Services S3

The benefit of using the TiDB CDC Connector to implement incremental data synchronization from TiDB to Amazon Web Services S3 is that it provides a native CDC mechanism, and performance is good because the backend engine is Flink. However, there is a trade-off in this approach: quite a few Flink tables need to be created to map the ODS tables on Amazon Web Services.

The TiDB CDC Connector operation guide is available in the TiDB CDC documentation.
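As an illustration of the per-table mapping overhead only, a Flink job using the community flink-connector-tidb-cdc might define one such source table roughly as follows in PyFlink; the connector options, addresses, and table names are assumptions based on the public Flink CDC documentation, not the client's actual configuration:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming table environment; the flink-connector-tidb-cdc jar must be on the
# Flink classpath (an assumption of this sketch).
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# One Flink table like this has to be defined for every TiDB/ODS table that is
# captured -- this is the mapping overhead mentioned above. Names and addresses
# are placeholders.
t_env.execute_sql("""
    CREATE TABLE orders_source (
        id BIGINT,
        created_at STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'tidb-cdc',
        'pd-addresses' = 'pd0.internal:2379',
        'database-name' = 'app_db',
        'table-name' = 'orders'
    )
""")

# Preview the change stream; in the real pipeline the sink side writes to the
# ODS bucket on S3 through a changelog-capable sink instead of printing.
t_env.execute_sql("SELECT * FROM orders_source").print()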

Reverse Data Synchronization

Using an EMR Serverless Job to Synchronize Data from a Glue Catalog Table Back to a TiDB Table

Most data flows from the customer's on-premises environment to Amazon Web Services. However, for certain business needs, some data has to flow in the reverse direction, from Amazon Web Services back to the customer's on-premises environment.

Once the data lands on Amazon Web Services, it is managed through the Glue Data Catalog via Athena tables created with specific table structures. The table DDL script is as follows:

CREATE EXTERNAL TABLE IF NOT EXISTS `table_name`(
`id` string,
……
`created_at` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 's3://bucket_name/prefix_name/';

In such cases, an EMR Serverless Spark Job can accomplish the task of syncing data from the AWS Glue table back to the customer’s local table.

If the Spark job is written in Scala, the example code is as follows:

package com.example

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {

  def main(args: Array[String]): Unit = {

    // Create a SparkSession with Hive support so the Glue Data Catalog
    // can be used as the metastore.
    val spark = SparkSession.builder()
      .appName("<app name>")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show databases").show()
    spark.sql("use default")

    // Read the Glue catalog table that needs to be synchronized back to TiDB.
    val df = spark.sql("select * from <glue table name>")

    // Write the result to the target TiDB table over JDBC (TiDB is MySQL-compatible).
    df.write
      .format("jdbc")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("url", "jdbc:mysql://tidbcloud_endpoint:4000/namespace")
      .option("dbtable", "table_name")
      .option("user", "use_name")
      .option("password", "password_string")
      .save()

    spark.close()
  }

}

After packaging the Scala code into a jar file using SBT, the job can be submitted to the EMR Serverless engine with the following AWS CLI command:

export applicationId=00fev6mdk***

export job_role_arn=arn:aws:iam::<aws account id>:role/emr-serverless-job-role

aws emr-serverless start-job-run \
--application-id $applicationId \
--execution-role-arn $job_role_arn \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://spark-sql-test-nov23rd/scripts/dec13-1/scala-glue_2.13-1.0.1.jar",
"sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=4 --conf spark.executor.memory=3g --jars s3://spark-sql-test-nov23rd/mysql-connector-j-8.2.0.jar"
}
}'

If the Spark job is written in Pyspark, the example code is as follows:

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":

    # Create a SparkSession with Hive support so the Glue Data Catalog
    # can be used as the metastore.
    spark = SparkSession \
        .builder \
        .appName("app1") \
        .enableHiveSupport() \
        .getOrCreate()

    # The Glue table name is passed in as the first job argument.
    df = spark.sql(f"select * from {str(sys.argv[1])}")

    # Write the result to the target TiDB table over JDBC (TiDB is MySQL-compatible).
    df.write.format("jdbc").options(
        driver="com.mysql.cj.jdbc.Driver",
        url="jdbc:mysql://tidbcloud_endpoint:4000/namespace",
        dbtable="table_name",
        user="use_name",
        password="password_string").save()

    spark.stop()

The job can be submitted to the EMR Serverless engine with the following AWS CLI command:

export applicationId=00fev6mdk***

export job_role_arn=arn:aws:iam::<aws account id>:role/emr-serverless-job-role

aws emr-serverless start-job-run \
--application-id $applicationId \
--execution-role-arn $job_role_arn \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://spark-sql-test-nov23rd/scripts/dec13-1/testpython.py",
"entryPointArguments": ["testspark"],
"sparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=4 --conf spark.executor.memory=3g --jars s3://spark-sql-test-nov23rd/mysql-connector-j-8.2.0.jar"
}
}'

The above PySpark code and AWS CLI command also implement external parameter passing: the table name is passed into the SQL statement via entryPointArguments at job submission time.

Custom-developed Data Integrity Check

A complete data integrity check works as follows: create a verification database on the source side, select a non-null unique field to compute a verification value together with the row count, compute the same verification value and row count on the target side using the same field, and then compare the verification values and row counts between the source and the target.

If the verification results are inconsistent, manual comparison and adjustment are required. This verification method presupposes that both the source and the target are relational databases. In this article, however, the target of the TiDB-to-AWS S3 synchronization is object storage, not a database.

Therefore, there are some trade-offs in the data integrity check. In practice, we compare the total number of columns, the total number of rows, and the column names of the target dataset against the source to achieve this.
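A minimal PySpark sketch of this comparison is shown below. It assumes the EMR Serverless job can reach both the Glue table (the S3 dataset) and the source TiDB table over JDBC; the connection values, table names, and arguments are placeholders:

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder \
        .appName("integrity-check") \
        .enableHiveSupport() \
        .getOrCreate()

    glue_table = sys.argv[1]   # e.g. "default.orders" (placeholder)
    tidb_table = sys.argv[2]   # e.g. "orders" (placeholder)

    # Target side: the dataset on S3, read through the Glue Data Catalog.
    target_df = spark.sql(f"select * from {glue_table}")

    # Source side: the on-premises TiDB table, read over JDBC (connection values are placeholders).
    source_df = spark.read.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://tidb_endpoint:4000/namespace") \
        .option("dbtable", tidb_table) \
        .option("user", "use_name") \
        .option("password", "password_string") \
        .load()

    # Compare row count, column count, and column names between source and target.
    checks = {
        "row_count": source_df.count() == target_df.count(),
        "column_count": len(source_df.columns) == len(target_df.columns),
        "column_names": sorted(c.lower() for c in source_df.columns)
                        == sorted(c.lower() for c in target_df.columns),
    }
    print(checks)

    # A failed check signals that manual comparison and adjustment are required.
    if not all(checks.values()):
        sys.exit(1)

    spark.stop()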

AWS DataSync Data Integrity

DataSync utilizes an AWS-designed storage-agnostic transmission protocol that performs real-time checksum verification and validation during data movement. Detailed information is available at configure-data-verification-options. In addition to real-time checksum verification and validation, DataSync also supports incremental transfers and inline compression.

DataSync manages the transfer process, so users do not need to write and optimize their own copy scripts or deploy and fine-tune commercial data transfer tools. Built-in monitoring ensures the data integrity of moved files and objects, with automatic retry mechanisms to ensure that the contents arriving at the target file storage match the original files.
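The verification mode can also be set per execution through the SDK; a small sketch follows, in which the task ARN and region are placeholders:

import boto3

datasync = boto3.client("datasync", region_name="ap-southeast-1")  # region is an assumption

# Override the verification mode for a single execution of an existing task.
execution = datasync.start_task_execution(
    TaskArn="arn:aws:datasync:ap-southeast-1:<account-id>:task/task-0123456789abcdef0",
    OverrideOptions={"VerifyMode": "POINT_IN_TIME_CONSISTENT"},
)

# Inspect the execution afterwards; the VERIFYING phase and final status reflect the check.
status = datasync.describe_task_execution(TaskExecutionArn=execution["TaskExecutionArn"])
print(status["Status"])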

Conclusion

Fintech customers attach great importance to data security and compliance. To avoid potential risks, the customer in this case placed the user's deposit and withdrawal data and basic user data (collectively referred to as business data) in its own IDC, while the user's behavioral data and desensitized business data were placed on the Amazon Web Services platform.

Among the FSI (financial services industry) customers served by Amazon Web Services around the world, more and more fintech companies are choosing to store their business data on the Amazon Web Services platform.

Previously, Beagle Open Source launched its product WhaleStudio (a commercial version built on Apache DolphinScheduler and Apache SeaTunnel) on the Amazon Web Services Marketplace. Those interested can visit and subscribe: https://aws.amazon.com/marketplace/pp/prodview-563cegc47oxwm?sr=0-1&ref_=beagle&applicationId=AWSMPContessa

The cloud platform and services that Amazon Web Services provides have accumulated a wealth of security and compliance certifications, including platform-wide certifications, certifications for the regulatory requirements of specific countries and regions, and industry certifications. At the same time, Amazon Web Services offers a rich set of products and services to help customers address their data security and compliance needs.

Original article: https://aws.amazon.com/cn/blogs/china/build-a-serverless-data-warehouse-in-a-hybrid-deployment-environment-part-one/

