EMR 上的 Flink - 没有输出,无论是控制台还是文件

Flink on EMR - no output, either to console or to file

我正在尝试在 AWS EMR(Flink 1.4.2 版本 5.15)上部署我的 Flink 作业。但是,我无法从流中获得任何输出。 我尝试创建一个简单的工作:

object StreamingJob1 {
    def main(args: Array[String]) {
        val path = args(0)
        val file_input_format = new TextInputFormat(
            new org.apache.flink.core.fs.Path(path))
        file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
        file_input_format.setNestedFileEnumeration(true)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val myStream: DataStream[String] =
            env.readFile(file_input_format,
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000L)
                .map(s => s.split(",").toString)

        myStream.print()
        // execute program
        env.execute("Flink Streaming Scala")
    }
}

然后我使用以下命令执行了它:

HADOOP_CONF_DIR=/etc/hadoop/conf; flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/

没有错误,但是屏幕上除了 flink 的 INFO 日志外没有任何输出。

我尝试输出到 Kinesis 流或 S3 文件。什么都没有记录。

    myStream.addSink(new BucketingSink[String](output_path))

我还尝试写入 HDFS 文件。在这种情况下,创建了一个文件,但大小为 0。 我确定输入文件已使用简单检查进行了处理:

myStream.map(s => {"abc".toInt})

产生异常。

我在这里错过了什么?

看起来 stream.print() 在 EMR 上不起作用。

输出到文件:使用HDFS,有时(或大部分时间)需要等待文件更新

Kinesis 的输出:我的流名称中有错字。我不知道为什么我没有得到该流不存在的任何异常。但是,在更正名称后,我收到了预期的消息。