正在 Apache Flume 中获取 JMS Header
Fetching JMS Header in Apache Flume
我正在尝试使用 Apache Flume 使用 JMS 消息 (IBM Websphere MQ) 并将数据存储到 HDFS。阅读邮件时,我只能看到邮件的 body 而看不到邮件的 header 内容。
是否可以使用 Apache Flume 使用 header 属性 读取 jms 消息?
我的配置:
# Source definition
u.sources.s1.type=jms
u.sources.s1.initialContextFactory=ABC
u.sources.s1.connectionFactory=<my connection factory>
u.sources.s1.providerURL=ABC
u.sources.s1.destinationName=r1
u.sources.s1.destinationType=QUEUE
# Channel definition
u.channels.c1.type=file
u.channels.c1.capacity=10000000
u.channels.c1.checkpointDir=/checkpointdir
u.channels.c1.transactionCapacity=10000
u.channels.c1.dataDirs=/datadir
# Sink definition
u.sinks.r1.type=hdfs
u.sinks.r1.channel=c1
u.sinks.r1.hdfs.path=/message/%Y%m%d
u.sinks.r1.hdfs.filePrefix=event_
u.sinks.r1.hdfs.fileSuffix=.xml
u.sinks.r1.hdfs.fileType = DataStream
u.sinks.r1.hdfs.writeFormat=Text
u.sinks.r1.hdfs.useLocalTimeStamp=TRUE
有很多类型的 JMS 消息,如 "Table 30–2 JMS Message Types" here。
Flume DefaultJMSMessageConverter
与here 一样使用TextMessage
并提供如下供您参考:
...
else if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
event.setBody(textMessage.getText().getBytes(charset));
}
...
TextMessage 仅提供 body 条消息。
恕我直言,你有两个选择:
- 如果可能,请在 body 本身中发送 message-header、header-value 对,并按原样使用 "DefaultJMSMessageConverter"。
- 通过编写自定义 JMSMessageConverter 和 type-cast 构建您自己的 "flume-jms-source.jar" "message" 到 javax.jms.Message,获取 JMS headers,将它们设置在
SimpleEvent
.
希望这能提供一些方向。
我正在尝试使用 Apache Flume 使用 JMS 消息 (IBM Websphere MQ) 并将数据存储到 HDFS。阅读邮件时,我只能看到邮件的 body 而看不到邮件的 header 内容。
是否可以使用 Apache Flume 使用 header 属性 读取 jms 消息?
我的配置:
# Source definition
u.sources.s1.type=jms
u.sources.s1.initialContextFactory=ABC
u.sources.s1.connectionFactory=<my connection factory>
u.sources.s1.providerURL=ABC
u.sources.s1.destinationName=r1
u.sources.s1.destinationType=QUEUE
# Channel definition
u.channels.c1.type=file
u.channels.c1.capacity=10000000
u.channels.c1.checkpointDir=/checkpointdir
u.channels.c1.transactionCapacity=10000
u.channels.c1.dataDirs=/datadir
# Sink definition
u.sinks.r1.type=hdfs
u.sinks.r1.channel=c1
u.sinks.r1.hdfs.path=/message/%Y%m%d
u.sinks.r1.hdfs.filePrefix=event_
u.sinks.r1.hdfs.fileSuffix=.xml
u.sinks.r1.hdfs.fileType = DataStream
u.sinks.r1.hdfs.writeFormat=Text
u.sinks.r1.hdfs.useLocalTimeStamp=TRUE
有很多类型的 JMS 消息,如 "Table 30–2 JMS Message Types" here。
Flume DefaultJMSMessageConverter
与here 一样使用TextMessage
并提供如下供您参考:
...
else if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
event.setBody(textMessage.getText().getBytes(charset));
}
...
TextMessage 仅提供 body 条消息。
恕我直言,你有两个选择:
- 如果可能,请在 body 本身中发送 message-header、header-value 对,并按原样使用 "DefaultJMSMessageConverter"。
- 通过编写自定义 JMSMessageConverter 和 type-cast 构建您自己的 "flume-jms-source.jar" "message" 到 javax.jms.Message,获取 JMS headers,将它们设置在
SimpleEvent
.
希望这能提供一些方向。