How to change start offset for topic?

Is it possible to change the start offset for a new topic? I would like to create a new topic and start reading from offset 10000. How?

You can do this with the help of the zookeeper shell. Kafka uses zookeeper to track the consumer offsets.

Go to the kafka bin directory and invoke the zookeeper shell. (My kafka version is 0.8.0)

./zookeeper-shell.sh localhost:2181

Now use the zookeeper get command

get /consumers/consumer_group_id/offsets/topic/0

It shows something like

2043
cZxid = 0x4d
ctime = Wed Mar 18 03:56:32 EDT 2015
...

Here 2043 is the maximum offset consumed. Set it to the desired value using the zookeeper set command

set /consumers/consumer_group_id/offsets/topic/0 10000

The path is framed like this: /consumers/[consumer_group_id]/offsets/[topic]/[partition_id].
You will have to replace these with the appropriate consumer group, topic and partition id.
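
The path layout above can be sketched as a small helper. This is only an illustration for the old ZooKeeper-based offset storage described in this answer; the class and method names (ZkOffsetPath, of, setCommand) and the group/topic names are made up for the example.

```java
// Hypothetical helper: builds the ZooKeeper node path that holds a consumer
// group's offset for one partition (old ZooKeeper-based offset storage).
class ZkOffsetPath {

    // /consumers/[consumer_group_id]/offsets/[topic]/[partition_id]
    static String of(String groupId, String topic, int partition) {
        return "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
    }

    // The line to type into zookeeper-shell.sh to overwrite the stored offset.
    static String setCommand(String groupId, String topic, int partition, long offset) {
        return "set " + of(groupId, topic, partition) + " " + offset;
    }

    public static void main(String[] args) {
        // "my-group" and "my-topic" are placeholder names.
        System.out.println(setCommand("my-group", "my-topic", 0, 10000L));
        // prints: set /consumers/my-group/offsets/my-topic/0 10000
    }
}
```

A topic with several partitions needs one set command per partition, since each partition has its own node.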

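*Also, since you mention that it is a new instance of kafka, I am not sure whether the consumers will have connected and the consumer groups will have been created.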

Since kafka 0.9, offsets are stored in a topic. To change the offset, use the seek() method:

public void seek(TopicPartition partition, long offset)

Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
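
A minimal sketch of how seek() could be used to start reading at offset 10000, assuming the kafka-clients library (2.0 or later, for poll(Duration)) is on the classpath. The broker address, group id, topic name and String deserializers are placeholders, not something taken from the question.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Placeholder topic name, partition 0.
        TopicPartition partition = new TopicPartition("my-topic", 0);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment, so the partition is known before seek() is called.
            consumer.assign(Collections.singletonList(partition));

            // Override the fetch position used by the next poll().
            consumer.seek(partition, 10000L);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```

If the partition does not yet contain offset 10000, the position is out of range and the consumer falls back to its auto.offset.reset setting, so this only helps once the topic actually holds that many messages.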

Since kafka 0.11.0.0 you can use the script kafka-consumer-groups.sh.

Example:
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute

Other options listed in KIP-122: Add Reset Consumer Group Offsets tooling:
.----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
|      Scenario        |                   Arguments                   |                                                                    Example                                                                   |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Datetime    |  --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm  |  Reset to first offset since 1 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset by Duration    |  --by-duration  PnDTnHnMnS                    |  Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D         |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Earliest    |  --to-earliest                                |  Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest                                            |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Latest      |  --to-latest                                  |  Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest                                                |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Offset      |  --to-offset                                  |  Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1                                           |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Shift Offset by 'n'  |  --shift-by n                                 |  Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5                                       |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset from File      |  --from-file PATH_TO_FILE                     |  Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv                                           |
'----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'
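
The PnDTnHnMnS pattern in the --by-duration row is the ISO-8601 duration notation, which java.time.Duration can also parse. As a rough illustration of what "one week ago from the current timestamp" means, the sketch below computes that point in time. The class name ByDurationExample and the fixed "now" value are only for the example, and it does not claim to reproduce the tool's internal logic.

```java
import java.time.Duration;
import java.time.Instant;

class ByDurationExample {

    // Point in time that lies `isoDuration` (e.g. "P7D") before `now`.
    static Instant resetPoint(Instant now, String isoDuration) {
        return now.minus(Duration.parse(isoDuration));
    }

    public static void main(String[] args) {
        // Fixed instant instead of Instant.now() so the output is reproducible.
        Instant now = Instant.parse("2017-01-08T00:00:00Z");

        System.out.println(resetPoint(now, "P7D"));     // 2017-01-01T00:00:00Z
        System.out.println(resetPoint(now, "PT1H30M")); // 2017-01-07T22:30:00Z
    }
}
```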

You can also define the partition(s) you want to reset, e.g.:

  • Reset the offset of topic foo partition 0 to 1

    --reset-offsets --group test.group --topic foo:0 --to-offset 1

  • Reset the offset of topic foo partitions 0,1,2 to earliest

    --reset-offsets --group test.group --topic foo:0,1,2 --to-earliest

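Reminder: don't forget the --execute flag (see the execution options in the KIP). Without this flag the script only prints out the result of the scenario by scope, for example: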

TOPIC                 PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo                   0         90         10      100            -           -    -

Thanks to this answer. Table created with ascii tables.

If you need to change the offset to a datetime:

kafka-consumer-groups --bootstrap-server {url} \
--topic {topic} \
--group {consumer} \
--reset-offsets --to-datetime 2020-11-11T00:00:00.000+0900 \
--execute

Unparseable Date Error when parsing UTC string through SimpleDateFormat to Date
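
The --to-datetime value in the command above includes milliseconds and a zone offset. One way to produce a string of that shape is sketched below with java.time. The pattern yyyy-MM-dd'T'HH:mm:ss.SSSxx is my own choice to reproduce the +0900 form used in the command, and the class name ToDatetimeExample is made up. Which offset notations a given Kafka version accepts is not something this sketch verifies.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class ToDatetimeExample {

    // Date, 'T', time with milliseconds, then an offset without colon (e.g. +0900).
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxx");

    static String format(OffsetDateTime time) {
        return time.format(FORMAT);
    }

    public static void main(String[] args) {
        // 11 November 2020, midnight, at UTC+9.
        OffsetDateTime time =
                OffsetDateTime.of(2020, 11, 11, 0, 0, 0, 0, ZoneOffset.ofHours(9));

        System.out.println(format(time)); // 2020-11-11T00:00:00.000+0900
    }
}
```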