Flink 以 Kafka 为源
Flink with Kafka as a source
我正在尝试从 Flink 流中的 kafka 主题读取数据。我正在尝试 运行 以下示例代码,它在 APACHE Flink 1.1.3 文档页面上作为示例:Apache kafka 连接器,
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class stock_streaming_kafka {
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);
DataStream<String> stream = env
.addSource(myConsumer)
.print();
}
}
我有以下错误:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)
at stock_streaming_kafka.main(stock_streaming_kafka.java:25)
你能指导我解决这个问题吗? Kafka 连接器是否存在任何依赖性问题。
我的版本是:
- Flink 1.1.3
- 卡夫卡 2.10
- flink-connector-kafka-0.9_2.11-1.0.0.jar
Flink 和 Flink connector 的版本必须匹配。
将 flink-connector
依赖项更新为 1.1.3.
请使用以下版本。它适用于您的 Kafka 版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.3</version>
</dependency>
我发现代码中存在编译问题。
改变这个:
DataStream<String> stream = env
.addSource(myConsumer)
.print();
至:
DataStream<String> stream = env
.addSource(myConsumer);
stream.print();
如果它仍然不适合您,请告诉我,我将分享工作代码。
由于答案还没有被采纳,这里complete Maven code example使用Flink从Kafka中读取数据。
您可能需要调整 pom.xml 以匹配您的 Kafka 和 Scala 版本设置。
希望这对您有所帮助。
我正在尝试从 Flink 流中的 kafka 主题读取数据。我正在尝试 运行 以下示例代码,它在 APACHE Flink 1.1.3 文档页面上作为示例:Apache kafka 连接器,
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class stock_streaming_kafka {
public static void main(String[] args) throws Exception
{
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("nsestocks4k", new SimpleStringSchema(), properties);
DataStream<String> stream = env
.addSource(myConsumer)
.print();
}
}
我有以下错误:
Exception in thread "main" java.lang.Error: Unresolved compilation problems:
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer09<String>)
at stock_streaming_kafka.main(stock_streaming_kafka.java:25)
你能指导我解决这个问题吗? Kafka 连接器是否存在任何依赖性问题。 我的版本是:
- Flink 1.1.3
- 卡夫卡 2.10
- flink-connector-kafka-0.9_2.11-1.0.0.jar
Flink 和 Flink connector 的版本必须匹配。
将 flink-connector
依赖项更新为 1.1.3.
请使用以下版本。它适用于您的 Kafka 版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.3</version>
</dependency>
我发现代码中存在编译问题。
改变这个:
DataStream<String> stream = env
.addSource(myConsumer)
.print();
至:
DataStream<String> stream = env
.addSource(myConsumer);
stream.print();
如果它仍然不适合您,请告诉我,我将分享工作代码。
由于答案还没有被采纳,这里complete Maven code example使用Flink从Kafka中读取数据。
您可能需要调整 pom.xml 以匹配您的 Kafka 和 Scala 版本设置。
希望这对您有所帮助。