指定主题时使用直接交换的 RabbitMQ
RabbitMQ using Direct Exchange when Topic was specified
在我的申请中我有 3 类:
- 公司,雇用工人从事 3 项工作中的任何一项
- 工人,每人可以做2份工作
- 管理员,接收程序中所有消息的副本,并可以将消息发送给所有公司、所有员工或所有人
我对公司密钥使用 work.companies.companyName
,对员工密钥使用 work.workers.workerName
,它们都使用默认交换和队列进行通信。管理员收到带有 admin
主题交换的消息。
问题出在管理员 -> 其他人的沟通上。它的工作原理与直接交换完全一样——我可以给公司和工人取任何名字,甚至像“#”、"company1.#" 等,他们不会收到任何东西,除非在管理员中我明确地用像[这样的键发送消息=39=].
我希望能够只使用 e. G。 "work.companies.#" 向所有公司发送消息。我做错了什么?
Administrator.java:
public class Administrator
{
public static void main(String[] args) throws IOException, TimeoutException
{
new Thread(new TopicListener("admin", ign -> {})).start();
TopicWriter writer = new TopicWriter();
// lots of code
TopicListener.java:
public class TopicListener implements Runnable
{
private final String EXCHANGE_NAME = "space";
private String key;
private Consumer<String> msgHandler;
public TopicListener(String key, Consumer<String> msgHandler)
{
this.key = key;
this.msgHandler = msgHandler;
}
@Override
public void run()
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, key);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("Received: \"" + msg + "\"");
msgHandler.accept(msg);
}
};
channel.basicConsume(queueName, true, consumer);
}
catch (IOException | TimeoutException e)
{ e.printStackTrace(); }
}
}
TopicWriter.java:
public class TopicWriter
{
private final String EXCHANGE_NAME = "space";
private final Channel channel;
public TopicWriter() throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
this.channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
}
public void send(String msg, String key) throws IOException
{
channel.basicPublish(
EXCHANGE_NAME,
key,
null,
msg.getBytes(StandardCharsets.UTF_8));
}
}
Company.java 包含:
new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();
Worker.java 包含:
new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();
我发现了问题所在:我试图 发送 消息给使用 Topic 的每个人,在 RabbitMQ 中 Topic 用于指定谁应该 接收 消息。 “#”或“*”应该在队列键声明中使用,而不是在使用给定键发送消息时使用。
在我的申请中我有 3 类:
- 公司,雇用工人从事 3 项工作中的任何一项
- 工人,每人可以做2份工作
- 管理员,接收程序中所有消息的副本,并可以将消息发送给所有公司、所有员工或所有人
我对公司密钥使用 work.companies.companyName
,对员工密钥使用 work.workers.workerName
,它们都使用默认交换和队列进行通信。管理员收到带有 admin
主题交换的消息。
问题出在管理员 -> 其他人的沟通上。它的工作原理与直接交换完全一样——我可以给公司和工人取任何名字,甚至像“#”、"company1.#" 等,他们不会收到任何东西,除非在管理员中我明确地用像[这样的键发送消息=39=].
我希望能够只使用 e. G。 "work.companies.#" 向所有公司发送消息。我做错了什么?
Administrator.java:
public class Administrator
{
public static void main(String[] args) throws IOException, TimeoutException
{
new Thread(new TopicListener("admin", ign -> {})).start();
TopicWriter writer = new TopicWriter();
// lots of code
TopicListener.java:
public class TopicListener implements Runnable
{
private final String EXCHANGE_NAME = "space";
private String key;
private Consumer<String> msgHandler;
public TopicListener(String key, Consumer<String> msgHandler)
{
this.key = key;
this.msgHandler = msgHandler;
}
@Override
public void run()
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, key);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String msg = new String(body, StandardCharsets.UTF_8);
System.out.println("Received: \"" + msg + "\"");
msgHandler.accept(msg);
}
};
channel.basicConsume(queueName, true, consumer);
}
catch (IOException | TimeoutException e)
{ e.printStackTrace(); }
}
}
TopicWriter.java:
public class TopicWriter
{
private final String EXCHANGE_NAME = "space";
private final Channel channel;
public TopicWriter() throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
this.channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
}
public void send(String msg, String key) throws IOException
{
channel.basicPublish(
EXCHANGE_NAME,
key,
null,
msg.getBytes(StandardCharsets.UTF_8));
}
}
Company.java 包含:
new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();
Worker.java 包含:
new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();
我发现了问题所在:我试图 发送 消息给使用 Topic 的每个人,在 RabbitMQ 中 Topic 用于指定谁应该 接收 消息。 “#”或“*”应该在队列键声明中使用,而不是在使用给定键发送消息时使用。