正在根据消息源(即 RabbitMQ、Kafka)中的事件执行 spring 云任务

Executing spring cloud task based on event into messaging source (i.e. RabbitMQ, Kafka)

我是学习 Spring 云任务和 SCDF 的新手所以问这个。

我想根据一个事件执行我的 SCT(比如说一条消息被发布到 Rabbit MQ),所以我认为它可以通过两种方式完成:

  1. 创建一个从 RabbitMQ 轮询消息并将数据发送到流的源,现在创建一个从流中读取数据的接收器,一旦数据到达接收器(来自源流),任务将是推出。

    create steam producer --definition "rabbitproducer | streamconsumer (This is @TaskEnabled)"
    

    不确定这是否可行?

  2. 其他方法可能是使用任务启动器。这里的任务启动器将配置一个流,一个监听器将轮询来自 rabbitMQ 的消息。因此,当收到一条消息时,触发器将启动该过程,而 tasklauncher 将启动该任务。但是这里不确定我将如何将消息数据放入我的任务中?我必须将数据添加到 TaskLaunchRequest 中吗?

    create stream mystream --definition "rabbitmsgtrigger --uri:my task | joblauncher"
    

已支持通过上游事件启动任务,但实现它的方法很少 - 请查看 reference guide 和示例)了解更多详细信息。

这里是关于我的问题答案的完整解释。 Sabby 在这里帮了我很多,解决了我的问题。

问题:我无法使用 tasklauncher/task-sink 触发我的任务。在日志中我也没有得到正确的细节,我什至不知道如何正确设置日志级别。

解决方案:在 Sabby 和 SCT 站点上提供的文档的帮助下,我可以解决此问题并继续推进我的 POC 工作。下面是我做的详细步骤。

  1. 通过参考 属性 文件并将日志级别更改设置为

    ,使用 postgresql 数据库启动我的 SCDF
    --logging.level.org.springframework.cloud=DEBUG
    --spring.config.location=file://scdf.properties
    
  2. 从 bitly 导入应用程序。

    app import --uri [stream applications link][1]
    
  3. 已注册任务接收器应用程序

    app register --name task-sink --type sink --uri file://tasksink-1.1.0.BUILD-SNAPSHOT.jar
    
  4. 创建流为:

    stream create mytasklaunchertest --definition "triggertask --triggertask.uri=https://my-archiva/myproject-scdf-task1/0.0.1-SNAPSHOT/myproject-scdf-task1-0.0.1-20160916.143611-1.jar --trigger.fixed-delay=5 | task-sink"
    
  5. 已部署流:

    stream deploy foo --properties "app.triggertask.spring.rabbitmq.host=host,app.triggertask.spring.rabbitmq.username=user,app.triggertask.spring.rabbitmq.password=pass,app.triggertask.spring.rabbitmq.port=5672,app.triggertask.spring.rabbitmq.virtual-host=xxx"