HazelcastJet kafka throttling

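I can't find any way in hazelcast-jet-kafka to create a pipeline that limits throughput to a specific number of elements per time unit. Can anybody suggest a possible solution? I know that Alpakka (https://doc.akka.io/docs/alpakka-kafka/current/) has such a feature.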

You can define this function:

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.pipeline.GeneralStage;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;

import java.util.TreeMap;

// static, so that the serializable lambdas below don't capture the enclosing instance
@SuppressWarnings("unchecked")
private static <T, S extends GeneralStage<T>> FunctionEx<S, S> throttle(int itemsPerSecond) {
    // per-processor context for the mapUsingService stage
    class Service {
        final int ratePerSecond;
        // time bucket (in tenths of a second) -> number of items seen in that bucket
        final TreeMap<Long, Long> counts = new TreeMap<>();

        Service(int ratePerSecond) {
            this.ratePerSecond = ratePerSecond;
        }
    }

    // factory for the service; nonSharedService creates one instance per processor
    ServiceFactory<?, Service> serviceFactory = ServiceFactories
            .nonSharedService(procCtx ->
                    // divide the rate by the total number of processors in the cluster
                    new Service(Math.max(1, itemsPerSecond / procCtx.totalParallelism())))
            // non-cooperative is needed because we sleep in the mapping function
            .toNonCooperative();

    return stage -> (S) stage
        .mapUsingService(serviceFactory,
            (ctx, item) -> {
                // current time in tenths of a second
                long now = System.nanoTime() / 100_000_000;
                // include this item in the counts
                ctx.counts.merge(now, 1L, Long::sum);
                // drop buckets older than one second (keeps the last 10 buckets)
                ctx.counts.headMap(now - 10, true).clear();
                long countInLastSecond =
                        ctx.counts.values().stream().mapToLong(Long::longValue).sum();
                // if we emitted too many items, sleep in proportion to the excess
                if (countInLastSecond > ctx.ratePerSecond) {
                    Thread.sleep(
                        (countInLastSecond - ctx.ratePerSecond) * 1000 / ctx.ratePerSecond);
                }
                // now we can pass the item on
                return item;
            }
        );
}

Then use it to throttle a pipeline:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 2_000).boxed().toArray(Integer[]::new)))
 .apply(throttle(100))
 .writeTo(Sinks.noop());

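The job above takes about 20 seconds to complete, because it has 2,000 items and the rate is limited to 100 items/s. The rate is evaluated over the last second, so as long as at most 100 items arrived in that second, items are forwarded immediately. If 101 items arrive within one millisecond, the first 100 are forwarded immediately and the 101st is forwarded after a sleep.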
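To check the behaviour described above without starting a cluster, the sliding-window bookkeeping from the mapping function can be pulled out into plain Java. This is a sketch, not part of the Jet API: the class `SlidingWindowThrottle` and its method `recordAndGetDelayMillis` are hypothetical names. It uses the same 100 ms buckets, the same eviction via `TreeMap.headMap(..., true)` and the same sleep formula, but returns the delay instead of calling `Thread.sleep`, so it can be driven by a fixed clock.

```java
import java.util.TreeMap;

// Hypothetical, Jet-free copy of the per-processor throttling state.
// It reports how long to sleep instead of sleeping, so it can be tested with a fake clock.
final class SlidingWindowThrottle {
    private final long ratePerSecond;
    // time bucket (tenths of a second) -> number of items recorded in that bucket
    private final TreeMap<Long, Long> counts = new TreeMap<>();

    SlidingWindowThrottle(long ratePerSecond) {
        if (ratePerSecond < 1) {
            throw new IllegalArgumentException("ratePerSecond must be >= 1");
        }
        this.ratePerSecond = ratePerSecond;
    }

    /** Records one item seen at nowNanos and returns the delay in ms before emitting it. */
    long recordAndGetDelayMillis(long nowNanos) {
        long bucket = nowNanos / 100_000_000L;       // current time in tenths of a second
        counts.merge(bucket, 1L, Long::sum);         // count this item
        counts.headMap(bucket - 10, true).clear();   // drop buckets older than one second
        long countInLastSecond = 0;
        for (long c : counts.values()) {
            countInLastSecond += c;
        }
        if (countInLastSecond <= ratePerSecond) {
            return 0;                                // within the limit: emit immediately
        }
        // same formula as throttle(): excess items * (1000 ms / rate)
        return (countInLastSecond - ratePerSecond) * 1000 / ratePerSecond;
    }
}
```

With a limit of 100 items/s and all items arriving at the same instant, the first 100 calls return 0 and the 101st returns (101 - 100) * 1000 / 100 = 10 ms. Inside a real mapping function the equivalent call would be `Thread.sleep(throttle.recordAndGetDelayMillis(System.nanoTime()))`.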

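Also make sure your source is distributed. The rate is divided by the number of processors in the cluster, so if your source isn't distributed and some members don't see any data, your overall rate will be only a fraction of the desired rate.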
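The rate split mentioned above can be made concrete with a little arithmetic. The sketch below uses a hypothetical helper class, `ThrottleRateMath` (not a Jet API), that reproduces the `Math.max(1, itemsPerSecond / totalParallelism)` expression from the service factory and multiplies it by the number of processors that actually receive items. The cluster shape used in the example (2 members with 8 processors each, so `totalParallelism` is 16) is an assumption for illustration only.

```java
// Hypothetical helper reproducing the rate split done in throttle()'s service factory.
final class ThrottleRateMath {
    private ThrottleRateMath() {}

    /** Per-processor limit: Math.max(1, itemsPerSecond / totalParallelism), integer division. */
    static int perProcessorRate(int itemsPerSecond, int totalParallelism) {
        return Math.max(1, itemsPerSecond / totalParallelism);
    }

    /** Approximate cluster-wide cap when only activeProcessors ever receive items. */
    static int effectiveRate(int itemsPerSecond, int totalParallelism, int activeProcessors) {
        return activeProcessors * perProcessorRate(itemsPerSecond, totalParallelism);
    }
}
```

With a limit of 100 items/s and 16 processors, each processor is capped at 6 items/s. If all 16 processors receive data, the cap is about 96 items/s; if only the 8 processors on one member see data (a non-distributed source), it drops to about 48 items/s. Because of the integer division and the `Math.max(1, ...)`, the cap can also differ from the requested rate in the other direction: a limit of 10 items/s on 16 processors still allows 1 item/s per processor, i.e. about 16 items/s in total.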