使用 Gradle 和 Kotlin 创建 Spring 云数据流处理器

Create Spring Cloud Data Flow Processor using Gradle and Kotlin

我正在尝试使用 Kotlin Lambda 基于 Spring Cloud Stream 3.1 构建 Kotlin 处理器。 我正在使用 Spring Cloud Data Flow 将此处理器部署到流中,但它被标记为失败。

如果我 cat 错误日志,我有以下错误:IllegalArgumentException: Type must be one of Supplier, Function or Consumer

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:531) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean[=12=](AbstractBeanFactory.java:335) ~[spring-beans-5.3.2.jar!/:5.3.2]

这是我的代码:

@SpringBootApplication
class TransformerUppercaseApplication{

    @Bean
     fun transform(): (String) -> String {
        return { "This is my uppercase".plus(it.toUpperCase()) }
    }

}

fun main(args: Array<String>) {
    runApplication<TransformerUppercaseApplication>(*args)
}

我试过在 application.yml 中指定函数名称,但没有成功:

spring:
  cloud:
    stream:
      function:
        definition: transform

这是我的 build.gradle.kts :

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    `maven-publish`
    id("org.springframework.boot") version "2.4.1"
    id("io.spring.dependency-management") version "1.0.10.RELEASE"
    kotlin("jvm") version "1.4.21"
    kotlin("plugin.spring") version "1.4.21"
}

group = "com.company"
version = "0.0.11"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven { url = uri("https://repo.spring.io/milestone") }
}

extra["springCloudVersion"] = "2020.0.0"
dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    implementation("org.springframework.cloud:spring-cloud-function-kotlin")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
publishing {
    publications {
        create<MavenPublication>("mavenJava") {
            from(components["java"])
        }
    }
}

configurations {
    val elements = listOf(apiElements, runtimeElements)
    elements.forEach { element ->
        element.get().outgoing.artifacts.removeIf { it -> it.buildDependencies.getDependencies(null).contains(tasks.jar.get())}
        element.get().outgoing.artifact(tasks.bootJar.get())
    }
}

Edit :我可以通过使用 java 函数 class 而不是 kotlin lambda 来避免错误:

@Bean
fun transform(): Function<String, String> =  Function {
    it.toUpperCase()
}

但是现在出现了一个新问题,处理器无法绑定到kafka主题,它显示为匿名并且处理器kepp状态正在部署。我在处理器日志中得到了这个(我也在 java 示例中尝试过):


2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1611044007674
2021-01-19 08:13:27.676  INFO 107 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Subscribed to topic(s): transform-in-0
2021-01-19 08:13:27.677  INFO 107 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-01-19 08:13:27.702  INFO 107 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@24fabd0f
2021-01-19 08:13:27.715  INFO 107 --- [           main] c.e.t.TransformerUppercaseApplicationKt  : Started TransformerUppercaseApplicationKt in 7.538 seconds (JVM running for 8.374)
2021-01-19 08:13:27.729  INFO 107 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Cluster ID: gN63F8OzRwaEeKElBfkmjw
2021-01-19 08:13:28.827  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Discovered group coordinator kafka-broker:9092 (id: 2147482646 rack: null)
2021-01-19 08:13:28.833  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:28.866  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-01-19 08:13:28.866  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:31.890  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Finished assignment for group at generation 1: {consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2-61cd1b1a-6485-4bc8-83e8-003bc22db7e1=Assignment(partitions=[transform-in-0-0])}
2021-01-19 08:13:31.935  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Successfully joined group with generation 1
2021-01-19 08:13:31.936  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Notifying assignor about the new Assignment(partitions=[transform-in-0-0])
2021-01-19 08:13:31.939  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Adding newly assigned partitions: transform-in-0-0
2021-01-19 08:13:31.950  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.963  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.977  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Resetting offset for partition transform-in-0-0 to offset 0.
2021-01-19 08:13:32.007  INFO 107 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder  : anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3: partitions assigned: [transform-in-0-0]

请将您的示例与我们在微型网站中的 Kotlin example 进行比较。

甚至可以按原样重复示例并使其在您的环境中工作以适应它。事后您应该能够以相同的方式使用您的应用程序。

我成功了! Kotlin lambda 检测问题是 2020.0.0 版本的问题。它适用于 Hoxton.SR9

通过在 application.properies

中添加这两行解决了匿名消费者问题(在编辑中)
spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output