集群中的 Apache Flink 流式处理不会与工作人员拆分作业
Apache Flink streaming in cluster does not split jobs with workers
我的 objective 是使用 Kafka 作为源并使用 Flink 作为流处理引擎来设置一个高吞吐量集群。这是我所做的。
我已经在主服务器和工作服务器上设置了一个 2 节点集群,配置如下。
flink大师-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
主节点上的slaves
文件如下所示:
<WORKER_IP_ADDR>
localhost
两个节点上的flink设置在同名的文件夹中。我通过 运行ning
在 master 上启动集群
bin/start-cluster-streaming.sh
这将在 Worker 节点上启动任务管理器。
我的输入源是Kafka。这是片段。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
这是我的 Sink 函数
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
这是我的 pom.xml.
中的 Flink 依赖项
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
然后我运行在master上用这个命令打包的jar
bin/flink run flink-test-jar-with-dependencies.jar
然而,当我将消息插入 Kafka 主题时,我能够单独在主节点上解释来自我的 Kafka 主题的所有消息(通过我 SinkFunction
实现的调用方法中的调试消息) .
在作业管理器中 UI 我可以看到 2 个任务管理器,如下所示:
此外,仪表板如下所示:
问题:
- 为什么工作节点没有得到任务?
- 我是不是缺少一些配置?
在 Flink 中读取 Kafka 源时,源任务的最大并行度受给定 Kafka 主题的分区数限制。 Kafka 分区是 Flink 中源任务可以使用的最小单元。如果分区比源任务多,那么有些任务会消耗多个分区。
因此,为了向所有 100 个任务提供输入,您应该确保您的 Kafka 主题至少有 100 个分区。
如果您不能更改主题的分区数,那么也可以使用 setParallelism
方法以较低的并行度从 Kafka 中读取。或者,您可以使用 rebalance
方法,该方法将跨前面操作的所有可用任务随机排列数据。
我的 objective 是使用 Kafka 作为源并使用 Flink 作为流处理引擎来设置一个高吞吐量集群。这是我所做的。
我已经在主服务器和工作服务器上设置了一个 2 节点集群,配置如下。
flink大师-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Worker flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
主节点上的slaves
文件如下所示:
<WORKER_IP_ADDR>
localhost
两个节点上的flink设置在同名的文件夹中。我通过 运行ning
在 master 上启动集群bin/start-cluster-streaming.sh
这将在 Worker 节点上启动任务管理器。
我的输入源是Kafka。这是片段。
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
这是我的 Sink 函数
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
这是我的 pom.xml.
中的 Flink 依赖项<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
然后我运行在master上用这个命令打包的jar
bin/flink run flink-test-jar-with-dependencies.jar
然而,当我将消息插入 Kafka 主题时,我能够单独在主节点上解释来自我的 Kafka 主题的所有消息(通过我 SinkFunction
实现的调用方法中的调试消息) .
在作业管理器中 UI 我可以看到 2 个任务管理器,如下所示:
此外,仪表板如下所示:
- 为什么工作节点没有得到任务?
- 我是不是缺少一些配置?
在 Flink 中读取 Kafka 源时,源任务的最大并行度受给定 Kafka 主题的分区数限制。 Kafka 分区是 Flink 中源任务可以使用的最小单元。如果分区比源任务多,那么有些任务会消耗多个分区。
因此,为了向所有 100 个任务提供输入,您应该确保您的 Kafka 主题至少有 100 个分区。
如果您不能更改主题的分区数,那么也可以使用 setParallelism
方法以较低的并行度从 Kafka 中读取。或者,您可以使用 rebalance
方法,该方法将跨前面操作的所有可用任务随机排列数据。