在不增加偏移量的情况下打印 kstream 内容
Print a kstream content without increasing offset
我目前正在 kafka 流上执行我的第一步,我很难理解 kafka 应用程序如何存储其状态。
我想在不更新偏移量的情况下打印 kstream 的内容,感觉这不是我应该做的事情,但我很难理解为什么:
def rawPlanningStream(
builder: StreamsBuilder,
topicName: String
): KStream[String, Planning] =
builder.stream(topicName)(Consumed.`with`(Serdes.String, Planning.serde))
def printPlanning(
key: String,
value: Planning
) = {
val logger = LoggerFactory.getLogger("PlanningEventSyncLogger")
logger.warn(s"Planning: $key, $value")
}
def process(
builder: StreamsBuilder,
rawTopic: String
) = {
val raw_planning_stream = PlanningEventSync.rawPlanningStream(
builder,
rawTopic
)
raw_planning_stream.peek((k,v) => printPlanning(k,v))
//Here I would like to perform an operation on raw_planning_stream
//but offset is already "wrong" because of the peek done earlier
}
我第一次开始处理时,主题的内容按预期打印,如果我再次启动它,它不再打印任何内容,因为偏移量已更新。
我的问题是是否可以像打印一样执行 'non invasive' 操作以保持偏移不变?
(注意:我设法在我的组中使用 --reset-offsets --to-earliest from kafka-consumer-groups.sh 以手动重置偏移量,但我希望能够以编程方式执行操作而不更改我的消费者组的偏移量)
如果你不能设置enable.auto.commit=false
,那么另一个选项是设置application.id="<some random UUID>"
,这样每次你运行应用程序,它都会创建一个新的消费者组,开始来自 auto.offset.reset
设置
我目前正在 kafka 流上执行我的第一步,我很难理解 kafka 应用程序如何存储其状态。 我想在不更新偏移量的情况下打印 kstream 的内容,感觉这不是我应该做的事情,但我很难理解为什么:
def rawPlanningStream(
builder: StreamsBuilder,
topicName: String
): KStream[String, Planning] =
builder.stream(topicName)(Consumed.`with`(Serdes.String, Planning.serde))
def printPlanning(
key: String,
value: Planning
) = {
val logger = LoggerFactory.getLogger("PlanningEventSyncLogger")
logger.warn(s"Planning: $key, $value")
}
def process(
builder: StreamsBuilder,
rawTopic: String
) = {
val raw_planning_stream = PlanningEventSync.rawPlanningStream(
builder,
rawTopic
)
raw_planning_stream.peek((k,v) => printPlanning(k,v))
//Here I would like to perform an operation on raw_planning_stream
//but offset is already "wrong" because of the peek done earlier
}
我第一次开始处理时,主题的内容按预期打印,如果我再次启动它,它不再打印任何内容,因为偏移量已更新。
我的问题是是否可以像打印一样执行 'non invasive' 操作以保持偏移不变?
(注意:我设法在我的组中使用 --reset-offsets --to-earliest from kafka-consumer-groups.sh 以手动重置偏移量,但我希望能够以编程方式执行操作而不更改我的消费者组的偏移量)
如果你不能设置enable.auto.commit=false
,那么另一个选项是设置application.id="<some random UUID>"
,这样每次你运行应用程序,它都会创建一个新的消费者组,开始来自 auto.offset.reset
设置