RabbitListener 注释上的动态路由键
Dynamic Routing key on RabbitListener Annotation
我需要为每个登录到应用程序的用户创建一个链接到 Direct Exchange 的队列。地下室路由将是 'user_' + userId.
也就是说,每次我通过用户管理队列收到一个用户登录的消息。实例化一个范围为 'prototype' 的 bean,它包含一个用 RabbitListener 注释的方法来声明它的队列。对于这个 bean,我传递了 userId 以便能够配置队列的名称和 routingKey。但是由于循环引用错误,我无法在 Spel 表达式中访问这个实例变量。
这里我放了声明队列的bean:
@Component("usersHandler")
@Scope(value = "prototype")
public class UsersHandler {
private static Logger logger = LoggerFactory.getLogger(UsersHandler.class);
private Long userId;
public UsersHandler(Long userId) {
this.userId = userId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@RabbitListener(bindings
= @QueueBinding(
value = @Queue(
value = "#{'queue_'.concat(usersHandler.userId)}",
durable = "false",
autoDelete = "true",
arguments = {
@Argument(
name = "x-message-ttl",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-message-ttl']}",
type = "java.lang.Integer"
)
,
@Argument(
name = "x-expires",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-expires']}",
type = "java.lang.Integer"
)
,
@Argument(
name = "x-dead-letter-exchange",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-dead-letter-exchange']}",
type = "java.lang.String"
)
}
),
exchange = @Exchange(
value = "#{rabbitCustomProperties.directExchange.name}",
type = ExchangeTypes.DIRECT,
durable = "#{rabbitCustomProperties.directExchange.durable}",
autoDelete = "#{rabbitCustomProperties.directExchange.autoDelete}",
arguments = {
@Argument(
name = "alternate-exchange",
value = "#{rabbitCustomProperties.directExchange.arguments['alternate-exchange']}",
type = "java.lang.String"
)
}
),
key = "#{'user_'.concat(usersHandler.userId)}")
)
public void handleMessage(@Payload Notification notification) {
logger.info("Notification Received : " + notification);
}
}
这是负责创建与登录用户一样多的 UserHandler 的另一个 bean:
@Component("adminHandler")
public class AdminHandler implements UsersManadgementVisitor {
@Autowired
private ApplicationContext appCtx;
private Map<Long, UsersHandler> handlers = new HashMap<Long, UsersHandler>();
private static Logger logger = LoggerFactory.getLogger(AdminHandler.class);
@RabbitListener(queues="#{rabbitCustomProperties.adminExchange.queues['users'].name}")
public void handleMessage(@Payload UsersManadgementMessage message) {
logger.info("Message -> " + message);
message.getType().accept(this, message.getId());
}
@Override
public void visitUserConnected(Long idUser) {
logger.info("Declare new queue for user: " + idUser );
UsersHandler userHandler = appCtx.getBean(UsersHandler.class, idUser);
handlers.put(idUser, userHandler);
}
@Override
public void visitUserDisconnected(Long idUser) {
logger.info("Remove queue for user: " + idUser );
handlers.remove(idUser);
}
}
我的问题是:
如何使变量 userId 在 SpEL 表达式的计算上下文中可用?
您可以使用 ThreadLocal
和 T
运算符...
@SpringBootApplication
public class So43717710Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So43717710Application.class, args);
UserHolder.setUser("someUser");
context.getBean(Listener.class);
UserHolder.clearUser();
context.getBean(RabbitTemplate.class).convertAndSend("foo", "user_someUser", "bar");
Thread.sleep(5000);
context.close();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Listener listener() {
return new Listener();
}
public static class Listener {
@RabbitListener(bindings = @QueueBinding(value = @Queue("#{'queue_' + T(com.example.UserHolder).getUser()}"),
exchange = @Exchange(value = "foo"),
key = "#{'user_' + T(com.example.UserHolder).getUser()}"))
public void listen(String in) {
System.out.println(in);
}
}
}
public class UserHolder {
private static final ThreadLocal<String> user = new ThreadLocal<String>();
public static void setUser(String userId) {
user.set(userId);
}
public static String getUser() {
return user.get();
}
public static void clearUser() {
user.remove();
}
}
如果 ThreadLocal
在 @Bean
中,您可以使用 bean 引用...
@SpringBootApplication
public class So43717710Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So43717710Application.class, args);
UserHolder.setUser("someUser");
context.getBean(Listener.class);
UserHolder.clearUser();
context.getBean(RabbitTemplate.class).convertAndSend("foo", "user_someUser", "bar");
Thread.sleep(5000);
context.close();
}
@Bean
public UserHolder holder() {
return new UserHolder();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Listener listener() {
return new Listener();
}
public static class Listener {
@RabbitListener(bindings = @QueueBinding(value = @Queue("#{'queue_' + @holder.user}"),
exchange = @Exchange(value = "foo"),
key = "#{'user_' + @holder.user}"))
public void listen(String in) {
System.out.println(in);
}
}
}
我需要为每个登录到应用程序的用户创建一个链接到 Direct Exchange 的队列。地下室路由将是 'user_' + userId.
也就是说,每次我通过用户管理队列收到一个用户登录的消息。实例化一个范围为 'prototype' 的 bean,它包含一个用 RabbitListener 注释的方法来声明它的队列。对于这个 bean,我传递了 userId 以便能够配置队列的名称和 routingKey。但是由于循环引用错误,我无法在 Spel 表达式中访问这个实例变量。
这里我放了声明队列的bean:
@Component("usersHandler")
@Scope(value = "prototype")
public class UsersHandler {
private static Logger logger = LoggerFactory.getLogger(UsersHandler.class);
private Long userId;
public UsersHandler(Long userId) {
this.userId = userId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@RabbitListener(bindings
= @QueueBinding(
value = @Queue(
value = "#{'queue_'.concat(usersHandler.userId)}",
durable = "false",
autoDelete = "true",
arguments = {
@Argument(
name = "x-message-ttl",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-message-ttl']}",
type = "java.lang.Integer"
)
,
@Argument(
name = "x-expires",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-expires']}",
type = "java.lang.Integer"
)
,
@Argument(
name = "x-dead-letter-exchange",
value = "#{rabbitCustomProperties.directExchange.queueArguments['x-dead-letter-exchange']}",
type = "java.lang.String"
)
}
),
exchange = @Exchange(
value = "#{rabbitCustomProperties.directExchange.name}",
type = ExchangeTypes.DIRECT,
durable = "#{rabbitCustomProperties.directExchange.durable}",
autoDelete = "#{rabbitCustomProperties.directExchange.autoDelete}",
arguments = {
@Argument(
name = "alternate-exchange",
value = "#{rabbitCustomProperties.directExchange.arguments['alternate-exchange']}",
type = "java.lang.String"
)
}
),
key = "#{'user_'.concat(usersHandler.userId)}")
)
public void handleMessage(@Payload Notification notification) {
logger.info("Notification Received : " + notification);
}
}
这是负责创建与登录用户一样多的 UserHandler 的另一个 bean:
@Component("adminHandler")
public class AdminHandler implements UsersManadgementVisitor {
@Autowired
private ApplicationContext appCtx;
private Map<Long, UsersHandler> handlers = new HashMap<Long, UsersHandler>();
private static Logger logger = LoggerFactory.getLogger(AdminHandler.class);
@RabbitListener(queues="#{rabbitCustomProperties.adminExchange.queues['users'].name}")
public void handleMessage(@Payload UsersManadgementMessage message) {
logger.info("Message -> " + message);
message.getType().accept(this, message.getId());
}
@Override
public void visitUserConnected(Long idUser) {
logger.info("Declare new queue for user: " + idUser );
UsersHandler userHandler = appCtx.getBean(UsersHandler.class, idUser);
handlers.put(idUser, userHandler);
}
@Override
public void visitUserDisconnected(Long idUser) {
logger.info("Remove queue for user: " + idUser );
handlers.remove(idUser);
}
}
我的问题是:
如何使变量 userId 在 SpEL 表达式的计算上下文中可用?
您可以使用 ThreadLocal
和 T
运算符...
@SpringBootApplication
public class So43717710Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So43717710Application.class, args);
UserHolder.setUser("someUser");
context.getBean(Listener.class);
UserHolder.clearUser();
context.getBean(RabbitTemplate.class).convertAndSend("foo", "user_someUser", "bar");
Thread.sleep(5000);
context.close();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Listener listener() {
return new Listener();
}
public static class Listener {
@RabbitListener(bindings = @QueueBinding(value = @Queue("#{'queue_' + T(com.example.UserHolder).getUser()}"),
exchange = @Exchange(value = "foo"),
key = "#{'user_' + T(com.example.UserHolder).getUser()}"))
public void listen(String in) {
System.out.println(in);
}
}
}
public class UserHolder {
private static final ThreadLocal<String> user = new ThreadLocal<String>();
public static void setUser(String userId) {
user.set(userId);
}
public static String getUser() {
return user.get();
}
public static void clearUser() {
user.remove();
}
}
如果 ThreadLocal
在 @Bean
中,您可以使用 bean 引用...
@SpringBootApplication
public class So43717710Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So43717710Application.class, args);
UserHolder.setUser("someUser");
context.getBean(Listener.class);
UserHolder.clearUser();
context.getBean(RabbitTemplate.class).convertAndSend("foo", "user_someUser", "bar");
Thread.sleep(5000);
context.close();
}
@Bean
public UserHolder holder() {
return new UserHolder();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Listener listener() {
return new Listener();
}
public static class Listener {
@RabbitListener(bindings = @QueueBinding(value = @Queue("#{'queue_' + @holder.user}"),
exchange = @Exchange(value = "foo"),
key = "#{'user_' + @holder.user}"))
public void listen(String in) {
System.out.println(in);
}
}
}