如何让 RabbitMQ 从队列中一个一个读取?
How do I get RabbitMQ to read from the queue one by one?
我只是在玩 RabbitMQ 并尝试在两个 C# 项目中设置测试发送方和接收方。
TestSender.cs
using System;
using RabbitMQ.Client;
using System.Text;
public class TestSender
{
public TestSender()
{
}
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Test Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine(" Press [S] to send a message, [Enter] to exit.");
ConsoleKey key;
int messageId = 0;
while (true)
{
key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Enter)
break;
if (key == ConsoleKey.S)
{
string message = "Message " + (++messageId).ToString();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine("Sent {0}", message);
}
}
}
}
}
}
TestReceiver.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
public class TestReceiver
{
public TestReceiver()
{
}
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// Declare the queue here because we might start the consumer before the publisher
// so the queue should exist before we try to consume messages from it.
channel.QueueDeclare(queue: "Test Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" Press [R] to receive a message, [Enter] to exit.");
// Register a consumer to listen to a specific queue.
channel.BasicConsume(queue: "Test Queue",
autoAck: true,
consumer: consumer);
ConsoleKey key;
while (true)
{
key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Enter)
break;
if (key == ConsoleKey.R)
{
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
};
}
}
}
}
}
}
我将其设置为如果您按下 S
键,它会向队列发送一条消息,如果您按下 R
键,它会从队列中读取。发送有效,但接收绝对没有任何作用。我删除了 while
循环和按键代码以及接收工作。但是,我想弄清楚如何让消息一条一条地消费,而不是一次全部消费。另外,有谁知道RabbitMQ是否可以在队列中持久化消息,或者在消费时是否必须出队?
当您确认当前消息时,RabbitMQ 会向 consumer/subscriber 发送一条新消息。 RabbitMQ 将在队列中保留消息(仅限持久模式)直到未被确认。
从 channel.BasicConsume 函数中删除 autoAck: true 选项。使用 channel.BasicAck 函数在按下 R 键时确认消息。
我只是在玩 RabbitMQ 并尝试在两个 C# 项目中设置测试发送方和接收方。
TestSender.cs
using System;
using RabbitMQ.Client;
using System.Text;
public class TestSender
{
public TestSender()
{
}
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Test Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Console.WriteLine(" Press [S] to send a message, [Enter] to exit.");
ConsoleKey key;
int messageId = 0;
while (true)
{
key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Enter)
break;
if (key == ConsoleKey.S)
{
string message = "Message " + (++messageId).ToString();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine("Sent {0}", message);
}
}
}
}
}
}
TestReceiver.cs
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
public class TestReceiver
{
public TestReceiver()
{
}
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// Declare the queue here because we might start the consumer before the publisher
// so the queue should exist before we try to consume messages from it.
channel.QueueDeclare(queue: "Test Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine(" Press [R] to receive a message, [Enter] to exit.");
// Register a consumer to listen to a specific queue.
channel.BasicConsume(queue: "Test Queue",
autoAck: true,
consumer: consumer);
ConsoleKey key;
while (true)
{
key = Console.ReadKey(true).Key;
if (key == ConsoleKey.Enter)
break;
if (key == ConsoleKey.R)
{
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Received {0}", message);
};
}
}
}
}
}
}
我将其设置为如果您按下 S
键,它会向队列发送一条消息,如果您按下 R
键,它会从队列中读取。发送有效,但接收绝对没有任何作用。我删除了 while
循环和按键代码以及接收工作。但是,我想弄清楚如何让消息一条一条地消费,而不是一次全部消费。另外,有谁知道RabbitMQ是否可以在队列中持久化消息,或者在消费时是否必须出队?
当您确认当前消息时,RabbitMQ 会向 consumer/subscriber 发送一条新消息。 RabbitMQ 将在队列中保留消息(仅限持久模式)直到未被确认。
从 channel.BasicConsume 函数中删除 autoAck: true 选项。使用 channel.BasicAck 函数在按下 R 键时确认消息。