Eventhub Stream 未捕获架构不匹配

Eventhub Stream not catching schema mismatch

当我们从 eventhub 读取事件时,我们正在尝试实施 badRecordsPath,作为尝试使其正常工作的示例,我已经放入了应该使事件失败的模式:

eventStreamDF = (spark.readStream
  .format("eventhubs")
  .options(**eventHubsConf)
  .option("badRecordsPath", "/tmp/badRecordsPath/test1")
  .schema(badSchema)
  .load()
) 

然而这永远不会失败并且总是读取事件,这是数据块的 eventhub 的读取流的行为吗?目前的解决方法是根据我们自己的模式检查 inferSchema。

EventHubs 中的数据模式是固定的(请参阅 docs)(Kafka 也是如此)- 实际有效负载始终编码为名称为 body 的二进制字段,并且在开发人员根据数据生产者和该数据的消费者之间的“联系”解码此二进制有效负载。因此,即使您指定架构和 badRecordsPath 选项,也不会使用它们。

你需要实现一些函数来解码来自 JSON 的数据,或者其他东西,例如 return null 如果数据被破坏,然后你将有一个过滤器将空值拆分为两个子流 - 用于好数据和坏数据。