在 Apache NiFi 中使用 AvroSchemaRegistry

Using AvroSchemaRegistry in Apache NiFi

我有 5 种不同的 CSVReader 控制器服务。它们的配置除了schema text(因为headers不同)和1CSVRecordSetWriter.

外都是一样的

我想只留下一个CSVReader,动态设置schema text。我读到了 AvroSchemaRegistry 但我并不清楚如何使用它。

我是否应该创建 5 个不同的 AvroSchemaRegistry 具有 2 个属性的控制器:namevalue? F.e。我想放置以下架构:

{
    "type": "record",
    "name": "campaigns",
    "namespace": "common",
    "fields": [
        {"name": "campaign_name", "type": "string"},
        {"name": "campaign_id", "type": "long"},
        {"name": "date", "type" : {"type": "int", "logicalType" : "date"}}
    ]
}

我应该创建 AvroSchemaRegistry

对于另一个模式,我应该创建另一个 AvroSchemaResgitry 具有另一个属性 namevalue 的控制器?

之后,如何配置 CSVReaderCSVRecordSetWriter 以使用这些模式?最后,我应该如何处理流文件?添加额外的属性?什么样的?

像这样配置您的流程(根据您的要求进行更改),

  1. UpdateAttribute 配置到 derive/hard 代码流文件特定模式-

  1. ValidateRecord 配置使用通用 csv reader 并动态传递模式 -

  1. CSVReader 控制器服务使用动态传递的模式并设置模式访问策略 -

如果您希望使用 NiFi 支持的架构注册表,则将所有架构放入注册表中,并为记录 reader/writer 设置 schema.nameaccess strategy 属性以从注册表访问架构,但是首先,您需要 add/configure 控制器服务中的模式注册表提供程序。

更新:

HortonworksSchemaRegistry 的示例 SchemaRegistry 控制器服务配置(ConfluentSchemaRegistry 和 AvroSchemaRegistry 具有或多或少相同的属性)。重要的是参数注册表 API 在执行查找操作时期望什么,所以我们只需要从 Reader/Writer 控制器服务 Schema Name 属性 传递相同的值,请 refer to this 想出主意。

架构注册表 URL : http://example.com:7788/api/v1