Hazelcast Jet Kafka throttling
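I can't find any way to create a pipeline in hazelcast-jet-kafka that limits throughput to a specific number of elements per time unit. Can anybody suggest a possible solution? I know that Alpakka (https://doc.akka.io/docs/alpakka-kafka/current/) has such functionality.
You can define this function: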
// imports assume Hazelcast Jet 4.x package names
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.pipeline.GeneralStage;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.util.TreeMap;

// static, so the lambdas below do not capture an enclosing instance
// that would have to be serializable
@SuppressWarnings("unchecked")
private static <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {
    // context for the mapUsingService stage
    class Service {
        final int ratePerSecond;
        // key: time in 10ths of a second, value: items emitted in that bucket
        final TreeMap<Long, Long> counts = new TreeMap<>();

        public Service(int ratePerSecond) {
            this.ratePerSecond = ratePerSecond;
        }
    }

    // factory for the service
    ServiceFactory<?, Service> serviceFactory = ServiceFactories
            .nonSharedService(procCtx ->
                    // divide the rate by the actual number of processors we have
                    new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
            // non-cooperative is needed because we sleep in the mapping function
            .toNonCooperative();

    return stage -> (S) stage
            .mapUsingService(serviceFactory,
                    (ctx, item) -> {
                        // current time in 10ths of a second
                        long now = System.nanoTime() / 100_000_000;
                        // include this item in the counts
                        ctx.counts.merge(now, 1L, Long::sum);
                        // clear counts for items emitted more than a second ago
                        ctx.counts.headMap(now - 10, true).clear();
                        long countInLastSecond =
                                ctx.counts.values().stream().mapToLong(Long::longValue).sum();
                        // if we emitted too many items, sleep a while
                        if (countInLastSecond > ctx.ratePerSecond) {
                            Thread.sleep(
                                    (countInLastSecond - ctx.ratePerSecond) * 1000 / ctx.ratePerSecond);
                        }
                        // now we can pass the item on
                        return item;
                    }
            );
}
Then use it to throttle in the pipeline:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
 .apply(throttle(100))
 .writeTo(Sinks.noop());
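The job above takes about 20 seconds to complete, because it has 2,000 items and the rate is limited to 100 items/s. The rate is evaluated over the last second, so if fewer than 100 items arrive per second, they are forwarded immediately. If 101 items arrive within one millisecond, 100 are forwarded immediately and the next one is forwarded after a sleep.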
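To see where that sleep comes from, here is a minimal stand-alone sketch of the same sliding-window counter, with the clock passed in so the behaviour is reproducible. It doesn't use any Jet classes; the class name `ThrottleWindow` and the method `record` are made up for illustration and only mirror the logic of the mapping function above.

```java
import java.util.TreeMap;

// Hypothetical stand-alone model of the counter used in throttle(); not a Jet API.
class ThrottleWindow {
    private final int ratePerSecond;
    // key: time in 10ths of a second, value: items seen in that bucket
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    ThrottleWindow(int ratePerSecond) {
        this.ratePerSecond = ratePerSecond;
    }

    /** Records one item at the given nanoTime and returns the sleep in milliseconds. */
    long record(long nanoTime) {
        long now = nanoTime / 100_000_000;
        counts.merge(now, 1L, Long::sum);
        // drop buckets that are a full second old or older
        counts.headMap(now - 10, true).clear();
        long countInLastSecond =
                counts.values().stream().mapToLong(Long::longValue).sum();
        return countInLastSecond > ratePerSecond
                ? (countInLastSecond - ratePerSecond) * 1000 / ratePerSecond
                : 0;
    }

    public static void main(String[] args) {
        ThrottleWindow window = new ThrottleWindow(100);
        long sleep = 0;
        // 101 items arriving at the same instant
        for (int i = 0; i < 101; i++) {
            sleep = window.record(0L);
        }
        System.out.println(sleep); // prints 10
    }
}
```

The first 100 calls return 0, so those items pass straight through. The 101st puts the count at 101, which gives (101 - 100) * 1000 / 100 = 10 ms of sleep. Once a full second has passed, the old bucket is cleared and items flow without delay again.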
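Also make sure that your source is distributed. The rate is divided by the number of processors in the cluster, and if your source isn't distributed and some members don't see any data, your overall rate will be only a fraction of the desired rate.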
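The effect of that division can be worked through with plain arithmetic. The sketch below only repeats the `Math.max(1, itemsPerSecond / totalParallelism)` expression from the function above; the class `RateSplit`, its method names, and the figures of 8 processors in total with 2 of them receiving data are assumptions chosen for illustration.

```java
// Hypothetical helper that mirrors the rate division in throttle(); not a Jet API.
class RateSplit {
    /** Rate each processor enforces, using the same integer division as throttle(). */
    static int perProcessorRate(int itemsPerSecond, int totalParallelism) {
        return Math.max(1, itemsPerSecond / totalParallelism);
    }

    /** Overall rate when only some of the processors actually receive items. */
    static int effectiveRate(int itemsPerSecond, int totalParallelism, int activeProcessors) {
        return perProcessorRate(itemsPerSecond, totalParallelism) * activeProcessors;
    }

    public static void main(String[] args) {
        // assumed setup: limit of 100 items/s, 8 processors across the cluster
        System.out.println(perProcessorRate(100, 8)); // prints 12
        System.out.println(effectiveRate(100, 8, 8)); // prints 96
        System.out.println(effectiveRate(100, 8, 2)); // prints 24
    }
}
```

With all 8 processors busy, the total is 96 items/s rather than 100, because the integer division rounds down. If only 2 processors see data, the job runs at 24 items/s, about a quarter of the requested rate.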