Using Topics and Acknowledgment with RabbitMQ

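I have been trying to use RabbitMQ instead of Kafka, and I am a beginner with RabbitMQ. How can I change this KafkaListener into a RabbitMQ listener?

I have been trying to figure this out, but I could not find an answer. I need to convert this listener to RabbitMQ.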

Producer

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public AccountEventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void produce(String topic, BaseEvent event) {
        this.kafkaTemplate.send(topic, event);
    }

Consumer

    @KafkaListener(topics = "AccountOpenedEvent", groupId = "${spring.kafka.consumer.group-id}")
    @Override
    public void consume(AccountOpenedEvent event, Acknowledgment ack) {
        eventHandler.on(event);
        ack.acknowledge();
    }

Can anyone help me?

To use RabbitMQ on the producer side, follow the steps below:
    
1. The first step is to add the dependencies:

   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
     <version>2.5.5</version>
   </dependency>
   <dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
     <version>3.1.3</version>
   </dependency>

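2. The second step is to configure the connection to RabbitMQ in the yml file. Note that exchange, queue and routingkey are custom keys rather than standard Spring Boot properties, so they only take effect if your own code reads them: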

spring:
  rabbitmq:
    host: localhost
    password: guest
    port: 5672
    username: guest
    exchange: user.exchange
    queue: user.queue
    routingkey: user.routingkey

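3. In the third step, create a class to configure the required beans:

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ProducerConfig {

        @Bean
        public Queue queue() {
            // Second argument is "durable"; false means the queue does not survive a broker restart
            return new Queue("user.queue", false);
        }

        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("user.exchange");
        }

        @Bean
        public Binding binding(Queue queue, TopicExchange topicExchange) {
            // Route messages whose routing key matches "user.routingkey" to the queue
            return BindingBuilder.bind(queue).to(topicExchange).with("user.routingkey");
        }

        @Bean
        public MessageConverter jsonMessageConverter() {
            // Serialize message payloads as JSON
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(jsonMessageConverter());
            return rabbitTemplate;
        }
    }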

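4. The last step is to create a service on the producer side that publishes messages to RabbitMQ:

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class ProducerService { // class name is illustrative; the original snippet omitted it
        @Autowired
        private AmqpTemplate amqpTemplate;

        public void send(Object requestEvent) {
            // Publish to the topic exchange using the routing key bound in step 3
            amqpTemplate.convertAndSend("user.exchange", "user.routingkey", requestEvent);
            System.out.println("Message sent successfully.");
        }
    }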
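
As a side note on the topic exchange from step 3: a binding key may contain wildcards, where * stands for exactly one dot-separated word and # stands for zero or more words. The following is a minimal plain-Java sketch of that matching rule, written only to illustrate it. The class TopicMatcher and its matches method are hypothetical helpers, not part of RabbitMQ or Spring AMQP; the broker performs this matching itself.

```java
public class TopicMatcher {

    /** Returns true if routingKey matches the binding pattern under topic-exchange rules. */
    public static boolean matches(String pattern, String routingKey) {
        // Both sides are sequences of words separated by dots
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' matches exactly one word; anything else must match literally
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.routingkey", "user.routingkey")); // true
        System.out.println(matches("user.*", "user.created"));             // true
        System.out.println(matches("user.*", "user.created.v2"));          // false
        System.out.println(matches("user.#", "user"));                     // true
    }
}
```

With the literal binding key user.routingkey used above, only messages published with exactly that routing key reach user.queue. Binding with user.# instead would also catch keys such as user.created.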

To use RabbitMQ on the consumer side, follow the steps below:

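1. First repeat steps 1 and 2 of the producer side, then create a service that reads the message:

    @RabbitListener(queues = "user.queue")
    public void getMessage(Object requestEvent) {
        System.out.println(requestEvent.toString());
    }

2. The second step is to create a class that configures the required beans:

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ConsumerConfig {

        // Same converter as on the producer side, so JSON payloads can be read back
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }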
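
The steps above do not cover the ack.acknowledge() call from the question. In Spring AMQP the listener container acknowledges automatically by default once the listener method returns without throwing. With the acknowledge mode set to MANUAL, the listener instead calls Channel.basicAck(deliveryTag, false) itself, or Channel.basicNack(deliveryTag, false, true) to requeue the message. The sketch below is only a toy in-memory model of those semantics in plain Java, assuming a single queue and a single consumer. ManualAckModel and its methods are hypothetical names, not the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of manual acknowledgment; NOT the RabbitMQ client API. */
public class ManualAckModel {
    private final Deque<String> ready = new ArrayDeque<>();     // waiting to be delivered
    private final Map<Long, String> unacked = new HashMap<>();  // delivered, not yet acked
    private long nextTag = 1;

    public void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the next ready message and returns its delivery tag, or -1 if none is ready. */
    public long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, body); // the model keeps the message until it is acked
        return tag;
    }

    public String body(long tag) {
        return unacked.get(tag);
    }

    /** Acknowledge: the message is removed for good. */
    public void ack(long tag) {
        unacked.remove(tag);
    }

    /** Negative acknowledge: drop the message, or put it back at the head if requeue is true. */
    public void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (requeue && body != null) {
            ready.addFirst(body);
        }
    }

    public int readyCount() {
        return ready.size();
    }

    public int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ManualAckModel queue = new ManualAckModel();
        queue.publish("AccountOpenedEvent#1");
        long tag = queue.deliver();
        queue.nack(tag, true);      // handler failed: message goes back to the queue
        long retry = queue.deliver();
        queue.ack(retry);           // handler succeeded: message is gone
        System.out.println(queue.readyCount() + " ready, " + queue.unackedCount() + " unacked");
        // prints "0 ready, 0 unacked"
    }
}
```

The point of the model is that a message is never lost between delivery and acknowledgment: until ack is called it stays tracked, and a nack with requeue makes it available again, which is the behaviour ack.acknowledge() is usually relied on for in the Kafka version.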