每一条消息都通过 RabbitMQ,应该通过每个

Every second message is passing by RabbitMQ, should pass each

我正在尝试 运行 Josh Long 演示文稿中的一些示例。我删除了一些我现在不需要的东西,因为我想评估 RabbitMQ 上的 Spring Cloud Stream。

不幸的是,每当我与我的出版商发布一些东西时,我的消费者每秒钟都会收到一条消息。我担心我的 PC 上隐藏了一些额外的消费者(不是杀死线程或 smth) - 所以我重新启动了 PC。我还创建了单独的 vhost 以确保它只是我的,而且每隔一秒的消息都会发送给消费者。

我的印象是有东西在后台消耗它,但它不应该。如果我 运行 有更多的发布者生成数据,消费者的输出可能开始看起来更好 - 就像那里涉及一些负载平衡。

所以,问题是 - 发布者-消费者应用程序的最小版本应该如何,其中使用 Spring Cloud Stream 每秒向频道发布消息?

消费者:

@EnableBinding(ServerChannels.class)
@SpringBootApplication
public class ReservationServiceApplication {

  public static void main(String[] args) {
    SpringApplication.run(ReservationServiceApplication.class, args);
  }

  @Bean
  IntegrationFlow inboundReservationFlow(ServerChannels channels) {

    return IntegrationFlows
            .from(channels.input())
            .handle((GenericHandler<String>) (reservationName, headers) -> {
              System.out.println(reservationName);
              return null;
            })
            .get();
  }
}

@Component
class StreamListenerComponent {

  @StreamListener("input")
  public void on(String reservationName) {
  }
}

interface ServerChannels {

  @Input
  SubscribableChannel input();

}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Entity
class Reservation {

  public Reservation(String reservationName) {
    this.reservationName = reservationName;
  }

  @Id
  @GeneratedValue
  private Long id;

  private String reservationName; // reservation_name
}

出版商:

@IntegrationComponentScan
@EnableBinding(DeviceChannels.class)
@SpringBootApplication
public class VehicleClientApplication {

  private final EventWriter eventWriter;
  private final int randomNum;
  private int counter;

  VehicleClientApplication(EventWriter eventWriter) {
    this.eventWriter = eventWriter;
    Random rn = new Random();
    int range = 10;
    randomNum = rn.nextInt(range) + 10;

  }

  public static void main(String[] args) {
    SpringApplication.run(VehicleClientApplication.class, args);
  }

  @Scheduled(fixedRate = 50)
  public void checkRecords() {
    this.eventWriter
            .write(randomNum + " " + counter);
    System.out.println(counter);
    counter++;

  }
}

interface DeviceChannels {

  String OUTPUT = "output";

  @Output(OUTPUT)
  MessageChannel output();

}

@MessagingGateway
interface EventWriter {

  @Gateway(requestChannel = DeviceChannels.OUTPUT)
  void write(String rn);
}

输入:

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

输出:

19 1
19 3
19 5
19 7
19 9
19 11
19 13
19 15
19 17
19 19
19 21
19 23
19 25

您在 input 频道上有 2 个消费者

@StreamListener("input")
public void on(String reservationName) {
}

 return IntegrationFlows
        .from(channels.input())

在流监听器中添加一个System.out,你就会看到。