Spring 云数据流自定义 Scala 处理器无法 send/receive 来自入门应用程序的数据(SCDF 2.5.1 & Spring Boot 2.2.6)
Spring Cloud Data Flow Custom Scala Processor unable to send/receive data from Starter Apps (SCDF 2.5.1 & Spring Boot 2.2.6)
我一直致力于在 Scala 中为 Spring 云数据流创建一个简单的自定义处理器,并且 运行 遇到了 sending/receiving 数据 from/to 启动应用程序的问题.我一直无法看到通过流传播的任何消息。流的定义是 time --trigger.time-unit=SECONDS | pass-through-log | log
其中 pass-through-log
是我的自定义处理器。
我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。
这是用于处理器的代码 - 我使用的是功能模型。
@SpringBootApplication
class PassThroughLog {
@Bean
def passthroughlog(): Function[String, String] = {
input: String => {
println(s"Received input `$input`")
input
}
}
}
object PassThroughLog {
def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}
application.yml
spring:
cloud:
stream:
function:
bindings:
passthroughlog-in-0: input
passthroughlog-out-0: output
build.gradle.kts
// scala
implementation("org.scala-lang:scala-library:2.12.10")
// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")
如果此处缺少代码示例,我已将整个项目发布到 github。我也在那里发布了日志,因为它们很长。
当我 bootstrap 本地 Kafka 集群并将任意数据推送到 input
主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在通过 Docker 在 Kubernetes 中部署应用程序。
此外,当我使用定义 time --trigger.time-unit=SECONDS | log
部署流时,我在日志接收器中看到消息。这让我确信问题出在自定义处理器上。
我是否遗漏了一些简单的东西,比如依赖项或额外配置?非常感谢任何帮助。
在 SCDF 中使用 Spring Cloud Stream 3.x 版本时,您必须设置一个额外的 属性 以让 SCDF 知道哪些通道绑定配置为输入和输出频道。
请特别注意以下属性:
app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input
在您的情况下,您必须将 passthroughlog-in-0
和 passthroughlog-out-0
函数绑定分别映射到 input
和 output
。
原来问题出在我的 Dockerfile 上。为了便于配置,我有一个构建参数来指定 ENTRYPOINT
中使用的 jar 文件。为此,我使用了 ENTRYPOINT
的 shell 版本。将我的 ENTRYPOINT
更改为 exec 版本解决了我的问题。
ENTRYPOINT
的 shell 版本不能很好地处理图像参数 (docker run <image> <args>
),因此 SCDF 无法将适当的参数传递给容器。
更改我的 Dockerfile 来自:
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR
至
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]
已解决问题。
我一直致力于在 Scala 中为 Spring 云数据流创建一个简单的自定义处理器,并且 运行 遇到了 sending/receiving 数据 from/to 启动应用程序的问题.我一直无法看到通过流传播的任何消息。流的定义是 time --trigger.time-unit=SECONDS | pass-through-log | log
其中 pass-through-log
是我的自定义处理器。
我正在使用 Spring Cloud Data Flow 2.5.1 和 Spring Boot 2.2.6。
这是用于处理器的代码 - 我使用的是功能模型。
@SpringBootApplication
class PassThroughLog {
@Bean
def passthroughlog(): Function[String, String] = {
input: String => {
println(s"Received input `$input`")
input
}
}
}
object PassThroughLog {
def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}
application.yml
spring:
cloud:
stream:
function:
bindings:
passthroughlog-in-0: input
passthroughlog-out-0: output
build.gradle.kts
// scala
implementation("org.scala-lang:scala-library:2.12.10")
// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")
如果此处缺少代码示例,我已将整个项目发布到 github。我也在那里发布了日志,因为它们很长。
当我 bootstrap 本地 Kafka 集群并将任意数据推送到 input
主题时,我能够看到数据流经处理器。但是,当我在 Spring Cloud Data Flow 上部署应用程序时,情况并非如此。我正在通过 Docker 在 Kubernetes 中部署应用程序。
此外,当我使用定义 time --trigger.time-unit=SECONDS | log
部署流时,我在日志接收器中看到消息。这让我确信问题出在自定义处理器上。
我是否遗漏了一些简单的东西,比如依赖项或额外配置?非常感谢任何帮助。
在 SCDF 中使用 Spring Cloud Stream 3.x 版本时,您必须设置一个额外的 属性 以让 SCDF 知道哪些通道绑定配置为输入和输出频道。
请特别注意以下属性:
app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=output
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input
在您的情况下,您必须将 passthroughlog-in-0
和 passthroughlog-out-0
函数绑定分别映射到 input
和 output
。
原来问题出在我的 Dockerfile 上。为了便于配置,我有一个构建参数来指定 ENTRYPOINT
中使用的 jar 文件。为此,我使用了 ENTRYPOINT
的 shell 版本。将我的 ENTRYPOINT
更改为 exec 版本解决了我的问题。
ENTRYPOINT
的 shell 版本不能很好地处理图像参数 (docker run <image> <args>
),因此 SCDF 无法将适当的参数传递给容器。
更改我的 Dockerfile 来自:
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR
至
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]
已解决问题。