pyflink 中的 socketTextStream 在哪里
where's socketTextStream in pyflink
我想把下面的代码翻译成pyflink然后运行在pyflink-shell.sh之后
public class MapDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
//1.获取执行环境配置信息
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.定义加载或创建数据源(source),监听9000端口的socket消息
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
//3.map操作。
DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
//4.打印输出sink
result.print();
//5.开始执行
env.execute();
}
但是我在b_env
,bt_env
,s_env
,st_env
中找不到socketTextStream
那么 pyflink 中的 socketTextStream
在哪里 api?
从 Flink 1.12 开始,开箱即用的 PyFlink 似乎只支持这些连接器:
- FlinkKafka消费者
- FlinkKafkaProducer
- JdbcSink
- StreamingFileSink
参见https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py。
因为socketTextStream
不支持exactly-once语义,所以一般不鼓励使用,PyFlink也没有包含它
是的,你可以在pyflink中使用socketTextStream
,虽然它还没有被正式支持。示例:
from pyflink.datastream import DataStream, StreamExecutionEnvironment
if __name__ == '__main__':
s_env = StreamExecutionEnvironment.get_execution_environment()
socket_stream = DataStream(s_env._j_stream_execution_environment.socketTextStream('localhost', 9999))
socket_stream.print()
s_env.execute('socket_stream')
我想把下面的代码翻译成pyflink然后运行在pyflink-shell.sh之后
public class MapDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
//1.获取执行环境配置信息
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.定义加载或创建数据源(source),监听9000端口的socket消息
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
//3.map操作。
DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
//4.打印输出sink
result.print();
//5.开始执行
env.execute();
}
但是我在b_env
,bt_env
,s_env
,st_env
socketTextStream
那么 pyflink 中的 socketTextStream
在哪里 api?
从 Flink 1.12 开始,开箱即用的 PyFlink 似乎只支持这些连接器:
- FlinkKafka消费者
- FlinkKafkaProducer
- JdbcSink
- StreamingFileSink
参见https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py。
因为socketTextStream
不支持exactly-once语义,所以一般不鼓励使用,PyFlink也没有包含它
是的,你可以在pyflink中使用socketTextStream
,虽然它还没有被正式支持。示例:
from pyflink.datastream import DataStream, StreamExecutionEnvironment
if __name__ == '__main__':
s_env = StreamExecutionEnvironment.get_execution_environment()
socket_stream = DataStream(s_env._j_stream_execution_environment.socketTextStream('localhost', 9999))
socket_stream.print()
s_env.execute('socket_stream')