Flink statefun 同地函数通信
Flink statefun co-located functions communication
我有一个正常工作的嵌入式作业,我想部署额外的同地作业。
这些新添加的作业会收到旧作业的消息,并将其发送到kafka主题。
代码如下
@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvider provider = new FunctionProvider();
binder.bindFunctionProvider( CoLocated.TYPE,provider );
binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
}
}
我收到如下错误
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
错误不言自明,需要我定义入口。
在link中有一个类似定义的嵌入式模块->
https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module
新定义的模块将接收来自其他模块的消息并将它们发送到kafka。
- 我是否必须为每个位于同一地点的工作定义入口?如果不是,我怎样才能完成这项工作?
- 我怎样才能获得同地工作以进行交流?使用相同的 FunctionType 是否足够?
- 位于同一地点的功能是否通过 ingress/egress 进行通信?
内联回复,仅供参考,您所问的内容都不是特定的。这些属性适用于包含同地和远程混合工作负载的远程模块和作业。
Do I have to define ingress for every co-located job? If not how can I make this work?
是的,每项工作(远程或同地)都需要至少一个入口。入口是将来自外部世界的消息消费到 statefun 应用程序中的通道。想想 Kafka 或 Kinesis。没有入口,作业将永远不会做任何事情,因为没有初始消息来开始处理。
对于每个入口,您将绑定 1 个或多个路由器,它们从入口获取每条消息,并根据它们的函数类型将它们转发到 0 个或多个函数[1]。
How can I get co-located jobs to communicate? Is it enough to use the same FunctionType?
是的,函数只是使用它们的函数类型相互发送消息。
Are co-located functions communicating over ingress/egress?
不,消息是使用包含高度优化的网络堆栈的 Apache Flink 运行时在函数之间传递的。一旦一条消息从一个入口中被拉出,它就再也不会与那个入口交互。如果有兴趣,您可以在社区写的一些博客文章中了解 Flink 的网络堆栈是如何工作的,但这并不是在生产中成功使用 statefun 所必需的 [2]。
[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/index.html#router
[2]https://flink.apache.org/2019/06/05/flink-network-stack.html
我有一个正常工作的嵌入式作业,我想部署额外的同地作业。 这些新添加的作业会收到旧作业的消息,并将其发送到kafka主题。
代码如下
@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvider provider = new FunctionProvider();
binder.bindFunctionProvider( CoLocated.TYPE,provider );
binder.bindEgress(KafkaSpecs.TO_TRANSACTION_SPEC);
}
}
我收到如下错误
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: There are no ingress defined.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: There are no ingress defined.
at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
错误不言自明,需要我定义入口。
在link中有一个类似定义的嵌入式模块-> https://ci.apache.org/projects/flink/flink-statefun-docs-stable/sdk/modules.html#embedded-module
新定义的模块将接收来自其他模块的消息并将它们发送到kafka。
- 我是否必须为每个位于同一地点的工作定义入口?如果不是,我怎样才能完成这项工作?
- 我怎样才能获得同地工作以进行交流?使用相同的 FunctionType 是否足够?
- 位于同一地点的功能是否通过 ingress/egress 进行通信?
内联回复,仅供参考,您所问的内容都不是特定的。这些属性适用于包含同地和远程混合工作负载的远程模块和作业。
Do I have to define ingress for every co-located job? If not how can I make this work?
是的,每项工作(远程或同地)都需要至少一个入口。入口是将来自外部世界的消息消费到 statefun 应用程序中的通道。想想 Kafka 或 Kinesis。没有入口,作业将永远不会做任何事情,因为没有初始消息来开始处理。
对于每个入口,您将绑定 1 个或多个路由器,它们从入口获取每条消息,并根据它们的函数类型将它们转发到 0 个或多个函数[1]。
How can I get co-located jobs to communicate? Is it enough to use the same FunctionType?
是的,函数只是使用它们的函数类型相互发送消息。
Are co-located functions communicating over ingress/egress?
不,消息是使用包含高度优化的网络堆栈的 Apache Flink 运行时在函数之间传递的。一旦一条消息从一个入口中被拉出,它就再也不会与那个入口交互。如果有兴趣,您可以在社区写的一些博客文章中了解 Flink 的网络堆栈是如何工作的,但这并不是在生产中成功使用 statefun 所必需的 [2]。
[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/index.html#router
[2]https://flink.apache.org/2019/06/05/flink-network-stack.html