ActiveMQ/STOMP 清除指向目的地的计划消息
ActiveMQ/STOMP Clear Schedule Messages Pointed To Destination
我想删除计划发送到特定 queue 的邮件,但我发现该过程不必要地繁琐。
我在这里向 queue 延迟发送一条空白消息:
self._connection.send(body="test", destination=f"/queue/my-queue", headers={
"AMQ_SCHEDULED_DELAY": 100_000_000,
"foo": "bar"
})
在这里,我想清除 queue:
的预定消息
self._connection.send(destination=f"ActiveMQ.Scheduler.Management", headers={
"AMQ_SCHEDULER_ACTION": "REMOVEALL",
}, body="")
当然这里的“目的地”需要是ActiveMQ.Scheduler.Management
而不是我实际的queue。但无论如何我都找不到删除发往 queue/my-queue
的预定消息。我尝试使用 selector
header,但这似乎不适用于 AMQ_SCHEDULER_ACTION
类型的消息。
我看到的唯一建议是编写一个消费者来浏览所有计划的消息,检查每个消息的目的地,并通过其 ID 删除每个计划。这对我来说似乎很疯狂,因为我不仅有几条消息,还有数百万条消息我想删除。
有没有一种方法可以向 ActiveMQ 发送命令以清除具有自定义 header 值的预定消息?
也许我可以为每个 queue 定义一个自定义的预定消息位置?
编辑:
我已经围绕 stomp.py
连接编写了一个包装器来处理发往 queue 的清除计划。 MQStompFacade
采用现有的 stomp.Connection
和您正在使用的 queue 的名称,并提供 enqueue
、enqueue_many
、receive
、purge
, 和 move
.
从 queue 接收时,如果 include_delayed
是 True
,它将同时订阅 queue 和使用计划的主题。假设消息是用 class 发送的 queued 并且将原始目的地 queue 的名称作为自定义 header,预定的消息不是用于接收queue 将被过滤掉。
尚未在生产中进行测试。可能这里有很多优化。
用法:
stomp = MQStompFacade(connection, "my-queue")
stomp.enqueue_many([
EnqueueRequest(message="hello"),
EnqueueRequest(message="goodbye", delay=100_000)
])
stomp.purge() # <- removes queued and scheduled messages destined for "/queues/my-queue"
class MQStompFacade (ConnectionListener):
def __init__(self, connection: Connection, queue: str):
self._connection = connection
self._queue = queue
self._messages: List[Message] = []
self._connection_id = rand_string(6)
self._connection.set_listener(self._connection_id, self)
def __del__(self):
self._connection.remove_listener(self._connection_id)
def enqueue_many(self, requests: List[EnqueueRequest]):
txid = self._connection.begin()
for request in requests:
headers = request.headers or {}
# Used in scheduled message selectors
headers["queue"] = self._queue
if request.delay_millis:
headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
if request.priority is not None:
headers['priority'] = request.priority
self._connection.send(body=request.message,
destination=f"/queue/{self._queue}",
txid=txid,
headers=headers)
self._connection.commit(txid)
def enqueue(self, request: EnqueueRequest):
self.enqueue_many([request])
def purge(self, selector: Optional[str] = None):
num_purged = 0
for _ in self.receive(idle_timeout=5, selector=selector):
num_purged += 1
return num_purged
def move(self, destination_queue: AbstractQueueFacade,
selector: Optional[str] = None):
buffer_size = 500
move_buffer = []
for message in self.receive(idle_timeout=5, selector=selector):
move_buffer.append(EnqueueRequest(
message=message.body
))
if len(move_buffer) >= buffer_size:
destination_queue.enqueue_many(move_buffer)
move_buffer = []
if move_buffer:
destination_queue.enqueue_many(move_buffer)
def receive(self,
max: Optional[int] = None,
timeout: Optional[int] = None,
idle_timeout: Optional[int] = None,
selector: Optional[str] = None,
peek: Optional[bool] = False,
include_delayed: Optional[bool] = False):
"""
Receiving messages until one of following conditions are met
Args:
max: Receive messages until the [max] number of messages are received
timeout: Receive message until this timeout is reached
idle_timeout (seconds): Receive messages until the queue is idle for this amount of time
selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
peek: Set to TRUE to disable automatic ack on matched criteria. Peeked messages will remain the queue
include_delayed: Set to TRUE to return messages scheduled for delivery in the future
"""
self._connection.subscribe(f"/queue/{self._queue}",
id=self._connection_id,
ack="client",
selector=selector
)
if include_delayed:
browse_topic = f"topic/scheduled_{self._queue}_{rand_string(6)}"
schedule_selector = f"queue = '{self._queue}'"
if selector:
schedule_selector = f"{schedule_selector} AND ({selector})"
self._connection.subscribe(browse_topic,
id=self._connection_id,
ack="auto",
selector=schedule_selector
)
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",
headers={
"AMQ_SCHEDULER_ACTION": "BROWSE",
"JMSReplyTo": browse_topic
},
id=self._connection_id,
body=""
)
listen_start = time.time()
last_receive = time.time()
messages_received = 0
scanning = True
empty_receive = False
while scanning:
try:
message = self._messages.pop()
last_receive = time.time()
if not peek:
self._ack(message)
messages_received += 1
yield message
except IndexError:
empty_receive = True
time.sleep(0.1)
if max and messages_received >= max:
scanning = False
elif timeout and time.time() > listen_start + timeout:
scanning = False
elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
scanning = False
else:
scanning = True
self._connection.unsubscribe(id=self._connection_id)
def on_message(self, frame):
destination = frame.headers.get("original-destination", frame.headers.get("destination"))
schedule_id = frame.headers.get("scheduledJobId")
message = Message(
attributes=MessageAttributes(
id=frame.headers["message-id"],
schedule_id=schedule_id,
timestamp=frame.headers["timestamp"],
queue=destination.replace("/queue/", "")
),
body=frame.body
)
self._messages.append(message)
def _ack(self, message: Message):
"""
Deletes the message from queue.
If the message has an scheduled_id, will also remove the associated scheduled job
"""
if message.attributes.schedule_id:
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",
headers={
"AMQ_SCHEDULER_ACTION": "REMOVE",
"scheduledJobId": message.attributes.schedule_id
},
id=self._connection_id,
body=""
)
self._connection.ack(message.attributes.id, subscription=self._connection_id)
要删除特定消息,您需要知道可以通过浏览预定消息获得的 ID。唯一可用的其他选项是使用删除操作中的开始和停止时间选项来删除范围内的所有邮件。
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
producer.send(request);
如果这不符合您的需要,我相信该项目会欢迎您的贡献。
我想删除计划发送到特定 queue 的邮件,但我发现该过程不必要地繁琐。
我在这里向 queue 延迟发送一条空白消息:
self._connection.send(body="test", destination=f"/queue/my-queue", headers={
"AMQ_SCHEDULED_DELAY": 100_000_000,
"foo": "bar"
})
在这里,我想清除 queue:
的预定消息self._connection.send(destination=f"ActiveMQ.Scheduler.Management", headers={
"AMQ_SCHEDULER_ACTION": "REMOVEALL",
}, body="")
当然这里的“目的地”需要是ActiveMQ.Scheduler.Management
而不是我实际的queue。但无论如何我都找不到删除发往 queue/my-queue
的预定消息。我尝试使用 selector
header,但这似乎不适用于 AMQ_SCHEDULER_ACTION
类型的消息。
我看到的唯一建议是编写一个消费者来浏览所有计划的消息,检查每个消息的目的地,并通过其 ID 删除每个计划。这对我来说似乎很疯狂,因为我不仅有几条消息,还有数百万条消息我想删除。
有没有一种方法可以向 ActiveMQ 发送命令以清除具有自定义 header 值的预定消息?
也许我可以为每个 queue 定义一个自定义的预定消息位置?
编辑:
我已经围绕 stomp.py
连接编写了一个包装器来处理发往 queue 的清除计划。 MQStompFacade
采用现有的 stomp.Connection
和您正在使用的 queue 的名称,并提供 enqueue
、enqueue_many
、receive
、purge
, 和 move
.
从 queue 接收时,如果 include_delayed
是 True
,它将同时订阅 queue 和使用计划的主题。假设消息是用 class 发送的 queued 并且将原始目的地 queue 的名称作为自定义 header,预定的消息不是用于接收queue 将被过滤掉。
尚未在生产中进行测试。可能这里有很多优化。
用法:
stomp = MQStompFacade(connection, "my-queue")
stomp.enqueue_many([
EnqueueRequest(message="hello"),
EnqueueRequest(message="goodbye", delay=100_000)
])
stomp.purge() # <- removes queued and scheduled messages destined for "/queues/my-queue"
class MQStompFacade (ConnectionListener):
def __init__(self, connection: Connection, queue: str):
self._connection = connection
self._queue = queue
self._messages: List[Message] = []
self._connection_id = rand_string(6)
self._connection.set_listener(self._connection_id, self)
def __del__(self):
self._connection.remove_listener(self._connection_id)
def enqueue_many(self, requests: List[EnqueueRequest]):
txid = self._connection.begin()
for request in requests:
headers = request.headers or {}
# Used in scheduled message selectors
headers["queue"] = self._queue
if request.delay_millis:
headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
if request.priority is not None:
headers['priority'] = request.priority
self._connection.send(body=request.message,
destination=f"/queue/{self._queue}",
txid=txid,
headers=headers)
self._connection.commit(txid)
def enqueue(self, request: EnqueueRequest):
self.enqueue_many([request])
def purge(self, selector: Optional[str] = None):
num_purged = 0
for _ in self.receive(idle_timeout=5, selector=selector):
num_purged += 1
return num_purged
def move(self, destination_queue: AbstractQueueFacade,
selector: Optional[str] = None):
buffer_size = 500
move_buffer = []
for message in self.receive(idle_timeout=5, selector=selector):
move_buffer.append(EnqueueRequest(
message=message.body
))
if len(move_buffer) >= buffer_size:
destination_queue.enqueue_many(move_buffer)
move_buffer = []
if move_buffer:
destination_queue.enqueue_many(move_buffer)
def receive(self,
max: Optional[int] = None,
timeout: Optional[int] = None,
idle_timeout: Optional[int] = None,
selector: Optional[str] = None,
peek: Optional[bool] = False,
include_delayed: Optional[bool] = False):
"""
Receiving messages until one of following conditions are met
Args:
max: Receive messages until the [max] number of messages are received
timeout: Receive message until this timeout is reached
idle_timeout (seconds): Receive messages until the queue is idle for this amount of time
selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
peek: Set to TRUE to disable automatic ack on matched criteria. Peeked messages will remain the queue
include_delayed: Set to TRUE to return messages scheduled for delivery in the future
"""
self._connection.subscribe(f"/queue/{self._queue}",
id=self._connection_id,
ack="client",
selector=selector
)
if include_delayed:
browse_topic = f"topic/scheduled_{self._queue}_{rand_string(6)}"
schedule_selector = f"queue = '{self._queue}'"
if selector:
schedule_selector = f"{schedule_selector} AND ({selector})"
self._connection.subscribe(browse_topic,
id=self._connection_id,
ack="auto",
selector=schedule_selector
)
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",
headers={
"AMQ_SCHEDULER_ACTION": "BROWSE",
"JMSReplyTo": browse_topic
},
id=self._connection_id,
body=""
)
listen_start = time.time()
last_receive = time.time()
messages_received = 0
scanning = True
empty_receive = False
while scanning:
try:
message = self._messages.pop()
last_receive = time.time()
if not peek:
self._ack(message)
messages_received += 1
yield message
except IndexError:
empty_receive = True
time.sleep(0.1)
if max and messages_received >= max:
scanning = False
elif timeout and time.time() > listen_start + timeout:
scanning = False
elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
scanning = False
else:
scanning = True
self._connection.unsubscribe(id=self._connection_id)
def on_message(self, frame):
destination = frame.headers.get("original-destination", frame.headers.get("destination"))
schedule_id = frame.headers.get("scheduledJobId")
message = Message(
attributes=MessageAttributes(
id=frame.headers["message-id"],
schedule_id=schedule_id,
timestamp=frame.headers["timestamp"],
queue=destination.replace("/queue/", "")
),
body=frame.body
)
self._messages.append(message)
def _ack(self, message: Message):
"""
Deletes the message from queue.
If the message has an scheduled_id, will also remove the associated scheduled job
"""
if message.attributes.schedule_id:
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",
headers={
"AMQ_SCHEDULER_ACTION": "REMOVE",
"scheduledJobId": message.attributes.schedule_id
},
id=self._connection_id,
body=""
)
self._connection.ack(message.attributes.id, subscription=self._connection_id)
要删除特定消息,您需要知道可以通过浏览预定消息获得的 ID。唯一可用的其他选项是使用删除操作中的开始和停止时间选项来删除范围内的所有邮件。
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME, Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME, Long.toString(end));
producer.send(request);
如果这不符合您的需要,我相信该项目会欢迎您的贡献。