ACTIVEMQ + NMS无法同步接收
ACTIVEMQ + NMS cannot synchronously receive
我正在尝试将 activeMQ 与 NMS (C#) 消费者一起使用来获取消息,进行一些处理,然后通过 HttpClient.PostAsync() 将内容发送到 webserivce,所有 运行 windows 服务(通过 Topshelf)。
我正在与之通信的下游系统非常敏感,我正在使用单独的确认,以便我可以检查响应并通过确认或触发自定义重试(即不是 session.recover)来相应地采取行动。
由于下游系统不可靠,我一直在尝试几种不同的方法来降低消费者的吞吐量。我以为我可以通过转换为同步并使用预取来完成此操作,但它似乎没有用。
我的理解是,对于异步消费者,预取 'limit' 永远不会被命中,但是使用同步方法,预取队列只会在消息被确认时被吃掉,这意味着我可以调整我的监听器以通过以下游组件可以处理的速率发送消息。
队列加载了 100 条消息,并使用侦听器(即异步)启动我的代码,然后我可以成功记录 100 条消息已通过。
当我将其更改为使用 consumer.Receive()(或 ReceiveNoWait)时,我再也没有收到消息。
这是我为同步消费者尝试的片段,其中包含异步选项但已注释掉:
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
// Asynchronous
//consumer.Listener += new MessageListener(OnMessage);
// Synchronous
var message = consumer.Receive(TimeSpan.FromSeconds(5));
while (true)
{
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
message.Acknowledge();
}
我认为您需要在 connection
上调用 Start()
,例如:
connection.Start();
呼叫 Start()
表示您希望消息流动。
还值得注意的是,除了从 OnMessage
.
抛出异常外,没有办法跳出 while(true)
循环
我正在尝试将 activeMQ 与 NMS (C#) 消费者一起使用来获取消息,进行一些处理,然后通过 HttpClient.PostAsync() 将内容发送到 webserivce,所有 运行 windows 服务(通过 Topshelf)。
我正在与之通信的下游系统非常敏感,我正在使用单独的确认,以便我可以检查响应并通过确认或触发自定义重试(即不是 session.recover)来相应地采取行动。
由于下游系统不可靠,我一直在尝试几种不同的方法来降低消费者的吞吐量。我以为我可以通过转换为同步并使用预取来完成此操作,但它似乎没有用。
我的理解是,对于异步消费者,预取 'limit' 永远不会被命中,但是使用同步方法,预取队列只会在消息被确认时被吃掉,这意味着我可以调整我的监听器以通过以下游组件可以处理的速率发送消息。
队列加载了 100 条消息,并使用侦听器(即异步)启动我的代码,然后我可以成功记录 100 条消息已通过。 当我将其更改为使用 consumer.Receive()(或 ReceiveNoWait)时,我再也没有收到消息。
这是我为同步消费者尝试的片段,其中包含异步选项但已注释掉:
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
// Asynchronous
//consumer.Listener += new MessageListener(OnMessage);
// Synchronous
var message = consumer.Receive(TimeSpan.FromSeconds(5));
while (true)
{
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
message.Acknowledge();
}
我认为您需要在 connection
上调用 Start()
,例如:
connection.Start();
呼叫 Start()
表示您希望消息流动。
还值得注意的是,除了从 OnMessage
.
while(true)
循环