动态限制 Apache Spout
Throttle Apache Spout Dynamically
我有一个拓扑,其中 spout 从 Kafka 读取数据并发送到 bolt,后者又调用一个 REST API (A) 并调用另一个 REST API (B)。到目前为止API B 没有节流。现在他们已经实施了限制(x 每时钟分钟的最大调用数)。
我们需要实施节流处理程序。
选项 A
最初我们想在 REST API (A) 级别上做,然后放一个
Thread.sleep(x in millis)
一旦调用被 REST 限制 API (B)
但这将使所有 REST (A) 调用等待那么长时间(在 1 秒到 59 秒之间变化),并且可能会增加新来电的负载。
选项 B
REST API (A) 向 Bolt 发送回关于被限制的响应。 Bolt 通知 Spout 进程失败
- 不要更改这些消息的偏移量
- 告诉 spout 停止从 Kafka 读取并停止向 Bolt 发送消息。
- Spout 等待一段时间(比如一分钟),然后从它离开的地方恢复
选项 A 很容易实施,但在我看来不是一个好的解决方案。
试图弄清楚选项 B 是否可行 topology.max.spout.pending 但是如何与 Storm 动态通信以放置一个在喷嘴中节流。任何人都可以分享一些关于这个选项的想法。
选项 C
REST API (B) 限制来自 REST (A) 的调用,它不会处理相同的调用,但会将 429 响应代码发送到螺栓。螺栓会将消息重新排队到另一个风暴拓扑的错误主题部分。此消息可以将重试计数作为其中的一部分,以防相同的消息再次受到限制,我们可以使用 ++retry count 重新排队。
更新 post 找到了使选项 B 可行的解决方案。
选项 D
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
* where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
* <p/>
* By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
* previous polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
* @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
* @param maxDelay maximum amount of time waiting before retrying
*
*/
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
this.initialDelay = initialDelay;
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
LOG.debug("Instantiated {}", this.toStringImpl());
}
步骤如下:
- 使用上述构造函数创建 kafkaSpoutRetryService
- 使用 KafkaSpoutConfig 设置重试
KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
- 如果 Rest API (B) 中存在节流,则使 Bolt 失效
collector.fail(tuple)
这将根据步骤 1 和 2 中的重试配置设置向 spout 发出再次处理元组的信号
你的选项 D 听起来不错,但为了避免重复调用 API A,我认为你应该考虑将你的拓扑分成两部分。
有一个拓扑结构可以从您的原始 Kafka 主题(称为主题 1)读取,调用 REST API A,并将 bolt 的任何输出写回到 Kafka 主题(称为主题 2) ).
然后创建第二个拓扑,其唯一工作是从主题 2 读取,并调用 REST API B。
这将允许您使用选项 D,同时避免在饱和 API B 时额外调用 API A。您的拓扑结构看起来像
Kafka 1 -> Bolt A -> REST API A -> Kafka 2
Kafka 2 -> 螺栓 B -> REST API B
如果你想让解决方案对节流反应更灵敏一些,你可以使用 Storm 中的 topology.max.spout.pending
配置来限制同时运行的元组数量。然后你可以让你的 bolt B 缓冲飞行中的元组,直到节流到期,此时你可以让它尝试再次发送元组。您可以使用 OutputCollector.resetTupleTimeout
来避免元组在 Bolt B 等待限制到期时超时。可以使用tick tuples让Bolt B周期性的唤醒并检查节流是否过期
我有一个拓扑,其中 spout 从 Kafka 读取数据并发送到 bolt,后者又调用一个 REST API (A) 并调用另一个 REST API (B)。到目前为止API B 没有节流。现在他们已经实施了限制(x 每时钟分钟的最大调用数)。
我们需要实施节流处理程序。
选项 A
最初我们想在 REST API (A) 级别上做,然后放一个
Thread.sleep(x in millis)
一旦调用被 REST 限制 API (B)
但这将使所有 REST (A) 调用等待那么长时间(在 1 秒到 59 秒之间变化),并且可能会增加新来电的负载。
选项 B
REST API (A) 向 Bolt 发送回关于被限制的响应。 Bolt 通知 Spout 进程失败
- 不要更改这些消息的偏移量
- 告诉 spout 停止从 Kafka 读取并停止向 Bolt 发送消息。
- Spout 等待一段时间(比如一分钟),然后从它离开的地方恢复
选项 A 很容易实施,但在我看来不是一个好的解决方案。
试图弄清楚选项 B 是否可行 topology.max.spout.pending 但是如何与 Storm 动态通信以放置一个在喷嘴中节流。任何人都可以分享一些关于这个选项的想法。
选项 C
REST API (B) 限制来自 REST (A) 的调用,它不会处理相同的调用,但会将 429 响应代码发送到螺栓。螺栓会将消息重新排队到另一个风暴拓扑的错误主题部分。此消息可以将重试计数作为其中的一部分,以防相同的消息再次受到限制,我们可以使用 ++retry count 重新排队。
更新 post 找到了使选项 B 可行的解决方案。
选项 D
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
* where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
* <p/>
* By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
* previous polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
* @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
* @param maxDelay maximum amount of time waiting before retrying
*
*/
public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
this.initialDelay = initialDelay;
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
LOG.debug("Instantiated {}", this.toStringImpl());
}
步骤如下:
- 使用上述构造函数创建 kafkaSpoutRetryService
- 使用 KafkaSpoutConfig 设置重试
KafkaSpoutConfig.builder(kafkaBootStrapServers, topic).setRetry(kafkaSpoutRetryService)
- 如果 Rest API (B) 中存在节流,则使 Bolt 失效
collector.fail(tuple)
这将根据步骤 1 和 2 中的重试配置设置向 spout 发出再次处理元组的信号
你的选项 D 听起来不错,但为了避免重复调用 API A,我认为你应该考虑将你的拓扑分成两部分。
有一个拓扑结构可以从您的原始 Kafka 主题(称为主题 1)读取,调用 REST API A,并将 bolt 的任何输出写回到 Kafka 主题(称为主题 2) ).
然后创建第二个拓扑,其唯一工作是从主题 2 读取,并调用 REST API B。
这将允许您使用选项 D,同时避免在饱和 API B 时额外调用 API A。您的拓扑结构看起来像
Kafka 1 -> Bolt A -> REST API A -> Kafka 2 Kafka 2 -> 螺栓 B -> REST API B
如果你想让解决方案对节流反应更灵敏一些,你可以使用 Storm 中的 topology.max.spout.pending
配置来限制同时运行的元组数量。然后你可以让你的 bolt B 缓冲飞行中的元组,直到节流到期,此时你可以让它尝试再次发送元组。您可以使用 OutputCollector.resetTupleTimeout
来避免元组在 Bolt B 等待限制到期时超时。可以使用tick tuples让Bolt B周期性的唤醒并检查节流是否过期