如何正确重启kafka s3 sink connect?

How to properly restart a kafka s3 sink connect?

我从 5 月 1 日开始使用 kafka s3 sink connector(来自 confluent 包的 bundle connector)。它工作正常,直到 5 月 8 日。检查状态,它表明某些 aws 异常使该连接器崩溃。这个应该问题不大,所以想恢复一下。

我尝试了以下步骤:

  1. 我POST/connectors/s3sink/restart。然后我看到连接器处于 运行 模式,但任务仍然失败。
  2. 然后我 PUT /connectors/s3sink/task/0/restart。好的,现在任务处于运行模式。

但是后来我跟踪日志,我发现它开始重写旧数据,比如5月3日的数据。它弄乱了旧数据!

那么,connect restart REST API 是否会重置偏移量?我以为它会保存偏移量并从它失败的偏移量开始。

以及如何正确重启失败的连接器任务?通过删除那些 PODs? (使用 kubernetes),还是通过 REST /task/0/restart?我什么时候应该使用 /connectors/s3sink/restart?

/connector/:name/restart 是对 worker leader 的滚动重启操作,需要以异步方式传播到所有 worker 服务器任务。因此,您需要确保 leader worker 和所有其他 worker 之间的网络连接。

/connector/:name/task/:num/restart 将直接向该工作人员发送请求,重新启动线程。

重新启动不应重置偏移量,因为它们存储在 中。如果有的话,任务无法将偏移量提交回 __consumer_offsets 主题,但您应该看到它的日志。