Develop an SAP RFC interface on Apache SeaTunnel (Incubating), breaking through the last barrier of data collection within the enterprise

Apache SeaTunnel
Sep 9, 2022

For both clients (Party A) and vendors (Party B), when we collect data to build data warehouse models, once the enterprise's ERP is switched to SAP we run into challenges around security, technical thresholds, and product barriers.

The security challenge is that the traditional data warehouse approach accesses SAP HANA offline. For multi-subsidiary group companies, this involves security issues such as data permissions and isolation, so large group companies are generally unwilling to open the HANA database for direct access. At the same time, the business table logic inside SAP is also quite complex;

The technical threshold is that we need Java development engineers to build an interface for each data table, and the transmission speed of such interfaces is slow, making them suitable only for small batches of data;

The product barrier is that SAP's closed-loop management only allows you to purchase SAP's BW product to get rapid access to the overall data and model construction. This model suits "ALL IN SAP" enterprises, where all data processing and analysis are developed and managed in a closed loop on SAP products. But the drawbacks of this pattern are obvious: once products outside SAP are introduced, the cost of the data team and of operations and maintenance doubles, which hinders the goal of reducing costs and increasing efficiency.

The business systems within an enterprise are extremely varied: ERP systems, business middle-office systems, online platform systems, privatized deployments, and SaaS products all require a unified tool to collect and access the various data sources. Kettle and DataX were the mainstream tools in China in the past few years; however, these offline-processing tools turn out to be time-consuming and labor-intensive for real-time data access, or even incapable of it.

After trying the mainstream open-source products on the market to solve the above complex scenarios, we found Apache SeaTunnel and implemented offline data access, real-time data (Kafka) access, and other access scenarios on it, moving from simple to complex, and established the connection between the Hadoop ecosystem and ClickHouse. After verifying its stability and speed, we decided to develop the SAP RFC interface based on Apache SeaTunnel, which completely broke through the last barrier of data collection within the enterprise.

First, we developed the plugin based on BaseStaticInput, an abstract class that we just need to extend and implement.

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

class SapRfcInput extends BaseStaticInput {
  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = { this.config = config }
  override def getConfig(): Config = config
  override def checkConfig(): (Boolean, String) = (true, "") // validate required options here
  override def getDataset(spark: SparkSession): Dataset[Row] = ??? // build the Dataset[Row]; implemented below
}

The key point is to implement the getDataset function, whose return value is Dataset[Row].

How can we get a Dataset[Row]? Either build it directly from a Seq- or List-like collection, or build it from an RDD.

If we build it from an in-memory collection, memory overflow will occur when the amount of data is too large, so that method is only suitable for small data volumes. When the data volume is large, you need a lazy way to obtain the data, and you have to implement your own RDD.
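As a rough illustration of the eager approach (this snippet is not from the original article; the column names and values are made up), every row has to sit in driver memory before the DataFrame exists, which is why it only works for small result sets:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("eager-demo").master("local[*]").getOrCreate()

// Hypothetical two-column result: the whole collection is materialized on the
// driver before the DataFrame is created.
val schema = StructType(Seq(StructField("col1", StringType), StructField("col2", StringType)))
val rows = Seq(Row("a", "1"), Row("b", "2"))
val smallDs = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)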

import com.typesafe.config.Config
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

class SapRfcRDD(sc: SparkContext, config: Config) extends RDD[Row](sc, Nil) with Logging {
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
    override def hasNext: Boolean = ??? // implemented below
    override def next(): Row = ???      // implemented below
  }
  override protected def getPartitions: Array[Partition] = ??? // implemented below
}

When building SapRfcRDD, we add a config parameter to the constructor ourselves; the reason for doing so will be explained below.

As the name suggests, compute is used to compute the data, and its return value is an iterator, which means the data is obtained lazily.

To implement the iterator, you first need to implement the hasNext and next methods: hasNext indicates whether there is data left, and next produces the next row.

getPartitions is used to obtain partition information.

How do we implement these two functions? We should work that out together with how the data of the SAP RFC interface is obtained.

We generally follow these key steps to obtain SAP RFC interface data (a sketch of a helper follows the list):

# Get the JCoTable from the relevant RFC interface information: getJcoTable(config: Config): JCoTable
# Get the number of data rows: table.getNumRows
# Set the current row: table.setRow(curIndex)
# Read the data by field name: val data = columns.map(column => { table.getString(column) })
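For reference, here is a hedged sketch of what a helper like SapRfc.getJcoTable (used later in compute) might look like. The destination name and the way import parameters are set are assumptions for illustration, not the article's actual code; the JCo calls themselves are the standard SAP JCo 3.x API.

import com.sap.conn.jco.{JCoDestinationManager, JCoTable}
import com.typesafe.config.Config

object SapRfc {
  def getJcoTable(config: Config): JCoTable = {
    // "SAP_RFC_DEST" is a hypothetical destination name, assumed to be registered
    // beforehand (e.g. via a DestinationDataProvider built from the jco.client.* options).
    val destination = JCoDestinationManager.getDestination("SAP_RFC_DEST")
    val function = destination.getRepository.getFunction(config.getString("function"))

    // Simplified parameter handling: the real plugin passes the [name, value]
    // pairs from the "params" option instead of a hard-coded value.
    function.getImportParameterList.setValue("IV_DDATE", "20220909")

    // Execute the RFC and return the result table named by the "table" option.
    function.execute(destination)
    function.getTableParameterList.getTable(config.getString("table"))
  }
}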

The hasNext method in compute is related to table.getNumRows, and the next method is related to table.setRow. To use them we first have to get the JCoTable object, which is where the second constructor parameter of SapRfcRDD, config, comes in: through the config we can obtain the JCoTable object.

So why not inject the JCoTable directly through a constructor parameter? Because an RDD is a distributed data set that gets serialized and passed between nodes, the parameters of the SapRfcRDD constructor must be safe to serialize, and serializing a JCoTable can cause memory overflow. Of course, whether overflow actually occurs depends on the size of the data associated with the JCoTable.

So what does getPartitions do? It might seem unnecessary: if you just want to put all the data into a single partition, getPartitions hardly matters.

But if you want to divide the data into multiple partitions to speed up its processing, the implementation of getPartitions is important.

Also pay special attention to the split parameter of compute: it is one of the partitions returned by getPartitions, and the implementation of hasNext and next depends closely on it.

trait Partition extends scala.AnyRef with scala.Serializable {
  def index: scala.Int
  override def hashCode(): scala.Int = { /* compiled code */ }
  override def equals(other: scala.Any): scala.Boolean = { /* compiled code */ }
}

class RowPartition(idx: Int, val start: Int, val end: Int) extends Partition {
  override def index: Int = idx
  override def toString: String = s"RowPartition{index: ${idx}, start: ${start}, end: ${end}}"
}

The return value of getPartitions is Array[Partition]. Partition is an interface that is very simple to implement.

We added two parameters, start and end, to the RowPartition constructor, i.e., the starting row number and the ending row number in the JCoTable, as a left-closed, right-open interval [start, end).

For example, if the interface returns 2,000 rows of data and we divide it into two partitions, we get something like RowPartition{index: 0, start: 0, end: 1000} and RowPartition{index: 1, start: 1000, end: 2000}.
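A minimal sketch of getPartitions built on this idea follows; it is not the article's exact code. It assumes the partition count comes from the "partition" option shown in the configuration later, and that the total row count can be read from the JCoTable on the driver.

override protected def getPartitions: Array[Partition] = {
  // Read the total number of rows once on the driver, then cut [0, totalRows)
  // into roughly equal left-closed, right-open ranges.
  val totalRows = SapRfc.getJcoTable(config).getNumRows
  val numPartitions = config.getInt("partition")
  val step = math.ceil(totalRows.toDouble / numPartitions).toInt
  (0 until numPartitions).map { i =>
    val start = i * step
    val end = math.min(start + step, totalRows)
    new RowPartition(i, start, end): Partition
  }.toArray
}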

In compute, split is an instance of RowPartition, so we can easily implement hasNext and next through the start and end of split.

import scala.collection.JavaConverters._

override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
  val columns = config.getStringList(config.getString("table")).asScala
  val rowPartition: RowPartition = split.asInstanceOf[RowPartition]
  val table = SapRfc.getJcoTable(config)
  val tableRowData = new TableRowData(columns, table, rowPartition.index, rowPartition.start, rowPartition.end)
  println(tableRowData)

  override def hasNext: Boolean = tableRowData.hasNext()
  override def next(): Row = tableRowData.next()
}

class TableRowData(val columns: Seq[String], val table: JCoTable, val partitionId: Int, val start: Int, val end: Int) {
  var curIndex = start

  def hasNext(): Boolean = curIndex < end

  def next(): Row = {
    table.setRow(curIndex)
    val data = columns.map(column => table.getString(column))
    curIndex += 1
    Row.fromSeq(data)
  }

  override def toString: String = s"TableRowData{partitionId: ${partitionId}, start: ${start}, end: ${end}, columns: ${columns}}"
}

After implementing SapRfcRDD, it is very easy to implement getDataset.
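For completeness, here is a minimal sketch of what getDataset could look like; it is not the article's exact code, and it simply treats every configured column as a string:

import scala.collection.JavaConverters._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

override def getDataset(spark: SparkSession): Dataset[Row] = {
  // Assumes the column list is configured under the table name, as in the examples below.
  val columns = config.getStringList(config.getString("table")).asScala
  val schema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))
  spark.createDataFrame(new SapRfcRDD(spark.sparkContext, config), schema)
}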

Finally, we realized data access through the SAP RFC interface, supporting both the ASHOST and MSHOST connection modes (note: the MSHOST connection string is useful because it gives you failover capability for the server connection and can load-balance the connections, not the jobs, which are balanced on other metrics, to your remote system). This greatly shortens SAP data collection: what originally required developing one Java interface per table can now be done with a single configuration per interface. Input examples:

input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.mshost = "XXXXXX"
        jco.client.r3name = "XXX"
        jco.client.client = "XXX"
        jco.client.user = "XXX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"
        jco.client.group = "PUBLIC"
        function = "FUNXXX"
        params = ["IV_DDATE", ""${rfc_date}""]
        table = "TTXXX"
        TTXXX = ["col1", "col2", "col3"]
        partition = 4
        result_table_name = "res_tt"
    }
}

input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.ashost = "XXXX"
        jco.client.sysnr = "XX"
        jco.client.client = "XX"
        jco.client.user = "XX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"
        function = "FUNXXX"
        params = ["DDATE", ""${rfc_date}""]
        table = "TABLEXXXX"
        TABLEXXXX = ["col1", "col2", "col3"]
        partition = 4
        result_table_name = "res_tt"
    }
}

The parameter configuration consists of three parts: the first is the SAP connection information (jco.client.*), the second is the SAP function with its input parameters, the table name, and the table fields, and the third is the Spark partition number.

Through the above configuration, we obtained about 600,000 rows of SAP data (SAP control restrictions mean the data can only be queried by day). It takes only 2 minutes from job startup to inserting the data into Hive, and the overall development time for SAP data access is shortened from days to hours (including parameter configuration and basic verification).

Author bio: Han Shanfeng / Huangfu Xinyi
Big data development engineers at Gold Hongye Paper Group

They are experts in big data platform construction, data warehousing, data model construction, and data visualization, and have a deep understanding of the common data integration frameworks and engines on the market.

About Apache SeaTunnel

The next-generation high-performance, distributed, massive data integration tool.
