Read from Kafka into Flink Scala Shell
I'm trying to connect to Kafka (2.1) on my local machine and read from it, using the scala-shell that ships with Flink (1.7.2).
Here is what I'm doing:
:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
After that, the last statement fails with the following error:
scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
[T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
I have created the topic named "topic", and I can correctly produce messages to it and read them back through another client. I'm using Java version 1.8.0_201 and following the instructions at https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html.
Any help on what could be going wrong?
Most likely you should import Flink's Scala implicits before adding a source:
import org.apache.flink.streaming.api.scala._
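With that import in scope, the compiler can derive the implicit TypeInformation[String] evidence that addSource needs, which is what the overload-resolution error above is really complaining about. A minimal sketch of the same snippet with the import added (untested here, assuming the same jars are loaded with :require as in the question):
import org.apache.flink.streaming.api.scala._   // brings the implicit TypeInformation derivation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// addSource can now resolve its implicit evidence parameter
val stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()
senv.execute("Kafka Consumer Test")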
Some dependencies implicitly require other dependencies. We usually use a dependency manager such as Maven or sbt, and when we add a dependency to the project, the manager provides its transitive dependencies in the background.
On the other hand, when you use shells there is no dependency manager, and you are responsible for providing your code's dependencies yourself. Using the Flink Kafka connector explicitly requires the Flink Connector Kafka jar, but you should notice that Flink Connector Kafka needs some dependencies of its own. You can find them at the bottom of the page, in the Compile Dependencies section. With that preface, I added the following jar files to the directory FLINK_HOME/lib (the Flink classpath):
flink-connector-kafka-0.11_2.11-1.4.2.jar
flink-connector-kafka-0.10_2.11-1.4.2.jar
flink-connector-kafka-0.9_2.11-1.4.2.jar
flink-connector-kafka-base_2.11-1.4.2.jar
flink-core-1.4.2.jar
kafka_2.11-2.1.1.jar
kafka-clients-2.1.0.jar
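(For comparison, in an sbt project you would declare only the top-level connector and let the dependency manager pull in the transitive jars; a build.sbt sketch for the 1.4.2 connector used here:)
// build.sbt sketch: only the connector is declared; sbt resolves kafka-clients etc. transitively
libraryDependencies += "org.apache.flink" % "flink-connector-kafka-0.11_2.11" % "1.4.2"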
With those jars in place, I could successfully consume Kafka messages in the Flink shell with the following code:
scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
scala> import java.util.Properties
import java.util.Properties
scala> val properties = new Properties()
properties: java.util.Properties = {}
scala> properties.setProperty("bootstrap.servers", "localhost:9092")
res0: Object = null
scala> properties.setProperty("group.id", "test")
res1: Object = null
scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
03/11/2019 21:42:39 Job execution switched to status RUNNING.
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
hello
hello
Also, another way to add jar files to the Flink classpath is to pass the jars as arguments to the command that starts the Flink shell:
bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
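For example, to load the Kafka connector jar from the list above (the path is a placeholder):
bin/start-scala-shell.sh local "--addclasspath /path/to/flink-connector-kafka-0.11_2.11-1.4.2.jar"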
Tested with:
Flink 1.4.2
Kafka 2.1.0
Java 1.8.0_201
Scala 2.11