使用 ktor 和 kotlin 对 kafka 消费者进行 na 抽象
Make na abstraction to kafka consumer using ktor and kotlin
我正在为消费者和生产者 Kafka 创建抽象,以始终避免重复代码。所以我使用 kotlin 和 gradle 创建了一个名为 "kafka-commons" 的库,并放入以下代码:
对于 Kafka 生产者:
fun producer(
bootstrapServers: String,
idempotence: Boolean,
acks: Acks,
retries: Int,
requestPerConnection: Int,
compression: Compression,
linger: Int,
batchSize: BatchSize
): KafkaProducer<String, Any> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
prop[ACKS_CONFIG] = acks.value
prop[RETRIES_CONFIG] = retries
prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
prop[COMPRESSION_TYPE_CONFIG] = compression.value
prop[LINGER_MS_CONFIG] = linger
prop[BATCH_SIZE_CONFIG] = batchSize.value
return KafkaProducer(prop)
}
suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
suspendCoroutine<RecordMetadata> { continuation ->
val callback = Callback { metadata, exception ->
if (metadata == null) {
continuation.resumeWithException(exception!!)
} else {
continuation.resume(metadata)
}
}
this.send(record, callback)
}
因此,我创建了一个具有以下结构的默认命令对象
data class Command(
val id: UUID,
val status: CommandStatus,
val message: Any
)
id: 创建唯一ID
状态:创建消息状态(可以是:打开/处理中/关闭/错误)
消息:来自 http 请求的对象(例如:如果有一个 Post
例如:如果有一个 "insert user: POST" 正文:
{ "id": 1, "name" : "John", "lastName" : "Wick" }
所以消息将是这个对象,依此类推。
为了创建这个命令,我做了这个函数:
suspend fun creatCommand(
topicName: String,
id: UUID,
commandStatus: CommandStatus,
request: Any,
bootstrapServers: String,
idempotence: Boolean,
acks: Acks,
retries: Int,
requestPerConnection: Int,
compression: Compression,
linger: Int,
batchSize: BatchSize
): Unit {
val producer = producer(
bootstrapServers,
idempotence,
acks,
retries,
requestPerConnection,
compression,
linger,
batchSize)
val command = toCommand(processStarted(id, commandStatus, request))
val record = ProducerRecord<String, Any>(topicName, id.toString(), command)
coroutineScope { launch { producer.dispatch(record) } }
}
所以,我创建了其他 API,只是调用这个函数来创建一个向 kafka 发送命令的生产者。喜欢:
fun Route.user(service: Service) =
route("/api/access") {
post("/test") {
call.respond(service.command(call.receive()))
}
}
>>>>>> other class <<<<<<<<
classService () {
fun command( all parameters) { creatCommand(all parameters)}
}
到目前为止还不错。一切都很好。
现在我的问题开始了。我正在尝试创建一个消费者。
首先我做了这个:
fun consumer(
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int
): KafkaConsumer<String, Any> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
prop[VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
prop[GROUP_ID_CONFIG] = group
prop[AUTO_OFFSET_RESET_CONFIG] = offsetBehaviour
prop[ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
prop[MAX_POLL_RECORDS_CONFIG] = pollMax
return KafkaConsumer(prop)
}
之后:
fun<T> recordingCommand(
command: Class<T>,
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int
) {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
val om = ObjectMaper
om.readvalue(record.value(), command::class.java)
>>>> I GOT LOST HERE <<<<<<
}
}
}
我需要的是:创建一个抽象消费者,将所有数据记录在数据库中的 Command.message()(仅消息)中。
例如,我需要将上面的用户(id 1, john wick)记录到一个postgresql数据库中。
所以如果我有一个带有插入方法的服务,我可以调用它,传递插入方法,如:
service.insert(recordingCommand(all parameters)).
有人可以帮我吗?
如果我没理解错的话,您在 JSON 映射到对象时遇到了问题
val mapper = ObjectMaper
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
val cmd = mapper.readvalue(record.value(), command::class.java)
// do things with cmd
}
}
注意:Kafka 有自己的 JSON 到 POJO 反序列化器,如果您想将数据发送到数据库,Kafka Connect 通常比简单的消费循环更容错。
我正在为消费者和生产者 Kafka 创建抽象,以始终避免重复代码。所以我使用 kotlin 和 gradle 创建了一个名为 "kafka-commons" 的库,并放入以下代码: 对于 Kafka 生产者:
fun producer(
bootstrapServers: String,
idempotence: Boolean,
acks: Acks,
retries: Int,
requestPerConnection: Int,
compression: Compression,
linger: Int,
batchSize: BatchSize
): KafkaProducer<String, Any> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
prop[ACKS_CONFIG] = acks.value
prop[RETRIES_CONFIG] = retries
prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
prop[COMPRESSION_TYPE_CONFIG] = compression.value
prop[LINGER_MS_CONFIG] = linger
prop[BATCH_SIZE_CONFIG] = batchSize.value
return KafkaProducer(prop)
}
suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
suspendCoroutine<RecordMetadata> { continuation ->
val callback = Callback { metadata, exception ->
if (metadata == null) {
continuation.resumeWithException(exception!!)
} else {
continuation.resume(metadata)
}
}
this.send(record, callback)
}
因此,我创建了一个具有以下结构的默认命令对象
data class Command(
val id: UUID,
val status: CommandStatus,
val message: Any
)
id: 创建唯一ID 状态:创建消息状态(可以是:打开/处理中/关闭/错误) 消息:来自 http 请求的对象(例如:如果有一个 Post
例如:如果有一个 "insert user: POST" 正文:
{ "id": 1, "name" : "John", "lastName" : "Wick" }
所以消息将是这个对象,依此类推。
为了创建这个命令,我做了这个函数:
suspend fun creatCommand(
topicName: String,
id: UUID,
commandStatus: CommandStatus,
request: Any,
bootstrapServers: String,
idempotence: Boolean,
acks: Acks,
retries: Int,
requestPerConnection: Int,
compression: Compression,
linger: Int,
batchSize: BatchSize
): Unit {
val producer = producer(
bootstrapServers,
idempotence,
acks,
retries,
requestPerConnection,
compression,
linger,
batchSize)
val command = toCommand(processStarted(id, commandStatus, request))
val record = ProducerRecord<String, Any>(topicName, id.toString(), command)
coroutineScope { launch { producer.dispatch(record) } }
}
所以,我创建了其他 API,只是调用这个函数来创建一个向 kafka 发送命令的生产者。喜欢:
fun Route.user(service: Service) =
route("/api/access") {
post("/test") {
call.respond(service.command(call.receive()))
}
}
>>>>>> other class <<<<<<<<
classService () {
fun command( all parameters) { creatCommand(all parameters)}
}
到目前为止还不错。一切都很好。
现在我的问题开始了。我正在尝试创建一个消费者。 首先我做了这个:
fun consumer(
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int
): KafkaConsumer<String, Any> {
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
prop[VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
prop[GROUP_ID_CONFIG] = group
prop[AUTO_OFFSET_RESET_CONFIG] = offsetBehaviour
prop[ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
prop[MAX_POLL_RECORDS_CONFIG] = pollMax
return KafkaConsumer(prop)
}
之后:
fun<T> recordingCommand(
command: Class<T>,
topic: String,
bootstrapServers: String,
group: String,
autoCommit: Boolean,
offsetBehaviour: OffsetBehaviour,
pollMax: Int
) {
val consumer = consumer(bootstrapServers, group, autoCommit, offsetBehaviour, pollMax)
consumer.subscribe(mutableListOf(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
val om = ObjectMaper
om.readvalue(record.value(), command::class.java)
>>>> I GOT LOST HERE <<<<<<
}
}
}
我需要的是:创建一个抽象消费者,将所有数据记录在数据库中的 Command.message()(仅消息)中。
例如,我需要将上面的用户(id 1, john wick)记录到一个postgresql数据库中。 所以如果我有一个带有插入方法的服务,我可以调用它,传递插入方法,如:
service.insert(recordingCommand(all parameters)).
有人可以帮我吗?
如果我没理解错的话,您在 JSON 映射到对象时遇到了问题
val mapper = ObjectMaper
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
val cmd = mapper.readvalue(record.value(), command::class.java)
// do things with cmd
}
}
注意:Kafka 有自己的 JSON 到 POJO 反序列化器,如果您想将数据发送到数据库,Kafka Connect 通常比简单的消费循环更容错。