回调驱动 Spring Cloud Dataflow 源应用程序

Callback-driven Spring Cloud Dataflow source application

我想创建一个基于库的 Spring Cloud Dataflow 源应用程序,该应用程序连接到消息服务(实际上是 IRC)并在消息到达时调用我的回调。源应用程序的唯一目标是根据收到的 IRC 消息创建 SCDF 消息并将其发送到流。

我想出了以下解决方案:

注解为@ComponentIrcListenerclass做了一些配置,当start()方法被调用时开始监听IRC消息。当收到一条消息时,它的 onGenericMessage 回调只是通过注入的 source 属性:

将消息发送到流
@Component
public class IrcListener extends ListenerAdapter {

    @Override
    public void onGenericMessage(GenericMessageEvent event) {
            Message msg = new Message();
            msg.content = event.getMessage();

            source.output().send(MessageBuilder.withPayload(msg).build());
    }

    private Source source;
    private String _name;
    private String _server;
    private List<String> _channels;

    public void start() throws Exception {
            Configuration configuration = new Configuration.Builder()
                            .setName(_name)
                            .addServer(_server)
                            .addAutoJoinChannels(_channels)
                            .addListener(this)
                            .buildConfiguration();

            PircBotX bot = new PircBotX(configuration);
            bot.startBot();
    }

    @Autowired
    public IrcListener(Source source) {
            this.source = source;

            _name = "ircsource";
            _server = "irc.rizon.net";
            _channels = Arrays.asList("#test".split(","));
    }
}

主 class 运行 Spring 应用程序并在 IrcListener 组件上调用上述 start() 方法。

@EnableBinding(Source.class)
@SpringBootApplication
public class IrcStreamApplication {
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(IrcStreamApplication.class, args);
        context.getBean(IrcListener.class).start();
    }
}

这工作正常,消息已成功接收并发布到流中,但我想知道这是否是采用 Spring(云数据流)宇宙的正确方法,或者也许我遗漏了一些重要的东西?

看起来不错;但是,一般来说,消息驱动源会扩展 MessageProducerSupport 并调用 sendMessage(Message<?>).

(在这种情况下覆盖 doStart())。

它会让您访问消息历史记录跟踪和错误处理(如果发送失败)。