Flume 1.6.0 spooling directory source with timestamp on header

I am trying to create a new Flume agent with a spooldir source that puts the files into HDFS. This is my configuration file:

agent.sources = file
agent.channels = channel
agent.sinks = hdfsSink

# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files

# SINKS CONFIGURATION
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /HADOOP/PATH/%Y/%m/%d/%H/

agent.sinks.hdfsSink.hdfs.filePrefix = common
agent.sinks.hdfsSink.hdfs.fileSuffix = .json
agent.sinks.hdfsSink.hdfs.rollInterval = 300
agent.sinks.hdfsSink.hdfs.rollSize = 5242880
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.maxOpenFiles = 2
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 100000
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.channel = channel

# CHANNELS CONFIGURATION
agent.channels.channel.type = memory
agent.channels.channel.capacity = 10000
agent.channels.channel.transactionCapacity = 1000

I am getting an error that says Expected timestamp in the Flume event headers, but it was null. The files I am reading contain JSON structures, which have a field named timestamp.

Is there a way to add this timestamp to the headers?

Following up on my earlier comment, I am now sharing the full set of steps I followed to spool a header-enabled JSON file, put it into the Hadoop HDFS cluster using Flume, create an external table on top of the JSON files, and run DML queries:

Created flume-spool.conf:

# Flume configuration starts
erum.sources =source-1
erum.channels =file-channel-1
erum.sinks =hdfs-sink-1

erum.sources.source-1.channels =file-channel-1
erum.sinks.hdfs-sink-1.channel =file-channel-1

# Define a file channel called file-channel-1 on erum
erum.channels.file-channel-1.type =file 

erum.channels.file-channel-1.capacity =2000000
erum.channels.file-channel-1.transactionCapacity =100000

# Define a source for erum
erum.sources.source-1.type =spooldir
erum.sources.source-1.inputCharset =UTF-8
erum.sources.source-1.bufferMaxLineLength =100

# Spooldir in my case is /home/arif/practice/flume_sink
erum.sources.source-1.spoolDir =/home/arif/practice/flume_sink/
erum.sources.source-1.fileHeader =true
erum.sources.source-1.fileHeaderKey=file
erum.sources.source-1.fileSuffix =.COMPLETED

# Sink is flume_import under hdfs
erum.sinks.hdfs-sink-1.pathManager =DEFAULT
erum.sinks.hdfs-sink-1.type =hdfs

erum.sinks.hdfs-sink-1.hdfs.filePrefix =common
erum.sinks.hdfs-sink-1.hdfs.fileSuffix =.json
erum.sinks.hdfs-sink-1.hdfs.writeFormat =Text
erum.sinks.hdfs-sink-1.hdfs.fileType =DataStream
erum.sinks.hdfs-sink-1.hdfs.path =hdfs://localhost:9000/user/arif/flume_sink/products/

erum.sinks.hdfs-sink-1.hdfs.batchSize =1000
erum.sinks.hdfs-sink-1.hdfs.rollSize =2684354560
erum.sinks.hdfs-sink-1.hdfs.rollInterval =5
erum.sinks.hdfs-sink-1.hdfs.rollCount =5000

Now we run flume-spool using the agent erum:

bin/flume-ng agent -n erum -c conf -f conf/flume-spool.conf -Dflume.root.logger=DEBUG,console

Copy the products.json file into the directory specified by erum.sources.source-1.spoolDir in the Flume configuration.
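
For example, assuming products.json is in the current working directory, the copy can be done from the shell (the target path is the spoolDir configured above):

cp products.json /home/arif/practice/flume_sink/

Once Flume has fully ingested the file, it renames it in place with the .COMPLETED suffix configured above.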

The contents of the products.json file are exactly as follows:

{"productid":"5968dd23fc13ae04d9000001","product_name":"sildenafilcitrate","mfgdate":"20160719031109","supplier":"WisozkInc","quantity":261,"unit_cost":".47"}
{"productid":"5968dd23fc13ae04d9000002","product_name":"MountainJuniperusashei","mfgdate":"20161003021009","supplier":"Keebler-Hilpert","quantity":292,"unit_cost":".74"}
{"productid":"5968dd23fc13ae04d9000003","product_name":"DextromathorphanHBr","mfgdate":"20161101041113","supplier":"Schmitt-Weissnat","quantity":211,"unit_cost":".53"}
{"productid":"5968dd23fc13ae04d9000004","product_name":"MeophanHBr","mfgdate":"20161101061113","supplier":"Schmitt-Weissnat","quantity":198,"unit_cost":".73"}

Download hive-serdes-sources-1.0.6.jar from the URL below:

https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=0
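
As a sketch, the jar can also be fetched from the command line (this assumes wget is available and that changing ?dl=0 to ?dl=1 makes Dropbox serve the file directly):

wget -O hive-serdes-sources-1.0.6.jar "https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=1"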

After spooling the JSON file into the HDFS cluster with flume-spool, we will start the Hive server, log in to the Hive shell and then execute the following:
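
A minimal sketch of that start-up, assuming HIVE_HOME points at the apache-hive-2.1.1-bin installation referenced below (hiveserver2 is only needed for external clients; the CLI alone is enough for the statements that follow):

$HIVE_HOME/bin/hiveserver2 &
$HIVE_HOME/bin/hive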

hive> add jar /home/arif/applications/hadoop/apache-hive-2.1.1-bin/lib/hive-serdes-sources-1.0.6.jar;
hive> create external table products (productid string, product_name string, mfgdate string, supplier string, quantity int, unit_cost string) 
    > row format serde 'com.cloudera.hive.serde.JSONSerDe' location '/user/arif/flume_sink/products/';
OK
Time taken: 0.211 seconds
hive> select * from products;
OK
5968dd23fc13ae04d9000001    sildenafilcitrate   20160719031109  WisozkInc   261 .47
5968dd23fc13ae04d9000002    MountainJuniperusashei  20161003021009  Keebler-Hilpert 292 .74
5968dd23fc13ae04d9000003    DextromathorphanHBr 20161101041113  Schmitt-Weissnat    211 .53
5968dd23fc13ae04d9000004    MeophanHBr  20161101061113  Schmitt-Weissnat    198 .73
Time taken: 0.291 seconds, Fetched: 4 row(s)

I have completed all of these steps without any errors. I hope this helps, thanks.

As described in this post: http://shzhangji.com/blog/2017/08/05/how-to-extract-event-time-in-apache-flume/

The required change is to include an interceptor and a serializer for it:

# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files
agent.sources.file.interceptors = i1
agent.sources.file.interceptors.i1.type = regex_extractor
agent.sources.file.interceptors.i1.regex = <regex_for_timestamp>
agent.sources.file.interceptors.i1.serializers = s1
agent.sources.file.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent.sources.file.interceptors.i1.serializers.s1.name = timestamp
agent.sources.file.interceptors.i1.serializers.s1.pattern = <pattern_that_matches_your_regex>
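
As a concrete sketch, if each JSON line carried a field like "timestamp":"2017-08-05 12:34:56" (the field format here is an assumption; adjust the regex and the pattern to your actual data), the two placeholders could be filled in as:

agent.sources.file.interceptors.i1.regex = "timestamp":"([^"]+)"
agent.sources.file.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss

The serializer name timestamp is the header key the HDFS sink reads when resolving the %Y/%m/%d/%H escapes in hdfs.path.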

Thanks for pointing out that, in addition to the link, I needed to include a proper snippet :)