Spring Cloud Data Flow Stream files to HDFS
I am new to Spring Cloud Data Flow. I am using v1.7.3 and want to create a simple stream that scans a directory for new files and pushes them to HDFS. I have the following definition:
file --cron='* * * * * *' --mode=ref --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020
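For reference, I register and deploy it from the SCDF shell roughly like this (testhdfs1 is the stream name that also shows up in the logs below):

stream create --name testhdfs1 --definition "file --cron='* * * * * *' --mode=ref --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020" --deploy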
When I deploy my stream, I have two problems:

1. No matter which file mode I use, only a single hdfs-sink-0.txt is created, and it either has no content at all or contains lines that look like default toString() output (e.g. '[B@7d5bfc85').
2. When I put new files into the directory, the HDFS sink does not receive any messages, although I can see in the file source's log that the messages are created.
The output of my hdfs sink:
2019-01-25 12:21:06.330 INFO 63 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1
2019-01-25 12:21:06.330 INFO 63 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247
2019-01-25 12:21:06.338 INFO 63 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@67110f71
2019-01-25 12:21:06.338 INFO 63 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:inbound.testhdfs1.file.testhdfs1} as a subscriber to the 'bridge.testhdfs1.file' channel
2019-01-25 12:21:06.338 INFO 63 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started inbound.testhdfs1.file.testhdfs1
2019-01-25 12:21:06.340 INFO 63 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2019-01-25 12:21:06.476 INFO 63 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 47888 (http)
2019-01-25 12:21:06.483 INFO 63 --- [ main] s.c.s.a.h.s.k.HdfsSinkKafka10Application : Started HdfsSinkKafka10Application in 17.593 seconds (JVM running for 18.756)
2019-01-25 12:21:08.250 INFO 63 --- [ -C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group testhdfs1.
2019-01-25 12:21:08.256 INFO 63 --- [ -C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group testhdfs1
2019-01-25 12:21:08.256 INFO 63 --- [ -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : partitions revoked:[]
2019-01-25 12:21:08.256 INFO 63 --- [ -C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group testhdfs1
2019-01-25 12:21:08.522 INFO 63 --- [ -C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group testhdfs1 with generation 1
2019-01-25 12:21:08.526 INFO 63 --- [ -C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [testhdfs1.file-0] for group testhdfs1
2019-01-25 12:21:08.735 INFO 63 --- [ -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : partitions assigned:[testhdfs1.file-0]
2019-01-25 12:21:23.238 INFO 63 --- [ -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2019-01-25 12:21:23.353 INFO 63 --- [ -L-1] o.s.d.h.s.o.AbstractDataStreamWriter : Creating output for path /data/hdfs-sink-0.txt
You cannot use the hdfs sink to copy files into HDFS, because it is only meant to write the arbitrary messages it receives from a source (the '[B@7d5bfc85'-style lines are just the default toString() of a byte[] payload, not your file's contents). The reason you see the file with zero length is that the file is still open and has not been flushed. The hdfs sink readme lists the configuration options; if you use the idle-timeout or rollover settings, you will start to see files being written.
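For example, a minimal sketch of an adjusted definition (the idle-timeout of 5000 ms and rollover of 1000000 bytes are illustrative values only, and --mode=lines is assumed so that the file's text contents, rather than a serialized File reference, flow through the stream):

file --mode=lines --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020 --idle-timeout=5000 --rollover=1000000

Here the idle timeout flushes and closes the current output file after five seconds without new messages, so data actually becomes visible in HDFS instead of sitting in an open, zero-length file.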