如何在 Kotlin 中创建回调函数?

How to create a callback function in Kotlin?

我在以下场景中创建回调函数时遇到了问题:有一个 Kafka 消费者监听新消息,并把它们记录到数据库中:

/**
 * Starts the non-generic polling loop for the "create-client" topic by
 * delegating to [consumerCommands] with fixed connection settings.
 *
 * NOTE(review): [service] is not referenced anywhere in this body.
 */
suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommands(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    )
}

/**
 * Subscribes a consumer to [topic] and polls it in an endless loop.
 *
 * For every polled record, the "message" field of the record value is converted
 * to a [Client] with `treeToValue` and inserted through a newly constructed
 * [ClientService]. The loop has no exit condition, so this function does not
 * return normally.
 *
 * @param topic the single topic to subscribe to
 * @param bootstrapServers forwarded unchanged to `consumer(...)`
 * @param group forwarded unchanged to `consumer(...)`
 * @param autoCommit forwarded unchanged to `consumer(...)`
 * @param offsetBehaviour forwarded unchanged to `consumer(...)`
 * @param pollMax forwarded unchanged to `consumer(...)`
 */
suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val kafkaConsumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    kafkaConsumer.subscribe(mutableListOf(topic))

    // The poll timeout is constant, so build it once outside the loop.
    val pollTimeout = Duration.ofMillis(1000)
    while (true) {
        val batch = kafkaConsumer.poll(pollTimeout)
        // Nothing arrived within the timeout: poll again.
        if (batch.count() == 0) continue

        for (record in batch) {
            val payload = record.value().get("message")
            val client = treeToValue(payload, Client::class.java) as Client
            ClientService().insert(client)
        }
    }
}

这段代码运行良好。但我想把它改写成更通用的形式,如下所示:

/**
 * Callback abstraction used as the `callback` parameter type of the generic
 * `consumerCommand` attempt.
 *
 * Note that [execute] receives a *function* taking a [T], not a [T] value, so the
 * overall shape of this interface is `((T) -> Unit) -> Unit`.
 */
interface KafkaConsumer<T> {
    /** Called with a function of type `(T) -> Unit`; returns nothing. */
    fun execute(callback: (T) -> Unit)
}

/**
 * Generic variant of the polling loop: subscribes a consumer to [topic], polls it
 * forever with a 1000 ms timeout, and calls `execute` on [callback] once per record.
 *
 * NOTE(review): this is the attempt that does not work as intended:
 *  - `callback.execute { entity }` passes a lambda whose body merely evaluates
 *    `entity`; the lambda ignores its own argument, so the entity is never handed
 *    to anything that consumes it.
 *  - `as T` is an unchecked cast (T is not reified) applied to a value that was
 *    deserialised with `Any::class.java`.
 *  - A return type is declared, but the body has no `return` and the loop has no `break`.
 */
suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                // Deserialised as Any, then cast unchecked to T.
                val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
                coroutineScope {
                    // Passes a function that ignores its argument; `entity` is only evaluated inside it.
                    callback.execute { entity }
                }
            }
        }
    }
}


// Call site from the question. The last argument is a lambda taking a Client,
// whereas consumerCommand declares that parameter as KafkaConsumer<T>.
// NOTE(review): `service` is not used in this body; the lambda constructs a new
// ClientService each time it is invoked.
suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommand<Client>(
        "create-client", "localhost:9092", "consumer-client", false,
        OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
    )
}

但它不起作用。有人可以帮忙吗?

您传入的是一个 lambda { client: Client -> ... },但该参数要求的类型是 KafkaConsumer<T>,两者并不匹配。您应该把参数声明为函数类型。另外,您的 KafkaConsumer 实际上相当于 ((T) -> Unit) -> Unit,我怀疑这是个错误,您真正想要的是:

// Abbreviated snippet: the signature now takes `callback` as a plain function type,
// and the record loop calls it directly with `entity`.
suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    // Function type instead of the KafkaConsumer<T> interface.
    callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    // The lines leading up to the record loop are elided in this snippet.
    ...
                coroutineScope {
                    // Direct invocation with the entity.
                    callback(entity)
                }
            }
        }
    }
}

甚至可以是 suspend (T) -> Unit。

旁注:您没有在 consumerClient 函数中使用 service 参数,而是创建了一个新的 ClientService;是故意的吗?

也许你可以这样做:

/**
 * Subscribes a consumer to [topic] and polls it in an endless loop, invoking
 * [callback] once for every record received.
 *
 * @param topic the single topic to subscribe to
 * @param bootstrapServers forwarded unchanged to `consumer(...)`
 * @param group forwarded unchanged to `consumer(...)`
 * @param autoCommit forwarded unchanged to `consumer(...)`
 * @param offsetBehaviour forwarded unchanged to `consumer(...)`
 * @param pollMax forwarded unchanged to `consumer(...)`
 * @param callback invoked with the entity decoded from each record's "message" field
 * @return declared as a nullable [ConsumerRecords], but the loop has no exit
 *   condition, so the function never returns normally
 */
suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    // Fixed: the original had a stray `>` after `Unit`, which is a syntax error.
    callback: (entity: T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(mutableListOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        if (records.count() > 0) {
            records.forEach {
                // NOTE(review): T is not reified, so this cast is unchecked; the value is
                // deserialised with Any::class.java, so a type mismatch would presumably
                // only surface inside `callback` — confirm, and consider passing the
                // target class explicitly.
                @Suppress("UNCHECKED_CAST")
                val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
                callback(entity)
            }
        }
    }
}

/**
 * Consumes the "create-client" topic as [Client] entities via `consumerCommand`,
 * inserting each one through a freshly constructed [ClientService].
 *
 * NOTE(review): [service] is not referenced in this body; a new [ClientService]
 * is created on every callback invocation instead.
 */
suspend fun consumerClient(service: ClientService) {
    val messages = consumerCommand<Client>(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    ) { client -> ClientService().insert(client) }
}