如何正确使用 Flume 在 HDFS 中插入 JSON
How to insert JSON in HDFS using Flume correctly
我在 Flume
中使用 HTTPSource
接收 POST 事件 json
格式如下:
{"username":"xyz","password":"123"}
我的问题是:我是否必须修改事件源(我的意思是将 JSON
发送到 Flume 的源)以便 JSON,有以下格式:
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "{"username":"xyz","password":"123"}"
}]
这是最好的方法吗?或者我可以在其他地方修改它?
我的 conf
文件 flume agent
是:
## Componentes
SomeAgent.sources = SomeHTTP
SomeAgent.channels = MemChannel
SomeAgent.sinks = SomeHDFS
## Fuente e Interceptores
SomeAgent.sources.SomeHTTP.type = http
SomeAgent.sources.SomeHTTP.port = 5140
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
SomeAgent.sources.SomeHTTP.channels = MemChannel
SomeAgent.sources.SomeHTTP.interceptors = i1 i2
## Interceptores
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname
## Canal
SomeAgent.channels.MemChannel.type = memory
SomeAgent.channels.MemChannel.capacity = 10000
SomeAgent.channels.MemChannel.transactionCapacity = 1000
## Sumidero
SomeAgent.sinks.SomeHDFS.type = hdfs
SomeAgent.sinks.SomeHDFS.channel = MemChannel
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true
运行hadoop fs
cat
$ hadoop fs -ls -R /raw/logs/somes
drwxr-xr-x - flume-agent supergroup 0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
-rw-r--r-- 3 flume-agent supergroup 3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
-rw-r--r-- 3 flume-agent supergroup 3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774
$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head
$
(你没看错,空行)
如果我现在查看文件(例如使用 HUE
的二进制视图):
0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
如果我理解得很好,您想要序列化数据和 headers。在这种情况下,您不必修改数据源,而是使用一些标准 Flume 元素并为 HDFS 创建自定义序列化程序。
第一步是实现Flume创建所需的JSON结构,即headers+body。 Flume 可以为您完成,只需在您的 HTTPSource 中使用 JSONHandler,这样:
a1.sources = r1
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler
事实上,没有必要配置 JSON 处理程序,因为它是 HTTPSource 的默认处理程序。
然后,同时使用 Timestamp Interceptor and Host Interceptor 以添加所需的 headers。唯一的技巧是 Flume 代理必须 运行 在与发件人进程相同的机器上,以便拦截的主机与发件人相同:
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
此时,您将拥有想要的事件。尽管如此,HDFS 的标准序列化程序只保存 body,而不保存 headers。因此创建一个实现 org.apache.flume.serialization.EventSerializer
的自定义序列化程序。它配置为:
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.serializer = my_custom_serializer
HTH
@frb 发布的答案是正确的,唯一缺少的一点是 JSON 生成器 必须 发送 body
部分(我必须 admit/complain docs 在那一点上不清楚),所以 正确的 发布 json
的方式是
[body:"{'username':'xyz','password':'123'}"]
请注意 json
数据现在是一个字符串。
通过此更改,json
现在可以在 hdfs
中看到。
使用默认 JSONHandler 的 Flume HTTPSource 需要 JSON 表示 [{ headers: ..., body: ... }]
中的 fully-formed Flume 事件列表提交给端点;要创建一个可以接受像 {"username":"xyz", "password":"123"}
这样的裸 application-level 结构的代理端点,您可以使用实现 HTTPSourceHandler; see the JSONHandler 源的替代 class 覆盖处理程序 - 没有太多可做的它。
public List<Event> getEvents(HttpServletRequest request) throws ...
在自定义 JSONHandler 中,您还可以根据 HTTP 请求向事件添加 headers,例如源 IP、User-Agent 等(拦截器不会有上下文)。此时您可能想要验证 application-supplied JSON(尽管默认处理程序没有)。
虽然如您所见,您可以仅传递 [{body: ...}]
部分,但如果您想 防止 生成器注入,这样的自定义处理程序也可能很有用headers 参加活动。
我在 Flume
中使用 HTTPSource
接收 POST 事件 json
格式如下:
{"username":"xyz","password":"123"}
我的问题是:我是否必须修改事件源(我的意思是将 JSON
发送到 Flume 的源)以便 JSON,有以下格式:
[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "{"username":"xyz","password":"123"}"
}]
这是最好的方法吗?或者我可以在其他地方修改它?
我的 conf
文件 flume agent
是:
## Componentes
SomeAgent.sources = SomeHTTP
SomeAgent.channels = MemChannel
SomeAgent.sinks = SomeHDFS
## Fuente e Interceptores
SomeAgent.sources.SomeHTTP.type = http
SomeAgent.sources.SomeHTTP.port = 5140
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
SomeAgent.sources.SomeHTTP.channels = MemChannel
SomeAgent.sources.SomeHTTP.interceptors = i1 i2
## Interceptores
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname
## Canal
SomeAgent.channels.MemChannel.type = memory
SomeAgent.channels.MemChannel.capacity = 10000
SomeAgent.channels.MemChannel.transactionCapacity = 1000
## Sumidero
SomeAgent.sinks.SomeHDFS.type = hdfs
SomeAgent.sinks.SomeHDFS.channel = MemChannel
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true
运行hadoop fs
cat
$ hadoop fs -ls -R /raw/logs/somes
drwxr-xr-x - flume-agent supergroup 0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
-rw-r--r-- 3 flume-agent supergroup 3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
-rw-r--r-- 3 flume-agent supergroup 3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774
$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head
$
(你没看错,空行)
如果我现在查看文件(例如使用 HUE
的二进制视图):
0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
如果我理解得很好,您想要序列化数据和 headers。在这种情况下,您不必修改数据源,而是使用一些标准 Flume 元素并为 HDFS 创建自定义序列化程序。
第一步是实现Flume创建所需的JSON结构,即headers+body。 Flume 可以为您完成,只需在您的 HTTPSource 中使用 JSONHandler,这样:
a1.sources = r1
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler
事实上,没有必要配置 JSON 处理程序,因为它是 HTTPSource 的默认处理程序。
然后,同时使用 Timestamp Interceptor and Host Interceptor 以添加所需的 headers。唯一的技巧是 Flume 代理必须 运行 在与发件人进程相同的机器上,以便拦截的主机与发件人相同:
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
此时,您将拥有想要的事件。尽管如此,HDFS 的标准序列化程序只保存 body,而不保存 headers。因此创建一个实现 org.apache.flume.serialization.EventSerializer
的自定义序列化程序。它配置为:
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.serializer = my_custom_serializer
HTH
@frb 发布的答案是正确的,唯一缺少的一点是 JSON 生成器 必须 发送 body
部分(我必须 admit/complain docs 在那一点上不清楚),所以 正确的 发布 json
的方式是
[body:"{'username':'xyz','password':'123'}"]
请注意 json
数据现在是一个字符串。
通过此更改,json
现在可以在 hdfs
中看到。
使用默认 JSONHandler 的 Flume HTTPSource 需要 JSON 表示 [{ headers: ..., body: ... }]
中的 fully-formed Flume 事件列表提交给端点;要创建一个可以接受像 {"username":"xyz", "password":"123"}
这样的裸 application-level 结构的代理端点,您可以使用实现 HTTPSourceHandler; see the JSONHandler 源的替代 class 覆盖处理程序 - 没有太多可做的它。
public List<Event> getEvents(HttpServletRequest request) throws ...
在自定义 JSONHandler 中,您还可以根据 HTTP 请求向事件添加 headers,例如源 IP、User-Agent 等(拦截器不会有上下文)。此时您可能想要验证 application-supplied JSON(尽管默认处理程序没有)。
虽然如您所见,您可以仅传递 [{body: ...}]
部分,但如果您想 防止 生成器注入,这样的自定义处理程序也可能很有用headers 参加活动。