在不使用 netcat 服务器的情况下从套接字读取 Flink 中的字符串数据流
read string datastream in Flink from socket without using netcat server
我有一个案例场景,我有一个流生成器 client,它生成多个流,合并它们并将其发送到套接字,我希望 Flink 程序听它作为 服务器 。正如我们所知,必须首先启动服务器,以便它可以侦听客户端请求。我尝试使用下面给出的代码来做同样的事情
public static void main(String[] args) throws Exception {
//setting the envrionment variable as StreamExecutionEnvironment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStream<String> stream1 = environment.socketTextStream("localhost", 9000);
stream1.print();
//start the execution
environment.execute(" Started the execution ");
}// main
流生成器作为客户端的代码如下
DataStream<Event> stream1 = envrionment
.addSource(new EventGenerator(2,60,1,1,100, 200 ))
.name("stream 1")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> stream2 = envrionment
.addSource(new EventGenerator(3,60,1,2,10, 20 ))
.name("stream 2")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> stream3 = envrionment
.addSource(new EventGenerator(5,60,1,3,30, 40 ))
.name("stream 3")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> merged = stream1.union(stream2,stream3);
merged.print();
// sending data to Mobile Cep via socket
merged.map(new MapFunction<Event, String>() {
@Override
public String map(Event event) throws Exception {
String tuple = event.toString();
return tuple + "\n";
}
}).writeToSocket("localhost", 9000, new SimpleStringSchema() );
问题 #1:问题是客户端代码仅在我启动 Netcat 服务器时工作,但随后 Netcat 服务器不转发数据 streams.IfNetcat 服务器未启动,客户端代码说它不能建立联系
问题 2:如果 Netcat 服务器未启动,Flink 程序不会执行
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
我知道一个可能的解决方案是在 Flink 程序中生成流,但我想通过套接字接收流。
提前致谢~
Flink 的套接字源和接收器都没有启动 TCP 服务器并等待传入连接。它们都是连接到已经启动的 TCP 服务器的客户端。这也是您必须在启动作业之前启动 netcat
的原因。如果你想写入和读取套接字,那么你必须编写一个 TCP 服务器,它可以缓冲传入的数据并在客户端连接到它时转发它们。
我有一个案例场景,我有一个流生成器 client,它生成多个流,合并它们并将其发送到套接字,我希望 Flink 程序听它作为 服务器 。正如我们所知,必须首先启动服务器,以便它可以侦听客户端请求。我尝试使用下面给出的代码来做同样的事情
public static void main(String[] args) throws Exception {
//setting the envrionment variable as StreamExecutionEnvironment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStream<String> stream1 = environment.socketTextStream("localhost", 9000);
stream1.print();
//start the execution
environment.execute(" Started the execution ");
}// main
流生成器作为客户端的代码如下
DataStream<Event> stream1 = envrionment
.addSource(new EventGenerator(2,60,1,1,100, 200 ))
.name("stream 1")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> stream2 = envrionment
.addSource(new EventGenerator(3,60,1,2,10, 20 ))
.name("stream 2")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> stream3 = envrionment
.addSource(new EventGenerator(5,60,1,3,30, 40 ))
.name("stream 3")
.setParallelism(parallelism_for_stream_rr);
DataStream<Event> merged = stream1.union(stream2,stream3);
merged.print();
// sending data to Mobile Cep via socket
merged.map(new MapFunction<Event, String>() {
@Override
public String map(Event event) throws Exception {
String tuple = event.toString();
return tuple + "\n";
}
}).writeToSocket("localhost", 9000, new SimpleStringSchema() );
问题 #1:问题是客户端代码仅在我启动 Netcat 服务器时工作,但随后 Netcat 服务器不转发数据 streams.IfNetcat 服务器未启动,客户端代码说它不能建立联系
问题 2:如果 Netcat 服务器未启动,Flink 程序不会执行
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
我知道一个可能的解决方案是在 Flink 程序中生成流,但我想通过套接字接收流。
提前致谢~
Flink 的套接字源和接收器都没有启动 TCP 服务器并等待传入连接。它们都是连接到已经启动的 TCP 服务器的客户端。这也是您必须在启动作业之前启动 netcat
的原因。如果你想写入和读取套接字,那么你必须编写一个 TCP 服务器,它可以缓冲传入的数据并在客户端连接到它时转发它们。