如何将事件拆分为多个事件以将它们发送到多路复用扇出流

How to split an event to multiple events to send them to multiplexed fan out flow

我们计划使用 kafka flume-ng 集成 (Flafka),其中 flume 是 kafka 队列的消费者。 Flume 代理将收到列出命令的文件及其输出,如下所示:

root@host> [Command1]

[Output1]

root@host> [Command2]

[Output2]

该文件可能包含多个命令,并且一个命令的输出可能很大。我们需要拦截事件(也就是文件数据),根据命令将事件拆分成多个事件。然后,source 将流扇出到多个通道,将每个子事件发送到一个通道(使用多路复用),每个 sink 将命令信息存储到各自的 Hive table。 是否可以使用扇出流将一个事件拆分为多个事件?或者换句话说,我们可以在拦截器中将一个事件拆分为多个事件吗?

我已经阅读了有关正则表达式提取器拦截器和序列化器的内容,但不确定它是否对这种情况有任何帮助。

如果我没看错的话,你需要把从 Kafka queue 中获取的原始事件分成几个,比方说,sub-events。你想知道 Flume 中的哪一块可以做到这一点。

我认为拦截器不适合用于该目的,因为拦截器位于源和通道之间 "placed",它们旨在添加、删除或修改关于 [=20] 的 headers =] 放入频道前的事件;同样,他们可以放弃整个事件。但是他们无法根据其他现有事件生成多个事件。

我认为您正在寻找附加到源的处理程序之类的东西,能够解释从 Kafka 获取的事件并在源输出处生成多个 Flume 事件。这个概念类似于可以附加到 HTTPSoure 的处理程序(更多详细信息 here)。如果您的源可以实现这样的事情,您很可能必须开发自己的自定义处理程序,因为您需要的功能非常具体。

感谢frb的回复。

我想将传入事件拆分为 flume 源到多个子事件,并将它们发送到各自的频道。因此,拓扑中的第一个 flume 节点会将每个子事件(使用多路复用)路由到可以处理此类信息的特定跃点。

根据您的回复,我了解到无法使用拦截器来完成。您能否分享处理程序的任何示例或文档?

是的,flume 无法将事件拆分为多个。这是我对这种方法的替代解决方案,以 Kafka 源为例。

首先实现一个扩展Kafka源的源class,替换默认的ChannelProcessor对象。

public class XXXSplitSource extends KafkaSource {

    @Override
    public synchronized ChannelProcessor getChannelProcessor()
    {
        return new XXXYourChannelProcessorProxy(super.getChannelProcessor());
    }
}

然后,在 ChannelProcessor 代理实现中,您可以使用自定义函数拆分事件。

public class XXXYourChannelProcessorProxy  extends ChannelProcessor {
    public ChannelProcessor  m_downstreamChannelProcessor = null;

    public XXXYourChannelProcessorProxy (ChannelSelector selector) {
        super(selector);
    }

    public XXXYourChannelProcessorProxy (ChannelProcessor processor) {
        super(null);
        m_downstreamChannelProcessor = processor;
    }

    @Override
    public void processEventBatch(List<Event> events) {
        List<Event> generatedEvents = YOUR_SPLIT_FUNCTION_HERE(events);
        m_downstreamChannelProcessor.processEventBatch(generatedEvents);    
    }
}