回调驱动 Spring Cloud Dataflow 源应用程序
Callback-driven Spring Cloud Dataflow source application
我想创建一个基于库的 Spring Cloud Dataflow 源应用程序,该应用程序连接到消息服务(实际上是 IRC)并在消息到达时调用我的回调。源应用程序的唯一目标是根据收到的 IRC 消息创建 SCDF 消息并将其发送到流。
我想出了以下解决方案:
注解为@Component
的IrcListener
class做了一些配置,当start()
方法被调用时开始监听IRC消息。当收到一条消息时,它的 onGenericMessage
回调只是通过注入的 source
属性:
将消息发送到流
@Component
public class IrcListener extends ListenerAdapter {
@Override
public void onGenericMessage(GenericMessageEvent event) {
Message msg = new Message();
msg.content = event.getMessage();
source.output().send(MessageBuilder.withPayload(msg).build());
}
private Source source;
private String _name;
private String _server;
private List<String> _channels;
public void start() throws Exception {
Configuration configuration = new Configuration.Builder()
.setName(_name)
.addServer(_server)
.addAutoJoinChannels(_channels)
.addListener(this)
.buildConfiguration();
PircBotX bot = new PircBotX(configuration);
bot.startBot();
}
@Autowired
public IrcListener(Source source) {
this.source = source;
_name = "ircsource";
_server = "irc.rizon.net";
_channels = Arrays.asList("#test".split(","));
}
}
主 class 运行 Spring 应用程序并在 IrcListener
组件上调用上述 start()
方法。
@EnableBinding(Source.class)
@SpringBootApplication
public class IrcStreamApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(IrcStreamApplication.class, args);
context.getBean(IrcListener.class).start();
}
}
这工作正常,消息已成功接收并发布到流中,但我想知道这是否是采用 Spring(云数据流)宇宙的正确方法,或者也许我遗漏了一些重要的东西?
看起来不错;但是,一般来说,消息驱动源会扩展 MessageProducerSupport
并调用 sendMessage(Message<?>)
.
(在这种情况下覆盖 doStart()
)。
它会让您访问消息历史记录跟踪和错误处理(如果发送失败)。
我想创建一个基于库的 Spring Cloud Dataflow 源应用程序,该应用程序连接到消息服务(实际上是 IRC)并在消息到达时调用我的回调。源应用程序的唯一目标是根据收到的 IRC 消息创建 SCDF 消息并将其发送到流。
我想出了以下解决方案:
注解为@Component
的IrcListener
class做了一些配置,当start()
方法被调用时开始监听IRC消息。当收到一条消息时,它的 onGenericMessage
回调只是通过注入的 source
属性:
@Component
public class IrcListener extends ListenerAdapter {
@Override
public void onGenericMessage(GenericMessageEvent event) {
Message msg = new Message();
msg.content = event.getMessage();
source.output().send(MessageBuilder.withPayload(msg).build());
}
private Source source;
private String _name;
private String _server;
private List<String> _channels;
public void start() throws Exception {
Configuration configuration = new Configuration.Builder()
.setName(_name)
.addServer(_server)
.addAutoJoinChannels(_channels)
.addListener(this)
.buildConfiguration();
PircBotX bot = new PircBotX(configuration);
bot.startBot();
}
@Autowired
public IrcListener(Source source) {
this.source = source;
_name = "ircsource";
_server = "irc.rizon.net";
_channels = Arrays.asList("#test".split(","));
}
}
主 class 运行 Spring 应用程序并在 IrcListener
组件上调用上述 start()
方法。
@EnableBinding(Source.class)
@SpringBootApplication
public class IrcStreamApplication {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(IrcStreamApplication.class, args);
context.getBean(IrcListener.class).start();
}
}
这工作正常,消息已成功接收并发布到流中,但我想知道这是否是采用 Spring(云数据流)宇宙的正确方法,或者也许我遗漏了一些重要的东西?
看起来不错;但是,一般来说,消息驱动源会扩展 MessageProducerSupport
并调用 sendMessage(Message<?>)
.
(在这种情况下覆盖 doStart()
)。
它会让您访问消息历史记录跟踪和错误处理(如果发送失败)。