如何正确重启kafka s3 sink connect?
How to properly restart a kafka s3 sink connect?
我从 5 月 1 日开始使用 kafka s3 sink connector(来自 confluent 包的 bundle connector)。它工作正常,直到 5 月 8 日。检查状态,它表明某些 aws 异常使该连接器崩溃。这个应该问题不大,所以想恢复一下。
我尝试了以下步骤:
- 我POST/connectors/s3sink/restart。然后我看到连接器处于 运行 模式,但任务仍然失败。
- 然后我 PUT /connectors/s3sink/task/0/restart。好的,现在任务处于运行模式。
但是后来我跟踪日志,我发现它开始重写旧数据,比如5月3日的数据。它弄乱了旧数据!
那么,connect restart REST API 是否会重置偏移量?我以为它会保存偏移量并从它失败的偏移量开始。
以及如何正确重启失败的连接器任务?通过删除那些 PODs? (使用 kubernetes),还是通过 REST /task/0/restart?我什么时候应该使用 /connectors/s3sink/restart?
/connector/:name/restart
是对 worker leader 的滚动重启操作,需要以异步方式传播到所有 worker 服务器任务。因此,您需要确保 leader worker 和所有其他 worker 之间的网络连接。
/connector/:name/task/:num/restart
将直接向该工作人员发送请求,重新启动线程。
重新启动不应重置偏移量,因为它们存储在 中。如果有的话,任务无法将偏移量提交回 __consumer_offsets
主题,但您应该看到它的日志。
我从 5 月 1 日开始使用 kafka s3 sink connector(来自 confluent 包的 bundle connector)。它工作正常,直到 5 月 8 日。检查状态,它表明某些 aws 异常使该连接器崩溃。这个应该问题不大,所以想恢复一下。
我尝试了以下步骤:
- 我POST/connectors/s3sink/restart。然后我看到连接器处于 运行 模式,但任务仍然失败。
- 然后我 PUT /connectors/s3sink/task/0/restart。好的,现在任务处于运行模式。
但是后来我跟踪日志,我发现它开始重写旧数据,比如5月3日的数据。它弄乱了旧数据!
那么,connect restart REST API 是否会重置偏移量?我以为它会保存偏移量并从它失败的偏移量开始。
以及如何正确重启失败的连接器任务?通过删除那些 PODs? (使用 kubernetes),还是通过 REST /task/0/restart?我什么时候应该使用 /connectors/s3sink/restart?
/connector/:name/restart
是对 worker leader 的滚动重启操作,需要以异步方式传播到所有 worker 服务器任务。因此,您需要确保 leader worker 和所有其他 worker 之间的网络连接。
/connector/:name/task/:num/restart
将直接向该工作人员发送请求,重新启动线程。
重新启动不应重置偏移量,因为它们存储在 __consumer_offsets
主题,但您应该看到它的日志。