Flume 1.6.0 spooling directory source with timestamp on header
I am trying to create a new Flume agent with a spooldir source and put the files into HDFS. This is my configuration file:
agent.sources = file
agent.channels = channel
agent.sinks = hdfsSink
# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files
# SINKS CONFIGURATION
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /HADOOP/PATH/%Y/%m/%d/%H/
agent.sinks.hdfsSink.hdfs.filePrefix = common
agent.sinks.hdfsSink.hdfs.fileSuffix = .json
agent.sinks.hdfsSink.hdfs.rollInterval = 300
agent.sinks.hdfsSink.hdfs.rollSize = 5242880
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.maxOpenFiles = 2
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 100000
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.channel = channel
# CHANNELS CONFIGURATION
agent.channels.channel.type = memory
agent.channels.channel.capacity = 10000
agent.channels.channel.transactionCapacity = 1000
I am getting an error that says Expected timestamp in the Flume event headers, but it was null. The files I am reading contain JSON structures with a field named timestamp. Is there a way to add this timestamp to the header?
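For context: the %Y/%m/%d/%H escape sequences in hdfs.path are what make the HDFS sink look for a timestamp header on each event. If the event's own time is not strictly required, one hedged workaround (this is the standard HDFS sink option, not something from the question itself) is to let the sink stamp events with local time:
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
The answers below instead extract the timestamp from the event body into the header, which keeps the HDFS path tied to event time.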
As per my previous comment, I am now sharing the entire set of steps I followed for spooling a header-enabled JSON file, putting it into the Hadoop HDFS cluster using Flume, creating an external table over the JSON files, and executing a DML query on them -
Created flume-spool.conf:
# Flume Configuration Starts
erum.sources =source-1
erum.channels =file-channel-1
erum.sinks =hdfs-sink-1
erum.sources.source-1.channels =file-channel-1
erum.sinks.hdfs-sink-1.channel =file-channel-1
# Define a file channel called file-channel-1 on erum
erum.channels.file-channel-1.type =file
erum.channels.file-channel-1.capacity =2000000
erum.channels.file-channel-1.transactionCapacity =100000
# Define a source for erum
erum.sources.source-1.type =spooldir
erum.sources.source-1.bind =localhost
erum.sources.source-1.port =44444
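# Note: bind and port are not spooldir source settings (they belong to sources such as netcat) and are ignored by this source.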
erum.sources.source-1.inputCharset =UTF-8
erum.sources.source-1.bufferMaxLineLength =100
# Spooldir in my case is /home/arif/practice/flume_sink
erum.sources.source-1.spoolDir =/home/arif/practice/flume_sink/
erum.sources.source-1.fileHeader =true
erum.sources.source-1.fileHeaderKey=file
erum.sources.source-1.fileSuffix =.COMPLETED
# Sink writes to /user/arif/flume_sink/products under HDFS
erum.sinks.hdfs-sink-1.pathManager =DEFAULT
erum.sinks.hdfs-sink-1.type =hdfs
erum.sinks.hdfs-sink-1.hdfs.filePrefix =common
erum.sinks.hdfs-sink-1.hdfs.fileSuffix =.json
erum.sinks.hdfs-sink-1.hdfs.writeFormat =Text
erum.sinks.hdfs-sink-1.hdfs.fileType =DataStream
erum.sinks.hdfs-sink-1.hdfs.path =hdfs://localhost:9000/user/arif/flume_sink/products/
erum.sinks.hdfs-sink-1.hdfs.batchSize =1000
erum.sinks.hdfs-sink-1.hdfs.rollSize =2684354560
erum.sinks.hdfs-sink-1.hdfs.rollInterval =5
erum.sinks.hdfs-sink-1.hdfs.rollCount =5000
Now we run flume-spool using the agent erum:
bin/flume-ng agent -n erum -c conf -f conf/flume-spool.conf -Dflume.root.logger=DEBUG,console
Copy the products.json file into the directory specified by erum.sources.source-1.spoolDir in the Flume configuration (a minimal copy sketch follows the file contents below). The contents of products.json, exactly as-is, are as follows -
{"productid":"5968dd23fc13ae04d9000001","product_name":"sildenafilcitrate","mfgdate":"20160719031109","supplier":"WisozkInc","quantity":261,"unit_cost":".47"}
{"productid":"5968dd23fc13ae04d9000002","product_name":"MountainJuniperusashei","mfgdate":"20161003021009","supplier":"Keebler-Hilpert","quantity":292,"unit_cost":".74"}
{"productid":"5968dd23fc13ae04d9000003","product_name":"DextromathorphanHBr","mfgdate":"20161101041113","supplier":"Schmitt-Weissnat","quantity":211,"unit_cost":".53"}
{"productid":"5968dd23fc13ae04d9000004","product_name":"MeophanHBr","mfgdate":"20161101061113","supplier":"Schmitt-Weissnat","quantity":198,"unit_cost":".73"}
Download hive-serdes-sources-1.0.6.jar from the URL below -
https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=0
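One way to fetch it from the command line (appending dl=1 to a Dropbox link forces a direct download; this command is an assumption, not part of the original steps):
wget -O hive-serdes-sources-1.0.6.jar "https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=1"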
After spooling the JSON file to the HDFS cluster with flume-spool, we start the Hive server, log in to the Hive shell, and then execute the following -
hive> add jar /home/arif/applications/hadoop/apache-hive-2.1.1-bin/lib/hive-serdes-sources-1.0.6.jar;
hive> create external table products (productid string, product_name string, mfgdate string, supplier string, quantity int, unit_cost string)
> row format serde 'com.cloudera.hive.serde.JSONSerDe' location '/user/arif/flume_sink/products/';
OK
Time taken: 0.211 seconds
hive> select * from products;
OK
5968dd23fc13ae04d9000001 sildenafilcitrate 20160719031109 WisozkInc 261 .47
5968dd23fc13ae04d9000002 MountainJuniperusashei 20161003021009 Keebler-Hilpert 292 .74
5968dd23fc13ae04d9000003 DextromathorphanHBr 20161101041113 Schmitt-Weissnat 211 .53
5968dd23fc13ae04d9000004 MeophanHBr 20161101061113 Schmitt-Weissnat 198 .73
Time taken: 0.291 seconds, Fetched: 4 row(s)
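To verify on the HDFS side as well, the sink path from the configuration above can be listed directly (the file names are expected to carry the common prefix and .json suffix set on the sink):
hdfs dfs -ls /user/arif/flume_sink/products/
hdfs dfs -cat /user/arif/flume_sink/products/common*.json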
I went through all of these steps without any errors. I hope this helps, thanks.
As described in this post: http://shzhangji.com/blog/2017/08/05/how-to-extract-event-time-in-apache-flume/
The required change is to include an interceptor and a serializer for it:
# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files
agent.sources.file.interceptors = i1
agent.sources.file.interceptors.i1.type = regex_extractor
agent.sources.file.interceptors.i1.regex = <regex_for_timestamp>
agent.sources.file.interceptors.i1.serializers = s1
agent.sources.file.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent.sources.file.interceptors.i1.serializers.s1.name = timestamp
agent.sources.file.interceptors.i1.serializers.s1.pattern = <pattern_that_matches_your_regex>
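For example, if every JSON line carried a field such as "timestamp":"2017-06-13 17:00:00" (a hypothetical format, not taken from the question), the two placeholders could be filled in like this; the serializer name must stay timestamp so the %Y/%m/%d/%H escapes in hdfs.path can resolve it:
agent.sources.file.interceptors.i1.regex = "timestamp":"(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})"
agent.sources.file.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
Here the regex uses the doubled backslashes expected in Flume property files, and the pattern is the matching date format the serializer uses to convert the captured string to epoch milliseconds.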
Thanks for pointing out that I needed to include a proper snippet in addition to the link :)