EMR 上的 Flink - 没有输出,无论是控制台还是文件
Flink on EMR - no output, either to console or to file
我正在尝试在 AWS EMR(Flink 1.4.2 版本 5.15)上部署我的 Flink 作业。但是,我无法从流中获得任何输出。
我尝试创建一个简单的工作:
object StreamingJob1 {
def main(args: Array[String]) {
val path = args(0)
val file_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(path))
file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
file_input_format.setNestedFileEnumeration(true)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val myStream: DataStream[String] =
env.readFile(file_input_format,
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000L)
.map(s => s.split(",").toString)
myStream.print()
// execute program
env.execute("Flink Streaming Scala")
}
}
然后我使用以下命令执行了它:
HADOOP_CONF_DIR=/etc/hadoop/conf; flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/
没有错误,但是屏幕上除了 flink 的 INFO 日志外没有任何输出。
我尝试输出到 Kinesis 流或 S3 文件。什么都没有记录。
myStream.addSink(new BucketingSink[String](output_path))
我还尝试写入 HDFS 文件。在这种情况下,创建了一个文件,但大小为 0。
我确定输入文件已使用简单检查进行了处理:
myStream.map(s => {"abc".toInt})
产生异常。
我在这里错过了什么?
看起来 stream.print() 在 EMR 上不起作用。
输出到文件:使用HDFS,有时(或大部分时间)需要等待文件更新
Kinesis 的输出:我的流名称中有错字。我不知道为什么我没有得到该流不存在的任何异常。但是,在更正名称后,我收到了预期的消息。
我正在尝试在 AWS EMR(Flink 1.4.2 版本 5.15)上部署我的 Flink 作业。但是,我无法从流中获得任何输出。 我尝试创建一个简单的工作:
object StreamingJob1 {
def main(args: Array[String]) {
val path = args(0)
val file_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(path))
file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
file_input_format.setNestedFileEnumeration(true)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val myStream: DataStream[String] =
env.readFile(file_input_format,
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000L)
.map(s => s.split(",").toString)
myStream.print()
// execute program
env.execute("Flink Streaming Scala")
}
}
然后我使用以下命令执行了它:
HADOOP_CONF_DIR=/etc/hadoop/conf; flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/
没有错误,但是屏幕上除了 flink 的 INFO 日志外没有任何输出。
我尝试输出到 Kinesis 流或 S3 文件。什么都没有记录。
myStream.addSink(new BucketingSink[String](output_path))
我还尝试写入 HDFS 文件。在这种情况下,创建了一个文件,但大小为 0。 我确定输入文件已使用简单检查进行了处理:
myStream.map(s => {"abc".toInt})
产生异常。
我在这里错过了什么?
看起来 stream.print() 在 EMR 上不起作用。
输出到文件:使用HDFS,有时(或大部分时间)需要等待文件更新
Kinesis 的输出:我的流名称中有错字。我不知道为什么我没有得到该流不存在的任何异常。但是,在更正名称后,我收到了预期的消息。