如何使用 Apache Flink 读取 websocket 数据
How to read websocket data using Apache Flink
我正在尝试使用 Apache Flink 从 websocket 读取数据。
我的 Flink 作业正在连接到 websocket,但它没有从 websocket 中提取数据。
下面是我尝试使用 Apache Flink API 连接到 websocket 的示例代码。
RichSourceFunction 中的 run() 方法既没有被执行,也没有抛出任何错误。
@Slf4j
public class Main {
public static final int CHECKPOINTING_INTERVAL_MS = 5000;
private static final String JOB_NAME = "Flink Streaming Java API Skeleton";
/**
* Main Flink job.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ObjectMapper objectMapper = new ObjectMapper();
env.setParallelism(4);
ParameterTool paramTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(paramTool);
DataStreamSource<String> mySocketStream = env.addSource(new MyWebSocketSourceFunc());
mySocketStream.map(new MapIt()).print();
// mySocketStream.print();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
env.setRestartStrategy(RestartStrategies.noRestart());
env.execute(JOB_NAME);
}
public static class MyWebSocketSourceFunc extends RichSourceFunction<String> {
private boolean running = true;
transient AsyncHttpClient client;
transient BoundRequestBuilder boundRequestBuilder;
transient WebSocketUpgradeHandler.Builder webSocketListener;
private BlockingQueue<String> messages = new ArrayBlockingQueue<>(100);
@Override
public void run(SourceContext<String> ctx) throws Exception {
WebSocketUpgradeHandler webSocketUpgradeHandler = webSocketListener.addWebSocketListener(
new WebSocketListener() {
private final ObjectMapper myMapper = new ObjectMapper();
private String getRsvpId(String payload) {
try {
Map map = myMapper.readValue(payload, Map.class);
Object rsvpId = map.get("rsvp_id");
return rsvpId != null ? rsvpId.toString() : "NOT FOUND";
} catch (IOException e) {
log.error("Mapping failed, returning 'null'");
return "NULL";
}
}
@Override
public void onOpen(WebSocket webSocket) {
}
@Override
public void onClose(WebSocket webSocket, int i, String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
log.debug("onTextFrame({}), rsvp_id={}", hash(payload), getRsvpId(payload));
if (payload != null) {
try {
messages.put(payload);
} catch (InterruptedException e) {
log.error("Interrupted!", e);
Thread.currentThread().interrupt();
}
}
}
}).build();
boundRequestBuilder.execute(webSocketUpgradeHandler).get();
while (running) {
ctx.collect(messages.take());
}
running = false;
}
@Override
public void cancel() {
log.info("cancel function called");
running = false;
}
@Override
public void open(Configuration parameters) throws Exception {
log.info("open function called");
super.open(parameters);
client = Dsl.asyncHttpClient();
boundRequestBuilder = client.prepareGet("ws://stream.meetup.com/2/rsvps");
webSocketListener = new WebSocketUpgradeHandler.Builder();
}
private String hash(String input) {
if (input == null) {
return "-- NULL --";
}
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(input.getBytes());
byte[] digest = md.digest();
return DatatypeConverter.printHexBinary(digest).toUpperCase();
} catch (NoSuchAlgorithmException e) {
log.error("Cound not instantiate MD5", e);
return "--NOT CALCULATED--";
}
}
}
public static class MapIt extends RichMapFunction<String, String> {
final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String map(String value) throws Exception {
Map<String, Object> mapped = objectMapper.readValue(value, Map.class);
Object rsvp = mapped.get("rsvp_id");
return rsvp != null ? rsvp.toString() : "null" ;
}
}
}
这是我实现该作业时所参考的文档:
Reference
提前致谢(TIA)。
Flink 内置了一个套接字(socket)源连接器,文档中有一个演示其用法的示例。使用它会比调试其他实现更容易。
另请注意,不建议在生产应用程序中使用套接字,因为它们无法提供任何容错保证(因为它们无法支持检查点)。
我正在尝试使用 Apache Flink 从 websocket 读取数据。
我的 Flink 作业正在连接到 websocket,但它没有从 websocket 中提取数据。
下面是我尝试使用 Apache Flink API 连接到 websocket 的示例代码。
RichSourceFunction 中的 run() 方法既没有被执行,也没有抛出任何错误。
@Slf4j
public class Main {
public static final int CHECKPOINTING_INTERVAL_MS = 5000;
private static final String JOB_NAME = "Flink Streaming Java API Skeleton";
/**
* Main Flink job.
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ObjectMapper objectMapper = new ObjectMapper();
env.setParallelism(4);
ParameterTool paramTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(paramTool);
DataStreamSource<String> mySocketStream = env.addSource(new MyWebSocketSourceFunc());
mySocketStream.map(new MapIt()).print();
// mySocketStream.print();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);
env.setRestartStrategy(RestartStrategies.noRestart());
env.execute(JOB_NAME);
}
public static class MyWebSocketSourceFunc extends RichSourceFunction<String> {
private boolean running = true;
transient AsyncHttpClient client;
transient BoundRequestBuilder boundRequestBuilder;
transient WebSocketUpgradeHandler.Builder webSocketListener;
private BlockingQueue<String> messages = new ArrayBlockingQueue<>(100);
@Override
public void run(SourceContext<String> ctx) throws Exception {
WebSocketUpgradeHandler webSocketUpgradeHandler = webSocketListener.addWebSocketListener(
new WebSocketListener() {
private final ObjectMapper myMapper = new ObjectMapper();
private String getRsvpId(String payload) {
try {
Map map = myMapper.readValue(payload, Map.class);
Object rsvpId = map.get("rsvp_id");
return rsvpId != null ? rsvpId.toString() : "NOT FOUND";
} catch (IOException e) {
log.error("Mapping failed, returning 'null'");
return "NULL";
}
}
@Override
public void onOpen(WebSocket webSocket) {
}
@Override
public void onClose(WebSocket webSocket, int i, String s) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
log.debug("onTextFrame({}), rsvp_id={}", hash(payload), getRsvpId(payload));
if (payload != null) {
try {
messages.put(payload);
} catch (InterruptedException e) {
log.error("Interrupted!", e);
Thread.currentThread().interrupt();
}
}
}
}).build();
boundRequestBuilder.execute(webSocketUpgradeHandler).get();
while (running) {
ctx.collect(messages.take());
}
running = false;
}
@Override
public void cancel() {
log.info("cancel function called");
running = false;
}
@Override
public void open(Configuration parameters) throws Exception {
log.info("open function called");
super.open(parameters);
client = Dsl.asyncHttpClient();
boundRequestBuilder = client.prepareGet("ws://stream.meetup.com/2/rsvps");
webSocketListener = new WebSocketUpgradeHandler.Builder();
}
private String hash(String input) {
if (input == null) {
return "-- NULL --";
}
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md.update(input.getBytes());
byte[] digest = md.digest();
return DatatypeConverter.printHexBinary(digest).toUpperCase();
} catch (NoSuchAlgorithmException e) {
log.error("Cound not instantiate MD5", e);
return "--NOT CALCULATED--";
}
}
}
public static class MapIt extends RichMapFunction<String, String> {
final ObjectMapper objectMapper = new ObjectMapper();
@Override
public String map(String value) throws Exception {
Map<String, Object> mapped = objectMapper.readValue(value, Map.class);
Object rsvp = mapped.get("rsvp_id");
return rsvp != null ? rsvp.toString() : "null" ;
}
}
}
这是我实现该作业时所参考的文档:Reference
提前致谢(TIA)。
Flink 内置了一个套接字(socket)源连接器,文档中有一个演示其用法的示例。使用它会比调试其他实现更容易。
另请注意,不建议在生产应用程序中使用套接字,因为它们无法提供任何容错保证(因为它们无法支持检查点)。