Samza:将消息处理延迟到时间戳
Samza: Delay processing of messages until timestamp
我正在使用 Samza 处理来自 Kafka 主题的消息。有些消息带有未来的时间戳,我想将处理推迟到该时间戳之后。与此同时,我想继续处理其他收到的消息。
我尝试做的是让我的 Task
对消息进行排队并实施 WindowableTask
以定期检查消息(如果它们的时间戳允许处理它们)。基本思路如下:
public class MyTask implements StreamTask, WindowableTask {
private HashSet<MyMessage> waitingMessages = new HashSet<>();
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
// Do the processing
} else {
waitingMessages.add(parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
for (MyMessage message : waitingMessages) {
if (message.getValidFromDateTime().isBeforeNow()) {
// Do the processing and remove the message from the set
}
}
}
}
这显然有一些缺点。当我重新部署我的任务时,我会在内存中丢失等待消息。所以我想知道用 Samza 延迟消息处理的最佳实践。我是否需要一次又一次地向同一个主题重新发送消息,直到我最终能够处理它们?我们在这里谈论的是将处理延迟几分钟最多 1-2 小时。
重要的是要记住,在处理消息队列时,它们在系统中执行非常特殊的功能:它们在处理器忙于处理前面的消息时保存消息。预期功能正常的消息队列将按需传送消息。这意味着一旦消息到达队列的头部,队列的下一次拉动就会产生消息。
请注意,延迟不是等式的可配置部分。相反,延迟是具有队列的系统的输出变量。事实上,Little's Law 对此提供了一些有趣的见解。
因此,在需要延迟的系统中(例如,要 join/wait 才能完成并行操作),您应该考虑其他方法。通常,可查询数据库在此特定实例中有意义。如果您发现自己将消息保存在队列中一段预先设定的时间,那么您实际上是在将消息队列用作数据库——它并非旨在提供此功能。这不仅有风险,而且很可能会损害消息代理的性能。
我认为您可以使用 Samza 的键值存储来保存任务实例的状态,而不是在内存中 Set
。
它应该看起来像:
public class MyTask implements StreamTask, WindowableTask, InitableTask {
private KeyValueStore<String, MyMessage> waitingMessages;
@SuppressWarnings("unchecked")
@Override
public void init(Config config, TaskContext context) throws Exception {
this.waitingMessages = (KeyValueStore<String, MyMessage>) context.getStore("messages-store");
}
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBefore(LocalDate.now())) {
// Do the processing
} else {
waitingMessages.put(parsedMessage.getId(), parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
KeyValueIterator<String, MyMessage> all = waitingMessages.all();
while(all.hasNext()) {
MyMessage message = all.next().getValue();
// Do the processing and remove the message from the set
}
}
}
如果您重新部署任务,Samza 应该重新创建键值存储的状态(Samza 将值保存在与键值存储相关的特殊 kafka 主题中)。您当然需要提供一些额外的商店配置(在上面的示例中 messages-store
)。
您可以在此处阅读有关键值存储的信息(对于最新的 Samza 版本):
https://samza.apache.org/learn/documentation/0.14/container/state-management.html
我正在使用 Samza 处理来自 Kafka 主题的消息。有些消息带有未来的时间戳,我想将处理推迟到该时间戳之后。与此同时,我想继续处理其他收到的消息。
我尝试做的是让我的 Task
对消息进行排队并实施 WindowableTask
以定期检查消息(如果它们的时间戳允许处理它们)。基本思路如下:
public class MyTask implements StreamTask, WindowableTask {
private HashSet<MyMessage> waitingMessages = new HashSet<>();
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
// Do the processing
} else {
waitingMessages.add(parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
for (MyMessage message : waitingMessages) {
if (message.getValidFromDateTime().isBeforeNow()) {
// Do the processing and remove the message from the set
}
}
}
}
这显然有一些缺点。当我重新部署我的任务时,我会在内存中丢失等待消息。所以我想知道用 Samza 延迟消息处理的最佳实践。我是否需要一次又一次地向同一个主题重新发送消息,直到我最终能够处理它们?我们在这里谈论的是将处理延迟几分钟最多 1-2 小时。
重要的是要记住,在处理消息队列时,它们在系统中执行非常特殊的功能:它们在处理器忙于处理前面的消息时保存消息。预期功能正常的消息队列将按需传送消息。这意味着一旦消息到达队列的头部,队列的下一次拉动就会产生消息。
请注意,延迟不是等式的可配置部分。相反,延迟是具有队列的系统的输出变量。事实上,Little's Law 对此提供了一些有趣的见解。
因此,在需要延迟的系统中(例如,要 join/wait 才能完成并行操作),您应该考虑其他方法。通常,可查询数据库在此特定实例中有意义。如果您发现自己将消息保存在队列中一段预先设定的时间,那么您实际上是在将消息队列用作数据库——它并非旨在提供此功能。这不仅有风险,而且很可能会损害消息代理的性能。
我认为您可以使用 Samza 的键值存储来保存任务实例的状态,而不是在内存中 Set
。
它应该看起来像:
public class MyTask implements StreamTask, WindowableTask, InitableTask {
private KeyValueStore<String, MyMessage> waitingMessages;
@SuppressWarnings("unchecked")
@Override
public void init(Config config, TaskContext context) throws Exception {
this.waitingMessages = (KeyValueStore<String, MyMessage>) context.getStore("messages-store");
}
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBefore(LocalDate.now())) {
// Do the processing
} else {
waitingMessages.put(parsedMessage.getId(), parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
KeyValueIterator<String, MyMessage> all = waitingMessages.all();
while(all.hasNext()) {
MyMessage message = all.next().getValue();
// Do the processing and remove the message from the set
}
}
}
如果您重新部署任务,Samza 应该重新创建键值存储的状态(Samza 将值保存在与键值存储相关的特殊 kafka 主题中)。您当然需要提供一些额外的商店配置(在上面的示例中 messages-store
)。
您可以在此处阅读有关键值存储的信息(对于最新的 Samza 版本): https://samza.apache.org/learn/documentation/0.14/container/state-management.html