如何在 Kotlin 中创建回调函数?
How to create a callback function in Kotlin?
我在以下情况下创建回调函数时遇到问题:
一个 Kafka 消费者监听新消息,并将其记录到数据库中:
/**
 * Starts the consumer loop for the `create-client` topic by delegating to [consumerCommands].
 *
 * The [service] argument is accepted but not referenced in this body.
 */
suspend fun consumerClient(service: ClientService) {
    consumerCommands(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    )
}
/**
 * Subscribes to [topic] and polls it in an endless loop, inserting every received message
 * as a [Client].
 *
 * For each record, the `"message"` field of its value is converted with `treeToValue` and
 * handed to a freshly constructed [ClientService]. The loop has no exit condition, so this
 * function does not return normally.
 */
suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val source = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    source.subscribe(mutableListOf(topic))
    val pollTimeout = Duration.ofMillis(1000)
    while (true) {
        // Iterating an empty batch is a no-op, so no separate emptiness check is needed.
        for (record in source.poll(pollTimeout)) {
            val payload = record.value().get("message")
            val client = treeToValue(payload, Client::class.java) as Client
            ClientService().insert(client)
        }
    }
}
效果很好。但我正在尝试创建更通用的内容,如下所示:
/**
 * Callback holder parameterised by the message type [T].
 *
 * Note that [execute] does not receive a `T` value itself: its single argument is a
 * function of type `(T) -> Unit`.
 */
interface KafkaConsumer<T> {
/** Takes a function that accepts a [T] and returns nothing. */
fun execute(callback: (T) -> Unit)
}
/**
 * Generic variant of the consumer loop: subscribes to [topic], polls in an endless loop and
 * calls [callback]'s `execute` once for every record received.
 *
 * NOTE(review): a nullable `ConsumerRecords` return type is declared, but the body contains
 * no `return` and the `while (true)` loop has no `break`.
 */
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(1000))
if (records.count() > 0) {
records.forEach {
// Asks treeToValue for an `Any` and casts the result to T; a cast to a type parameter
// is unchecked, so a mismatch is not detected on this line.
val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
coroutineScope {
// The lambda handed to execute ignores its own argument, evaluates `entity` and discards
// it; whether anything is done with that lambda depends on the KafkaConsumer implementation.
callback.execute { entity }
}
}
}
}
}
/**
 * Attempts to start the generic consumer for [Client] messages on the `create-client` topic.
 *
 * NOTE(review): [service] is not referenced in this body; a new `ClientService` is created
 * inside the lambda instead.
 */
suspend fun consumerClient(service: ClientService) {
val messages = consumerCommand<Client>(
"create-client", "localhost:9092", "consumer-client", false,
// The last argument is a lambda taking a Client, while the consumerCommand signature shown
// alongside this snippet declares `callback: KafkaConsumer<T>`.
OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
)
}
但它不起作用。有人可以帮忙吗?
您传入的是 lambda `{ client: Client -> ... }`,但该参数的类型应为 `KafkaConsumer`。您需要改用函数类型。您的 `KafkaConsumer` 相当于 `((T) -> Unit) -> Unit`,但我怀疑这是一个错误,您实际想要的是:
/**
 * Suggested signature: [callback] is a plain function type that receives each entity directly.
 *
 * This snippet is abbreviated; the `...` line presumably stands in for the unchanged
 * setup and polling code, which is not shown here.
 */
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
...
coroutineScope {
// Call the function-typed callback with the entity itself.
callback(entity)
}
}
}
}
}
甚至可以是 `suspend (T) -> Unit`。
旁注:您没有在 `consumerClient` 函数中使用 `service` 参数,而是创建了一个新的 `ClientService`;这是有意为之吗?
也许你可以这样做:
/**
 * Subscribes to [topic] and polls it in an endless loop, invoking [callback] once for every
 * record received.
 *
 * For each record, the `"message"` field of its value is passed to `treeToValue` and the
 * result is cast to [T] before being handed to [callback].
 *
 * NOTE(review): the cast to [T] is unchecked. `treeToValue` is asked for `Any`, so a value of
 * the wrong runtime type would only fail later, when [callback] uses it. Confirm what
 * `treeToValue` produces for `Any::class.java`.
 *
 * @param topic topic to subscribe to.
 * @param bootstrapServers forwarded to `consumer(...)`.
 * @param group forwarded to `consumer(...)`.
 * @param autoCommit forwarded to `consumer(...)`.
 * @param offsetBehaviour forwarded to `consumer(...)`.
 * @param pollMax forwarded to `consumer(...)`.
 * @param callback function invoked with each converted entity.
 * @return declared as nullable `ConsumerRecords`, but the loop has no exit, so this function
 *   does not return normally.
 */
suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: (entity: T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(listOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        // Iterating an empty batch does nothing, so no explicit count check is required.
        for (record in records) {
            @Suppress("UNCHECKED_CAST")
            val entity = treeToValue(record.value().get("message"), Any::class.java) as T
            callback(entity)
        }
    }
}
/**
 * Runs the generic consumer loop for [Client] messages on the `create-client` topic,
 * inserting each received client through a newly created `ClientService`.
 *
 * The [service] argument is accepted but not referenced in this body.
 */
suspend fun consumerClient(service: ClientService) {
    consumerCommand<Client>(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    ) { client -> ClientService().insert(client) }
}
我在以下情况下创建回调函数时遇到问题:一个 Kafka 消费者监听新消息,并将其记录到数据库中:
/**
 * Starts the consumer loop for the `create-client` topic by delegating to [consumerCommands].
 *
 * The [service] argument is accepted but not referenced in this body.
 */
suspend fun consumerClient(service: ClientService) {
    consumerCommands(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    )
}
/**
 * Subscribes to [topic] and polls it in an endless loop, inserting every received message
 * as a [Client].
 *
 * For each record, the `"message"` field of its value is converted with `treeToValue` and
 * handed to a freshly constructed [ClientService]. The loop has no exit condition, so this
 * function does not return normally.
 */
suspend fun consumerCommands(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int
) {
    val source = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    source.subscribe(mutableListOf(topic))
    val pollTimeout = Duration.ofMillis(1000)
    while (true) {
        // Iterating an empty batch is a no-op, so no separate emptiness check is needed.
        for (record in source.poll(pollTimeout)) {
            val payload = record.value().get("message")
            val client = treeToValue(payload, Client::class.java) as Client
            ClientService().insert(client)
        }
    }
}
效果很好。但我正在尝试创建更通用的内容,如下所示:
/**
 * Callback holder parameterised by the message type [T].
 *
 * Note that [execute] does not receive a `T` value itself: its single argument is a
 * function of type `(T) -> Unit`.
 */
interface KafkaConsumer<T> {
/** Takes a function that accepts a [T] and returns nothing. */
fun execute(callback: (T) -> Unit)
}
/**
 * Generic variant of the consumer loop: subscribes to [topic], polls in an endless loop and
 * calls [callback]'s `execute` once for every record received.
 *
 * NOTE(review): a nullable `ConsumerRecords` return type is declared, but the body contains
 * no `return` and the `while (true)` loop has no `break`.
 */
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: KafkaConsumer<T>
): ConsumerRecords<String, JsonNode>? {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(1000))
if (records.count() > 0) {
records.forEach {
// Asks treeToValue for an `Any` and casts the result to T; a cast to a type parameter
// is unchecked, so a mismatch is not detected on this line.
val entity = (treeToValue(it.value().get("message"), Any::class.java) as T)
coroutineScope {
// The lambda handed to execute ignores its own argument, evaluates `entity` and discards
// it; whether anything is done with that lambda depends on the KafkaConsumer implementation.
callback.execute { entity }
}
}
}
}
}
/**
 * Attempts to start the generic consumer for [Client] messages on the `create-client` topic.
 *
 * NOTE(review): [service] is not referenced in this body; a new `ClientService` is created
 * inside the lambda instead.
 */
