Restart Reactive Messaging, e.g. after reconfiguring

How can I restart, or stop and resume, reactive messaging, for example after changing the interval? This example is from the Quarkus guide: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")                             
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)    
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}

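As one option, you can put a Subject in front of the channel instead of returning the interval Flowable directly, and use a separate Flowable to feed values into that Subject. The @Outgoing method then always returns the same Subject (converted to a Flowable). When you want to change something, such as the interval, you dispose of the current feeding Flowable and create a new one, which feeds the same Subject.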

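// Imports assume RxJava 2 and SmallRye Reactive Messaging, as in the Quarkus guide's example.
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

class YourClass {

    // stations, random and LOG are the same fields as in the question's class.

    // Long-lived stream that the channel subscribes to once.
    private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    // The currently running generator; replaced on every setFlowable() call.
    private Disposable currentSubscription;

    // Call once at startup, and again whenever the interval (or anything else) changes.
    public void setFlowable() {
        if(currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)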
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                }).subscribe(it -> {
                    temperatureSubject.onNext(it);
                });
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}
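
To try the same idea without a Quarkus or Kafka setup, here is a minimal sketch that uses only the JDK. It is an analogue, not the RxJava code above: `SubmissionPublisher` stands in for the Subject and a cancellable `ScheduledFuture` stands in for the disposable interval Flowable. The class name `RestartableTicker` and the methods `restart` and `stream` are made up for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for YourClass: one long-lived stream, one replaceable ticker.
class RestartableTicker implements AutoCloseable {

    // Long-lived stream that consumers subscribe to once (role of temperatureSubject).
    private final SubmissionPublisher<Long> publisher = new SubmissionPublisher<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong counter = new AtomicLong();

    // The currently running periodic task (role of currentSubscription).
    private ScheduledFuture<?> currentTask;

    // Cancels the running ticker, if any, and starts a new one with the given period.
    public synchronized void restart(long periodMillis) {
        if (currentTask != null) {
            currentTask.cancel(false);
        }
        currentTask = scheduler.scheduleAtFixedRate(
                // offer() does not block; the item is dropped for a subscriber whose buffer is full.
                () -> publisher.offer(counter.incrementAndGet(), null),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // The stream never changes, so existing subscribers survive a restart.
    public Flow.Publisher<Long> stream() {
        return publisher;
    }

    @Override
    public synchronized void close() {
        if (currentTask != null) {
            currentTask.cancel(false);
        }
        scheduler.shutdownNow();
        publisher.close();
    }

    public static void main(String[] args) throws InterruptedException {
        try (RestartableTicker ticker = new RestartableTicker()) {
            ticker.stream().subscribe(new Flow.Subscriber<Long>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(Long tick) { System.out.println("tick " + tick); }
                public void onError(Throwable t) { t.printStackTrace(); }
                public void onComplete() { System.out.println("done"); }
            });
            ticker.restart(500);  // initial interval
            Thread.sleep(1_600);
            ticker.restart(100);  // "reconfigure": new interval, same subscription
            Thread.sleep(500);
        }
    }
}
```

The point of the design is that the consumer only ever sees `stream()`, just as the channel only ever sees `generate()` in the answer above, so swapping the producer does not require resubscribing anything downstream.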