Create Spring Cloud Data Flow Processor using Gradle and Kotlin
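I am trying to build a Kotlin processor based on Spring Cloud Stream 3.1, using a Kotlin lambda.
I deploy this processor into a stream with Spring Cloud Data Flow, but it is marked as failed.
When I cat the error log, I see the following error: IllegalArgumentException: Type must be one of Supplier, Function or Consumer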
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:531) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.2.jar!/:5.3.2]
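Here is my code:
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean

@SpringBootApplication
class TransformerUppercaseApplication {
    // Processor function exposed as a Kotlin lambda bean
    @Bean
    fun transform(): (String) -> String {
        return { "This is my uppercase".plus(it.toUpperCase()) }
    }
}

fun main(args: Array<String>) {
    runApplication<TransformerUppercaseApplication>(*args)
}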
I tried specifying the function name in application.yml, without success:
spring:
  cloud:
    stream:
      function:
        definition: transform
Here is my build.gradle.kts:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    `maven-publish`
    id("org.springframework.boot") version "2.4.1"
    id("io.spring.dependency-management") version "1.0.10.RELEASE"
    kotlin("jvm") version "1.4.21"
    kotlin("plugin.spring") version "1.4.21"
}

group = "com.company"
version = "0.0.11"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven { url = uri("https://repo.spring.io/milestone") }
}

extra["springCloudVersion"] = "2020.0.0"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    implementation("org.springframework.cloud:spring-cloud-function-kotlin")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}

publishing {
    publications {
        create<MavenPublication>("mavenJava") {
            from(components["java"])
        }
    }
}

configurations {
    val elements = listOf(apiElements, runtimeElements)
    elements.forEach { element ->
        element.get().outgoing.artifacts.removeIf { it -> it.buildDependencies.getDependencies(null).contains(tasks.jar.get()) }
        element.get().outgoing.artifact(tasks.bootJar.get())
    }
}
Edit: I can avoid the error by using the Java Function class instead of a Kotlin lambda:
// Requires: import java.util.function.Function
@Bean
fun transform(): Function<String, String> = Function {
    it.toUpperCase()
}
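The error message names the three java.util.function interfaces, which hints at why the swap helps: a Kotlin lambda of type (String) -> String is a Kotlin function type (Function1), not a java.util.function.Function. The plain-Kotlin sketch below involves no Spring at all, and the names kotlinLambda and javaFunction are made up for illustration. It only demonstrates the type difference and does not reproduce the framework's own detection logic.

```kotlin
import java.util.function.Function

// A Kotlin function type; it is not a java.util.function.Function.
val kotlinLambda: (String) -> String = { "processed: $it" }

// The same logic wrapped in the Java functional interface via SAM conversion.
val javaFunction: Function<String, String> = Function { "processed: $it" }

fun main() {
    val asAny: Any = kotlinLambda
    println(asAny is Function<*, *>)    // false: not the Java interface
    println(asAny is Function1<*, *>)   // true: a Kotlin one-argument function
    println(javaFunction.apply("abc"))  // processed: abc
}
```

Anything that looks specifically for the Java interfaces will therefore not recognise the bare lambda unless some adapter wraps it first.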
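But now there is a new problem: the processor does not bind to the Kafka topics. It shows up as anonymous, and the processor stays in the "deploying" state. I get this in the processor log (I also tried with the Java example):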
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1611044007674
2021-01-19 08:13:27.676 INFO 107 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Subscribed to topic(s): transform-in-0
2021-01-19 08:13:27.677 INFO 107 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2021-01-19 08:13:27.702 INFO 107 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@24fabd0f
2021-01-19 08:13:27.715 INFO 107 --- [ main] c.e.t.TransformerUppercaseApplicationKt : Started TransformerUppercaseApplicationKt in 7.538 seconds (JVM running for 8.374)
2021-01-19 08:13:27.729 INFO 107 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Cluster ID: gN63F8OzRwaEeKElBfkmjw
2021-01-19 08:13:28.827 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Discovered group coordinator kafka-broker:9092 (id: 2147482646 rack: null)
2021-01-19 08:13:28.833 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:31.890 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Finished assignment for group at generation 1: {consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2-61cd1b1a-6485-4bc8-83e8-003bc22db7e1=Assignment(partitions=[transform-in-0-0])}
2021-01-19 08:13:31.935 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Successfully joined group with generation 1
2021-01-19 08:13:31.936 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Notifying assignor about the new Assignment(partitions=[transform-in-0-0])
2021-01-19 08:13:31.939 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Adding newly assigned partitions: transform-in-0-0
2021-01-19 08:13:31.950 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.963 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.977 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Resetting offset for partition transform-in-0-0 to offset 0.
2021-01-19 08:13:32.007 INFO 107 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3: partitions assigned: [transform-in-0-0]
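Please compare your example with the Kotlin example on our microsite.
You can even reproduce the example as-is and get it working in your environment to get familiar with it. After that you should be able to do the same with your own application.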
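I got it working!
The Kotlin lambda detection problem is an issue in version 2020.0.0. It works with Hoxton.SR9.
The anonymous consumer problem (from the edit) was solved by adding these two lines to application.properties: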
spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output
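A possible reading of why these two lines help: in the log above, the consumer subscribed to a topic literally named transform-in-0 under an anonymous group, which is what a binding falls back to when no destination or group is configured for it. Functional bindings get default names of the form <function>-in-<index> and <function>-out-<index>, whereas Data Flow is assumed here (the logs do not show it) to pass its destination and group settings to bindings named input and output. The short Kotlin sketch below uses a hypothetical helper, bindingAliases, to generate the alias properties for a given function name.

```kotlin
// Hypothetical helper, not part of any Spring API: builds the alias
// properties that rename a function's default bindings to input/output.
fun bindingAliases(functionName: String): Map<String, String> = linkedMapOf(
    "spring.cloud.stream.function.bindings.$functionName-in-0" to "input",
    "spring.cloud.stream.function.bindings.$functionName-out-0" to "output"
)

fun main() {
    // Prints the two property lines shown above when called with "transform".
    bindingAliases("transform").forEach { (key, value) -> println("$key=$value") }
}
```

If the function bean is renamed, the property keys have to change with it, since the default binding names are derived from the function name.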