Python Flink连接远程Flink环境
Python Flink Connect to Remote Flink Environment
我在远程系统中有 flink 系统 运行.. 假设 IP 为 10.XX.XX.XX,端口为 6123。
现在我想使用远程执行环境使用 Pyflink 从另一个系统连接。我看过文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/table_environment.html 但不清楚。有什么指点吗?
我相信这样做应该足够了:
./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/batch/word_count.py
请参阅 Submitting PyFlink Jobs,这是我找到此示例的地方。
似乎没有以编程方式执行此操作的内置方法,但我能够正确地使用:
from pyflink.java_gateway import get_gateway
gateway = get_gateway()
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)
stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_exection_environment = stream_env.createRemoteEnvironment(
"localhost",
8081,
string_array
)
env = StreamExecutionEnvironment(j_stream_exection_environment)
我在远程系统中有 flink 系统 运行.. 假设 IP 为 10.XX.XX.XX,端口为 6123。 现在我想使用远程执行环境使用 Pyflink 从另一个系统连接。我看过文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/table_environment.html 但不清楚。有什么指点吗?
我相信这样做应该足够了:
./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/batch/word_count.py
请参阅 Submitting PyFlink Jobs,这是我找到此示例的地方。
似乎没有以编程方式执行此操作的内置方法,但我能够正确地使用:
from pyflink.java_gateway import get_gateway
gateway = get_gateway()
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)
stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_exection_environment = stream_env.createRemoteEnvironment(
"localhost",
8081,
string_array
)
env = StreamExecutionEnvironment(j_stream_exection_environment)