Spring 云数据流自定义 Scala 处理器无法 send/receive 来自入门应用程序的数据(SCDF 2.5.1 & Spring Boot 2.2.6)

Spring Cloud Data Flow Custom Scala Processor unable to send/receive data from Starter Apps (SCDF 2.5.1 & Spring Boot 2.2.6)

我一直致力于在 Scala 中为 Spring 云数据流创建一个简单的自定义处理器,并且 运行 遇到了 sending/receiving 数据 from/to 启动应用程序的问题.我一直无法看到通过流传播的任何消息。流的定义是 time --trigger.time-unit=SECONDS | pass-through-log | log 其中 pass-through-log 是我的自定义处理器。

我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。

这是用于处理器的代码 - 我使用的是功能模型。

@SpringBootApplication
class PassThroughLog {

  @Bean
  def passthroughlog(): Function[String, String] = {
    input: String => {
      println(s"Received input `$input`")
      input
    }
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}

application.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output

build.gradle.kts

// scala
implementation("org.scala-lang:scala-library:2.12.10")

// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")

如果此处缺少代码示例,我已将整个项目发布到 github。我也在那里发布了日志,因为它们很长。

当我 bootstrap 本地 Kafka 集群并将任意数据推送到 input 主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在通过 Docker 在 Kubernetes 中部署应用程序。

此外,当我使用定义 time --trigger.time-unit=SECONDS | log 部署流时,我在日志接收器中看到消息。这让我确信问题出在自定义处理器上。

我是否遗漏了一些简单的东西,比如依赖项或额外配置?非常感谢任何帮助。

在 SCDF 中使用 Spring Cloud Stream 3.x 版本时,您必须设置一个额外的 属性 以让 SCDF 知道哪些通道绑定配置为输入和输出频道。

参见:Functional Applications

请特别注意以下属性:

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output

app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input

在您的情况下,您必须将 passthroughlog-in-0passthroughlog-out-0 函数绑定分别映射到 inputoutput

原来问题出在我的 Dockerfile 上。为了便于配置,我有一个构建参数来指定 ENTRYPOINT 中使用的 jar 文件。为此,我使用了 ENTRYPOINT 的 shell 版本。将我的 ENTRYPOINT 更改为 exec 版本解决了我的问题。

ENTRYPOINT 的 shell 版本不能很好地处理图像参数 (docker run <image> <args>),因此 SCDF 无法将适当的参数传递给容器。

更改我的 Dockerfile 来自:

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]

已解决问题。