Save Flume output to a Hive table with Hive Sink
I am trying to configure Flume to save its output to a Hive table using the Hive sink type. I have a single-node cluster and use the MapR Hadoop distribution.
Here is my flume.conf:
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.sources.source1.type = exec
agent1.sources.source1.command = cat /home/andrey/flume_test.data
agent1.sinks.sink1.type = hive
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.hive.metastore = thrift://127.0.0.1:9083
agent1.sinks.sink1.hive.database = default
agent1.sinks.sink1.hive.table = flume_test
agent1.sinks.sink1.useLocalTimeStamp = false
agent1.sinks.sink1.round = true
agent1.sinks.sink1.roundValue = 10
agent1.sinks.sink1.roundUnit = minute
agent1.sinks.sink1.serializer = DELIMITED
agent1.sinks.sink1.serializer.delimiter = ","
agent1.sinks.sink1.serializer.serdeSeparator = ','
agent1.sinks.sink1.serializer.fieldnames = id,message
agent1.channels.channel1.type = FILE
agent1.channels.channel1.transactionCapacity = 1000000
agent1.channels.channel1.checkpointInterval = 30000
agent1.channels.channel1.maxFileSize = 2146435071
agent1.channels.channel1.capacity = 10000000
agent1.sources.source1.channels = channel1
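A quick way to rule out wiring mistakes in a config like the one above is to parse it and confirm that every source and sink points at a declared channel. The following is only a minimal sketch, not part of Flume: `parse_properties` and `undeclared_channels` are hypothetical helper names, and the parser is a simplified reading of the Java properties format (it accepts `=`, `:` or whitespace as the key/value separator and ignores escapes and line continuations).

```python
import re

# Settings modeled on the flume.conf above. The last line uses whitespace as
# the separator on purpose, which the Java properties format also allows.
SAMPLE_CONF = """\
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
agent1.channels.channel1.capacity 10000000
"""


def parse_properties(text):
    """Parse properties-style lines into a dict (simplified, no escapes)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comments ('#' or '!' in properties files).
        if not line or line.startswith(("#", "!")):
            continue
        match = re.match(r"([^\s=:]+)\s*[=:\s]\s*(.*)", line)
        if match:
            props[match.group(1)] = match.group(2).strip()
    return props


def undeclared_channels(props, agent):
    """Return (component, channel) pairs that reference an undeclared channel."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    # A source may list several channels, separated by whitespace.
    for name in props.get(f"{agent}.sources", "").split():
        for channel in props.get(f"{agent}.sources.{name}.channels", "").split():
            if channel not in declared:
                problems.append((name, channel))
    # A sink reads from exactly one channel.
    for name in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{name}.channel", "")
        if channel not in declared:
            problems.append((name, channel))
    return problems


print(undeclared_channels(parse_properties(SAMPLE_CONF), "agent1"))  # prints []
```

An empty list means the source and sink are both attached to `channel1`, so the wiring itself is unlikely to be the reason no rows show up.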
My data file, flume_test.data:
1,AAAAAAAA
2,BBBBBBB
3,CCCCCCCC
4,DDDDDD
5,EEEEEEE
6,FFFFFFFFFFF
7,GGGGGG
8,HHHHHHH
9,IIIIII
10,JJJJJJ
11,KKKKKK
12,LLLLLLLL
13,MMMMMMMMM
14,NNNNNNNNN
15,OOOOOOOO
16,PPPPPPPPPP
17,QQQQQQQ
18,RRRRRRR
19,SSSSSSSS
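To make the serializer settings concrete, here is a rough sketch of how a line of this file lines up with `serializer.delimiter = ","` and `serializer.fieldnames = id,message`. It only illustrates the intended mapping and is not Flume's or Hive's actual code. `split_record` is a hypothetical helper, and rejecting lines with the wrong number of fields is an assumption of this sketch.

```python
def split_record(line, delimiter=",", fieldnames=("id", "message")):
    """Map one delimited input line onto the configured field names."""
    values = line.rstrip("\n").split(delimiter)
    # Assumption of this sketch: a line must supply exactly one value per field.
    if len(values) != len(fieldnames):
        raise ValueError(f"expected {len(fieldnames)} fields, got {len(values)}: {line!r}")
    return dict(zip(fieldnames, values))


print(split_record("1,AAAAAAAA"))  # prints {'id': '1', 'message': 'AAAAAAAA'}
```

Every line in flume_test.data has exactly two comma-separated values, so the data matches the two-column layout named in `fieldnames`.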
This is how I create the table in Hive:
create table flume_test(id string, message string)
clustered by (message) into 1 buckets
STORED AS ORC tblproperties ("orc.compress"="NONE");
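When I use only 1 bucket, running select * from flume_test in the Hive command shell returns only an OK status and no data. If I use more than 1 bucket, it returns error messages.
For example, these are the errors after a select on the Hive table with 5 buckets: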
hive> select * from flume_test;
OK
2015-06-18 10:04:57,6909 ERROR Client fs/client/fileclient/cc/client.cc:1385 Thread: 10141 Open failed for file /user/hive/warehouse/flume_test/delta_0004401_0004500/bucket_00, LookupFid error No such file or directory(2)
2015-06-18 10:04:57,6941 ERROR Client fs/client/fileclient/cc/client.cc:1385 Thread: 10141 Open failed for file /user/hive/warehouse/flume_test/delta_0004401_0004500/bucket_00, LookupFid error No such file or directory(2)
2015-06-18 10:04:57,6976 ERROR Client fs/client/fileclient/cc/client.cc:1385 Thread: 10141 Open failed for file /user/hive/warehouse/flume_test/delta_0004401_0004500/bucket_00, LookupFid error No such file or directory(2)
2015-06-18 10:04:57,7044 ERROR Client fs/client/fileclient/cc/client.cc:1385 Thread: 10141 Open failed for file /user/hive/warehouse/flume_test/delta_0004401_0004500/bucket_00, LookupFid error No such file or directory(2)
Time taken: 0.914 seconds
The Hive table data is stored in the /user/hive/warehouse/flume_test directory, and the directory is not empty:
-rwxr-xr-x 3 andrey andrey 4 2015-06-17 16:28 /user/hive/warehouse/flume_test/_orc_acid_version
drwxr-xr-x - andrey andrey 2 2015-06-17 16:28 /user/hive/warehouse/flume_test/delta_0004301_0004400
The delta directory contains:
-rw-r--r-- 3 andrey andrey 991 2015-06-17 16:28 /user/hive/warehouse/flume_test/delta_0004301_0004400/bucket_00000
-rwxr-xr-x 3 andrey andrey 8 2015-06-17 16:28 /user/hive/warehouse/flume_test/delta_0004301_0004400/bucket_00000_flush_length
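One thing the listing does reveal: Hive ACID delta directories are named `delta_<minTxn>_<maxTxn>`, so the name shows how many transactions the writer reserved in one batch. The sketch below just decodes that name. `txn_range` is a hypothetical helper, not a Hive or Flume API.

```python
import re

# Matches the trailing "delta_<minTxn>_<maxTxn>" component of a path.
DELTA_RE = re.compile(r"delta_(\d+)_(\d+)$")


def txn_range(path):
    """Return (min_txn, max_txn, count) encoded in a delta directory name."""
    match = DELTA_RE.search(path.rstrip("/"))
    if match is None:
        raise ValueError(f"not a delta directory: {path!r}")
    low, high = int(match.group(1)), int(match.group(2))
    return low, high, high - low + 1


print(txn_range("/user/hive/warehouse/flume_test/delta_0004301_0004400"))
# prints (4301, 4400, 100)
```

The directory spans 100 transactions, which, as far as I can tell from the Flume documentation, matches the Hive sink's default `hive.txnsPerBatchAsk` of 100. That would suggest the sink opened a full batch of transactions for only 19 input rows, though this is an inference from the directory name rather than something confirmed by the logs.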
I cannot read the ORC file /user/hive/warehouse/flume_test/delta_0004301_0004400/bucket_00000, even with Pig.
I also tried setting these variables after creating the table in Hive, but it made no difference:
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 2;
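I found a few examples online, but none of them are complete, and since I am new to Flume I cannot make sense of them.
Adding these two lines to my config solved my problem, but I still get errors when reading the table from Hive. I can read the table and it returns the correct results, but with errors: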
agent1.sinks.sink1.hive.txnsPerBatchAsk = 2
agent1.sinks.sink1.batchSize = 10
It looks like you have not generated an .avsc file. You seem to be creating the Hive table from an Avro file, hence the error.