Kafka 流与 Scala

Kafka streams with Scala

我正在尝试将 kafka 流与 scala 一起使用 下面是我在 Java 中的代码,它工作得很好:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

我的scala代码如下:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

scala 代码抛出编译错误。类型不匹配 expected:ForEachAction[>String,>String],Actual((any,any), Unit) 不是 found:value 键 未找到:值值

有谁知道如何在 Scala 中使用流 API

你的语法有误:)。 -> 只是创建对的运算符,所以表达式

(key,value)-> {
  println(key)
}

具有类型 ((Any, Any), Unit),因为编译器无法推断任何类型信息(并且缺少 keyvalue

如果您使用的是 scala 2.12,将 -> 替换为 => 应该可以解决问题,但是如果您使用的是旧版本的 scala,则必须显式实现 java 双功能:

 textLines.foreach(new BiFunction[T1, T2] { ... })

可以直接使用print方法打印kafkastream

textlines.print

它将打印kafka流。您甚至可以通过将参数传递给打印函数来打印键或值。