使用 Apache Flink 从网络中获取 JSON 个元素

Get JSON elements from a web with Apache Flink

阅读 Apache Flink 的几个文档页面后(official documentation, dataartisans) as well as the examples provided in the official repository,我不断看到示例,其中它们用作流式传输已下载文件的数据源,始终连接到本地主机。

我正在尝试使用 Apache Flink 下载 JSON 包含动态数据的文件。我的目的是尝试建立 url 可以访问 JSON 文件作为 Apache Flink 的输入源,而不是用另一个系统下载它并用 Apache Flink 处理下载的文件。

是否可以与 Apache Flink 建立此网络连接?

您可以将要下载的 URL 定义为输入 DataStream,然后从 MapFunction 中下载文档。以下代码演示了这一点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");

inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));

        StringBuilder builder = new StringBuilder();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "\n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return builder.toString();
    }
}).print();

env.execute("URL download job");