KStream 问题以 avro 格式读取之前和之后的有效负载

KStream issue to read payload coming as before and after in avro format

我正在 confluent kafka 平台上开发 KStream 应用程序。数据以嵌套的 avro 格式出现在源主题上,如下所述

{
   "type":"record",
   "namespace":"xyz",
   "table":"abc",
   "op_type":{
      "string":"D"
   },
   "op_ts":{
      "string":"2020-05-16 09:03:25.000462"
   },
   "pos":{
      "string":"00000000000000010722",
      "before":{
[
            "fields":            {
               "column1":"value"
            },
            {
               "column2":"value"
            },
            {
               "column3":"value"
            }
         ]
      },
      "after":{
[
            "fields":            {
               "column1":"value"
            },
            {
               "column2":"value"
            },
            {
               "column3":"value"
            }
         ]
      }
   }

我想根据 op_type="D" 过滤记录,因为我想将删除的记录分离到其他 Kafka 主题。

我在针对反序列化错误的输出主题编写模式时遇到问题:对于所需的行找到数组。

我已经使用 apache maven-avro 插件创建了 POJO 对象。

for "before tag" 我提到类型为数组并将 class 之前的对象传递给它。 与 "After tag" 相同,我提到类型为数组并将 after 的对象传递给它。 现在我不能在这里使用 KSQL,因为我的项目中没有服务。

需要如何解析此嵌套架构或任何其他方式将已删除的记录从源主题过滤到其他 kafka 主题的解决方案。

为了解决上述问题,我使用了通用的 avro 序列化器来处理使用 Kafka 流的数据。

如果我们使用特定的avro序列化器,我们无法过滤嵌套数据。使用通用的 avro 序列化器将有助于过滤带有字符串的特定记录。我使用它并解决了上述问题。