使用 flume 读取 IBM MQ 数据
Using flume to read IBM MQ data
我想从 IBM MQ 读取数据并将其放入 HDF。
查看了 flume 的 JMS 源代码,它似乎可以连接到 IBM MQ,但我不明白所需属性列表中的“destinationType”和“destinationName”是什么意思。有人可以解释一下吗?
此外,我应该如何配置我的 flume 代理
flumeAgent1(跑在和MQ一样的机器上)读取MQ数据---- flumeAgent2(跑在Hadoop集群上)写入Hdfs
或者 Hadoop 集群上只有一个代理就足够了
有人可以帮助我了解如何将 MQ 与 flume
集成吗
参考
https://flume.apache.org/FlumeUserGuide.html
谢谢,
恰雅
关于 Flume 代理架构,它由 source 以极简形式组成,负责接收或轮询事件,并将事件转换为 Flume 事件放在 频道 中。然后,sink 获取这些事件,以便将数据保存在某处,或将数据发送到另一个代理。所有这些组件(源、通道、接收器,即代理)运行 在同一台机器上。相反,可能会分发不同的代理。
话虽如此,您的方案似乎需要一个基于 JMS source, a channel, typically Memory Channel, and a HDFS sink 的单个代理。
如文档中所述,JMS 源仅针对 ActiveMQ 进行了测试,但应该适用于任何其他队列系统。该文档还提供了一个示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
a1
为单代理名称。 c1
是通道的名称,它的配置仍然必须完成;并且完全缺少接收器配置。添加即可轻松完成:
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = ...
a1.sinks.k1...
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1...
r1
是 JMS 源,可以看出,destinationName
只是要求一个字符串名称。 destinationType
只能取两个值:queue
或 topic
。我认为重要的参数是providerURL
和initialContextFactory
和connectionFactory
,必须适配IBM MQ。
我想从 IBM MQ 读取数据并将其放入 HDF。
查看了 flume 的 JMS 源代码,它似乎可以连接到 IBM MQ,但我不明白所需属性列表中的“destinationType”和“destinationName”是什么意思。有人可以解释一下吗?
此外,我应该如何配置我的 flume 代理
flumeAgent1(跑在和MQ一样的机器上)读取MQ数据---- flumeAgent2(跑在Hadoop集群上)写入Hdfs 或者 Hadoop 集群上只有一个代理就足够了
有人可以帮助我了解如何将 MQ 与 flume
集成吗参考
https://flume.apache.org/FlumeUserGuide.html
谢谢, 恰雅
关于 Flume 代理架构,它由 source 以极简形式组成,负责接收或轮询事件,并将事件转换为 Flume 事件放在 频道 中。然后,sink 获取这些事件,以便将数据保存在某处,或将数据发送到另一个代理。所有这些组件(源、通道、接收器,即代理)运行 在同一台机器上。相反,可能会分发不同的代理。
话虽如此,您的方案似乎需要一个基于 JMS source, a channel, typically Memory Channel, and a HDFS sink 的单个代理。
如文档中所述,JMS 源仅针对 ActiveMQ 进行了测试,但应该适用于任何其他队列系统。该文档还提供了一个示例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
a1
为单代理名称。 c1
是通道的名称,它的配置仍然必须完成;并且完全缺少接收器配置。添加即可轻松完成:
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = ...
a1.sinks.k1...
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1...
r1
是 JMS 源,可以看出,destinationName
只是要求一个字符串名称。 destinationType
只能取两个值:queue
或 topic
。我认为重要的参数是providerURL
和initialContextFactory
和connectionFactory
,必须适配IBM MQ。