从kafka转换数据的最简单方法

Simplest way to go about transforming data from kafka

我正在开发一个使用 kafka connect 从多个数据库源中提取数据的项目。然后我希望能够将数据转换为指定的 json 格式,然后最终将最终的 json 推送到 S3 存储桶,最好使用 kafka connect 来降低我的开销。

这是一个示例,说明当前进入 kafka 的数据(以 avro 格式):

{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}

转换为 JSON 时看起来像这样:

{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SVALUE",
      "ordinalPosition": 6,
      "isNullable": 1,
      "dataType": "varchar",
      "maxLength": 4000,
      "precision": 0,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
},
{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SORT_ORDER",
      "ordinalPosition": 7,
      "isNullable": 1,
      "dataType": "int",
      "maxLength": 4,
      "precision": 10,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
}

我的目标是让数据看起来像这样:

{
  "header": "Database Inventory",
  "DBName": "DB",
  "ServerName": "server@server.com",
  "SchemaName": "DBE",
  "DB Owner": "Name",
  "DB Guardian" : "Name/Group",
  "ASV" : "ASVC1AUTODWH",
  "ENVCI": "ENVC1AUTODWHORE",
  "Service Owner" : "Name/Group",
  "Business Owner" : "Name/Group",
  "Support Owner" : "Name/Group",
  "Date of Data" : "2017-06-28 12:12:55.000",
  "TABLE_METADATA": {
  "TABLE_SIZE" : "500",
  "UNIT_SIZE" : "GB",
  "TABLE_ID": 117575457,
  "TABLE_NAME": "spt_fallback_db",
  "COLUMN_METADATA": [
  {
    "COLUMN_NM": "xserver_name",
    "DATE_TYPE": "varchar",
    "MAX_LENGTH": 30,
    "PRECISION": 0,
    "SCALE": 0,
    "IS_NULLABLE": 0,
    "PRIMARY_KEY": 0,
    "ORDINAL_POSITION": 1
  },
  {
    "COLUMN_NM": "xdttm_ins",
    "DATE_TYPE": "datetime",
    "MAX_LENGTH": 8,
    "PRECISION": 23,
    "SCALE": 3,
    "IS_NULLABLE": 0,
    "PRIMARY_KEY": 0,
    "ORDINAL_POSITION": 2
  }, ........

header 数据大部分是通用的,但需要填充一些数据,例如日期等。

最初我最初的想法是,我可以利用 kafka connect 做所有事情,而且我可以只为我希望格式化数据的方式创建一个模式。我在使用连接器的不同模式时遇到了问题,我不确定它是否可行。

我想到的另一个解决方案是利用 Kafka Streams,并编写代码将数据转换为需要的数据。我不确定使用 Kafka Streaming 有多容易。

最后,我看到的第三种解决方案是利用 Apache Spark,并使用数据帧处理数据。但这会增加更多开销。

老实说,我不确定要走哪条路,或者这些解决方案中的任何一个是否是我正在寻找的。所以我愿意接受所有关于如何解决这个问题的建议。

Kafka Connect does have Simple Message Transforms (SMTs),一个框架,用于在将源连接器生成的记录写入 Kafka 之前对它们进行微调,或者在将它们发送到接收器连接器之前对从 Kafka 读取的记录进行微调。大多数 SMT 都是非常简单的函数,但您可以将它们链接在一起以进行稍微复杂的操作。您始终可以使用自定义逻辑实现自己的转换,但无论每个转换一次对单个记录进行什么操作,都不应该调用其他服务。 SMT 仅用于对单个记录进行基本操作。

但是,您想要进行的更改可能比通过 SMT 进行的更改要复杂一些。 Kafka Streams seems like it is the best solution to this problem, since it allows you to create a simple stream processor that consumes the topic(s) produced by the source connector, alters (and possibly combines) the messages accordingly, and writes them out to other topic(s). Since you're already using Avro, you can write your Streams application to use Avro generic records (see this example) or with classes auto-generated from the Avro schemas (see this example)。

您还提到您有来自多个来源的数据,这些数据很可能会分开主题。如果您想集成、加入、组合或简单地将这些主题合并到其他主题中,那么 Kafka Streams 是实现此目的的好方法。

Kafka Streams apps 也只是普通的 Java 应用程序,因此您可以使用您选择的平台部署它们,无论是 Docker、Kubernetes、Mesos、AWS 还是其他平台.而且它们不需要像 Apache Spark 那样的 运行 分布式平台。