有没有办法以编程方式删除 ActiveMQ 作业计划?

Is there a way to programmatically delete an ActiveMQ Job Schedule?

我正在尝试删除 ActiveMQ 中的计划作业,但到目前为止没有成功。

使用 NMS API or Amqpnetlite(openwire lib 除外,因为该库未更新且不能在 netstandard/netcore 上使用)

用于使用 NMS 创建计划的示例代码,与使用 AMQP lib 完成的相同:

var factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);
IConnection connection = factory.CreateConnection(user, password);
connection.Start();

ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
IDestination dest = session.GetQueue(destination);
IMessageProducer producer = session.CreateProducer(dest);
producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

var msg = session.CreateTextMessage("Sample text message");
msg.Properties.SetString("AMQ_SCHEDULED_CRON", "* * * * *");
producer.Send(msg);
connection.Close();

这部分在浏览器控制台中产生以下结果,这就是我愿意删除的内容:

我已阅读 , also active mq system constants 但无法删除时间表。还尝试浏览文档,但到目前为止找不到任何有用的东西

ActiveMQ 甚至支持管理计划的编程方式吗? AMQP 解决方案会很棒,但 NMS 也很受欢迎。

您可以通过 STOMP、AMQP 或仅从 JMS 客户端管理 ActiveMQ 中的计划作业。我已经 written about this before 展示了如何使用 ActiveMQ Java 客户端来做到这一点,但原理是一样的。您可以发送具有特定 headers 集的消息,该集将对预定的消息进行操作。

要浏览 collection 预定消息,您需要执行如下操作:

    Connection connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Create the Browse Destination and the Reply To location
    Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
    Destination browseDest = session.createTemporaryQueue();

    // Create the "Browser"
    MessageConsumer browser = session.createConsumer(browseDest);

    connection.start();

    // Send the browse request
    MessageProducer producer = session.createProducer(requestBrowse);
    Message request = session.createMessage();
    request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
                              ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
    request.setJMSReplyTo(browseDest);
    producer.send(request);

    Message scheduled = browser.receive(5000);
    while (scheduled != null) {
        // Do something clever...
    }

返回的消息包含有关先前已添加的实际计划消息作业的信息。获取job Id可以让你完全取消该消息的传递。

要删除使用 Java 客户端、AMQP 客户端或其他协议客户端安排的计划消息发送,您需要执行以下操作:

    Message remove = session.createMessage();
    remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,
            ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
    remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID,
            scheduled.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID));
    producer.send(remove);

记录了使用调度程序时可以使用的全套消息 属性 值 here,在 AMQP 中,只需将每个消息的字符串文字用作应用程序 属性您使用远程作业 ID 设置的值,或者在 NMS 客户端中,它只是一个字符串键消息 属性,其中包含您要删除的作业 ID。

尽管通过 AMQP 执行此操作时有一个警告,那就是您需要确保代理正在使用 JMS 转换器?transport.transformer=jms"请参阅 AMQP documentation ActiveMQ 5。

接受的答案有效且正确。原来 scheduledMessage.NMSMessageId 没有保存调度程序 ID。

如果有人感兴趣,这里是干净的 C# 源代码:

var factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);
IConnection connection = factory.CreateConnection(userName, password);

var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var requestBrowse = session.GetTopic("ActiveMQ.Scheduler.Management");
var queue = session.GetQueue(queueName);
var consumer = session.CreateConsumer(queue);

connection.Start();

var producer = session.CreateProducer(requestBrowse);
var scheduledMessage = consumer.Receive(TimeSpan.FromSeconds(10));
if (scheduledMessage != null)
{
    // do check with persistent storage, if schedule is canceled, remove it:
    var remove = session.CreateMessage();
    // get prop names from : http://activemq.apache.org/maven/apidocs/constant-values.htm
    remove.Properties["AMQ_SCHEDULER_ACTION"] = "REMOVE";
    remove.Properties["scheduledJobId"] = scheduledMessage.Properties.GetString("scheduledJobId");
    producer.Send(remove);
}

producer.Close();
session.Close();
connection.Close();

流程是这样的:从某个队列中获取消息,如果满足某些条件,则完全放弃计划。