在 Apache Flink 中解析来自 Kafka 的数据

Parsing data from Kafka in Apache Flink

我刚刚开始使用 Apache Flink (Scala API),我的问题如下: 我正在尝试根据 Flink 站点的一个示例将数据从 Kafka 流式传输到 Apache Flink:

val stream =
  env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))

一切正常,stream.print() 语句在屏幕上显示以下内容:

2018-05-16 10:22:44上午|1|11|-71.16|40.27

我想使用案例 class 来加载数据,我试过使用

flatMap(p=>p.split("|")) 

但它一次只拆分一个字符。

基本上预期的结果是能够填充案例class的5个字段如下

  field(0)=2018-05-16 10:22:44 AM
  field(1)=1
  field(2)=11
  field(3)=-71.16 
  field(4)=40.27

但它现在正在做:

   field(0) = 2
   field(1) = 0
   field(3) = 1
   field(4) = 8 

等...

如有任何建议,我们将不胜感激。

提前致谢

弗兰克

问题是 String.split 的用法。如果您使用 String 调用它,则该方法期望它是一个正则表达式。因此,p.split("\|") 将是输入数据的正确正则表达式。或者,您也可以在指定分隔符 p.split('|') 的地方调用 split 变体。两种解决方案都应为您提供所需的结果。