在 Kafka 中重放消息
Replay messages in Kafka
我在 spring 启动时使用 kafka,我正在尝试添加一项功能,允许我们启动服务并让它重播消息回到特定时间。
消费者是这样设置的
public interface ProductScenarioStream {
String SERVICE_REQUESTS_PRODUCT_PRICE = "serviceRequestsProductPrice";
String SERVICE_CONCLUDES_PRODUCT_SCENARIO = "serviceConcludesProductScenario";
@Output(SERVICE_REQUESTS_PRODUCT_PRICE)
MessageChannel serviceRequestsProductPrice();
@Input(SERVICE_CONCLUDES_PRODUCT_SCENARIO)
SubscribableChannel serviceConcludesProductScenario();
}
和
@Service
@EnableBinding(ProductScenarioStream.class)
@Profile("stream")
public class ProductStreamServiceImpl implements ProductStreamService
{
@Resource
private ProductScenarioStream productScenarioStream;
@Override
public void send(final ServiceRequestsProductPrice event) {
...
}
}
你知道我在哪里可以找到允许我在这种情况下倒回流上的偏移量的设置吗?
我假设你的意思是 replay
而不是 reply
- 我已经编辑了你的问题。
Spring Cloud Stream 当前未提供寻找偏移量的机制。
可以使用spring-kafka的@KafkaListener
代替;实施 ConsumerSeekAware
,它为您提供了在启动期间(或任何时间)寻找的机制。
我在 spring 启动时使用 kafka,我正在尝试添加一项功能,允许我们启动服务并让它重播消息回到特定时间。
消费者是这样设置的
public interface ProductScenarioStream {
String SERVICE_REQUESTS_PRODUCT_PRICE = "serviceRequestsProductPrice";
String SERVICE_CONCLUDES_PRODUCT_SCENARIO = "serviceConcludesProductScenario";
@Output(SERVICE_REQUESTS_PRODUCT_PRICE)
MessageChannel serviceRequestsProductPrice();
@Input(SERVICE_CONCLUDES_PRODUCT_SCENARIO)
SubscribableChannel serviceConcludesProductScenario();
}
和
@Service
@EnableBinding(ProductScenarioStream.class)
@Profile("stream")
public class ProductStreamServiceImpl implements ProductStreamService
{
@Resource
private ProductScenarioStream productScenarioStream;
@Override
public void send(final ServiceRequestsProductPrice event) {
...
}
}
你知道我在哪里可以找到允许我在这种情况下倒回流上的偏移量的设置吗?
我假设你的意思是 replay
而不是 reply
- 我已经编辑了你的问题。
Spring Cloud Stream 当前未提供寻找偏移量的机制。
可以使用spring-kafka的@KafkaListener
代替;实施 ConsumerSeekAware
,它为您提供了在启动期间(或任何时间)寻找的机制。