suspend fun consumerClient(service: ClientService) {
val messages = consumerCommand<Client>(
"create-client", "localhost:9092", "consumer-client", false,
// The last argument is a lambda taking a Client, while the consumerCommand signature shown
// alongside this snippet declares `callback: KafkaConsumer<T>`.
OffsetBehaviour.Earliest, 10, {client: Client -> ClientService().insert(client)}
)
}
但它不起作用。有人可以帮忙吗?
您传入的是 lambda `{ client: Client -> ... }`,但该参数的类型应为 `KafkaConsumer`。您需要改用函数类型。您的 `KafkaConsumer` 相当于 `((T) -> Unit) -> Unit`,但我怀疑这是一个错误,您实际想要的是:
/**
 * Suggested signature: [callback] is a plain function type that receives each entity directly.
 *
 * This snippet is abbreviated; the `...` line presumably stands in for the unchanged
 * setup and polling code, which is not shown here.
 */
suspend fun <T> consumerCommand(
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int,
callback: (T) -> Unit
): ConsumerRecords<String, JsonNode>? {
...
coroutineScope {
// Call the function-typed callback with the entity itself.
callback(entity)
}
}
}
}
}
甚至可以是 `suspend (T) -> Unit`。
旁注:您没有在 `consumerClient` 函数中使用 `service` 参数,而是创建了一个新的 `ClientService`;这是有意为之吗?
也许你可以这样做:
/**
 * Subscribes to [topic] and polls it in an endless loop, invoking [callback] once for every
 * record received.
 *
 * For each record, the `"message"` field of its value is passed to `treeToValue` and the
 * result is cast to [T] before being handed to [callback].
 *
 * NOTE(review): the cast to [T] is unchecked. `treeToValue` is asked for `Any`, so a value of
 * the wrong runtime type would only fail later, when [callback] uses it. Confirm what
 * `treeToValue` produces for `Any::class.java`.
 *
 * @param topic topic to subscribe to.
 * @param bootstrapServers forwarded to `consumer(...)`.
 * @param group forwarded to `consumer(...)`.
 * @param autoCommit forwarded to `consumer(...)`.
 * @param offsetBehaviour forwarded to `consumer(...)`.
 * @param pollMax forwarded to `consumer(...)`.
 * @param callback function invoked with each converted entity.
 * @return declared as nullable `ConsumerRecords`, but the loop has no exit, so this function
 *   does not return normally.
 */
suspend fun <T> consumerCommand(
    topic: String,
    bootstrapServers: String,
    group: String,
    autoCommit: Boolean,
    offsetBehaviour: OffsetBehaviour,
    pollMax: Int,
    callback: (entity: T) -> Unit
): ConsumerRecords<String, JsonNode>? {
    val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
    consumer.subscribe(listOf(topic))
    while (true) {
        val records = consumer.poll(Duration.ofMillis(1000))
        // Iterating an empty batch does nothing, so no explicit count check is required.
        for (record in records) {
            @Suppress("UNCHECKED_CAST")
            val entity = treeToValue(record.value().get("message"), Any::class.java) as T
            callback(entity)
        }
    }
}
/**
 * Runs the generic consumer loop for [Client] messages on the `create-client` topic,
 * inserting each received client through a newly created `ClientService`.
 *
 * The [service] argument is accepted but not referenced in this body.
 */
suspend fun consumerClient(service: ClientService) {
    consumerCommand<Client>(
        topic = "create-client",
        bootstrapServers = "localhost:9092",
        group = "consumer-client",
        autoCommit = false,
        offsetBehaviour = OffsetBehaviour.Earliest,
        pollMax = 10
    ) { client -> ClientService().insert(client) }
}