Kafka 是否支持主题或消息的优先级?

Does Kafka support priority for topic or message?

我正在探索 Kafka 是否支持优先处理任何队列或消息的事实。

好像不支持这样的东西。我用谷歌搜索并找到了这个也支持这个的邮件存档: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E

这里有人配置了 Kafka 来优先处理任何主题或消息吗?

Kafka 是一种快速、可扩展、本质上分布式的设计、分区和复制的提交日志service.So主题或消息没有优先级。

我也面临同样的问题,你 have.Solution 在 kafka 队列中有非常 simple.Create 个主题,比方说:

  1. high_priority_queue

  2. medium_priority_queue

  3. low_priority_queue

在 high_priority_queue 中发布高优先级消息,在 medium_priority_queue 中发布中优先级消息。

现在您可以为所有主题创建 kafka 消费者和打开流。

  // this is scala code 
  val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  val config = new ConsumerConfig(props)
  val connector = Consumer.create(config)
  val topicWithStreamCount = Map(
       "high_priority_queue" -> 1,
       "medium_priority_queue" ->  1, 
       "low_priority_queue" -> 1
  )
  val streamsMap = connector.createMessageStreams(topicWithStreamCount)

您获得每个 topic.Now 的流,如果主题没有任何消息,您可以先阅读 high_priority 主题,然后回退到 medium_priority_queue 主题。如果 medium_priority_queue 为空,则读取 low_priority 队列。

这个技巧很好用 me.May 对你有帮助!!

您需要有一个单独的主题并根据它们的优先级进行流式处理

您可以查看priority-kafka-client 主题中的优先消费。

基本思路如下(README的copy/pasting部分):

在此上下文中,优先级是一个正整数 (N),优先级为 0 < 1 < ... < N-1

PriorityKafkaProducer (implements org.apache.kafka.clients.producer.Producer):

该实现接受了一个额外的优先级 Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) arg。这是在该优先级上生成记录的指示。 Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) 默认在最低优先级 0 上生成记录。对于每个逻辑主题 XYZ - 优先级 0 <= i < N 由 Kafka 主题支持 XYZ-i

CapacityBurstPriorityKafkaConsumer (implements org.apache.kafka.clients.consumer.Consumer):

实现为每个优先级 0 <= i < N 维护一个 KafkaConsumer。对于每个逻辑主题 XYZ 和逻辑​​组 ID ABC - 优先级 0 <= i < N 消费者绑定到 Kafka 主题 XYZ-i组 ID ABC-i。这与 PriorityKafkaProducer 协同工作。

max.poll.records 属性 根据 maxPollRecordsDistributor 在优先主题消费者之间拆分 - 默认为 ExpMaxPollRecordsDistributor。其余的 KafkaConsumer 配置按原样传递给每个优先主题消费者。在定义 max.partition.fetch.bytesfetch.max.bytesmax.poll.interval.ms 时必须小心,因为这些值将按原样用于所有优先主题消费者。

致力于将 max.poll.records 属性 分配给每个优先主题消费者作为其预留容量的想法。从配置了分布式 max.poll.records 值的所有优先级主题消费者中按顺序获取记录。分发必须为更高的优先级保留更高的容量或处理率。

警告 1 - 如果我们在优先级主题中有倾斜分区,例如优先级为 2 的分区中有 10K 条记录,优先级为 1 的分区中有 100 条记录,优先级为 0 的分区中有 10 条记录分配给不同的消费者线程,那么实现将不会在这些消费者之间同步以调节容量,因此将无法遵守优先级.因此,生产者必须确保没有偏斜的分区(例如,使用循环法——这 "may" 意味着没有消息排序假设,消费者可以选择通过分离获取和处理问题来并行处理记录)。

警告 2 - 如果我们在优先级主题中有空分区,例如分配的优先级 2 和 1 分区中没有未决记录,优先级 0 分区中的 10K 记录分配给同一消费者线程,那么我们希望优先级 0 主题分区消费者将其容量突发到 max.poll.records 而不是限制自己到它的预留容量基于 maxPollRecordsDistributor 否则总容量将未被充分利用。

此实现将尝试解决上述注意事项。每个消费者对象都有单独的优先级主题消费者,每个优先级消费者都有基于 maxPollRecordsDistributor 的预留容量。如果满足以下所有条件,每个优先级主题消费者将尝试突破组中其他优先级主题消费者的容量:

  • 它有资格突发 - 这是如果在最后 max.poll.history.window.size 次尝试中 poll() 至少 min.poll.window.maxout.threshold 次它收到的记录数等于分配max.poll.records 是基于 maxPollRecordsDistributor 分发的。这表明该分区有更多传入记录需要处理。

  • Higher priority level is not eligible to burst - 根据以上逻辑,没有更高优先级的topic consumer 有资格进行burst。基本上让位给更高的优先级。

如果以上为真,那么优先级主题消费者将爆发到所有其他优先级主题消费者容量中。每个优先级主题消费者的突发量等于最近 max.poll.history.window.sizepoll() 尝试中未使用的最少容量。

解决方案是根据优先级创建 3 个不同的主题。

  • 高优先级主题
  • 中等优先级主题
  • 低优先级主题

根据一般经验,高优先级主题的消费者数量 > 中优先级主题的消费者数量 > 低优先级主题的消费者数量

这样可以保证到达高优先级主题的消息比低优先级主题处理得更快。

Confluent 在 Implementing Message Prioritization in Apache Kafka 上有一篇博客描述了如何实现消息优先级排序。

首先,了解 Kafka 的设计不允许开箱即用的消息优先级解决方案很重要。主要原因是:

  • 存储:Kafka 被设计为 append-only 提交日志,其中包含 immutable 反映实时事件及时发生的消息.
  • 消费者:Kafka主题中的消息可以同时被多个消费者消费。每个消费者可能有不同的优先级,这使得无法提前对主题内的消息进行排序。

建议的解决方案是使用 桶优先级模式 ,它在 GitHub 上可用,可以用其自述文件中的图表进行最好的描述。您可以通过自定义生产者的 partitioner 和消费者。

基于消息键,生产者将消息写入正确的优先级桶:

另一方面,消费者组将自定义其分配策略并优先从具有最高分区的分区读取消息:

在您的客户端代码(生产者和消费者)中,您需要启动并调整以下客户端配置。

# Producer
configs.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
   BucketPriorityPartitioner.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

# Consumer
configs.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
   BucketPriorityAssignor.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

我将在此处添加 @Sky's 的 Java 版本以供任何人参考。注意:我没有使用 KafkaStreams,而是使用普通的 KafkaConsumer 实现的。

从生产者的角度,您可以根据优先级将消息发布到相应的主题。

从消费者的角度来看,您可以尝试实施如下所示的内容。请注意,这不是可用于生产的实现。此解决方案是单线程的,可能会很慢。

与桶优先级模式不同,此代码将继续处理来自高优先级主题的消息,直到处理完所有消息。当高优先级主题没有消息时,这将回退到下一个优先级等等。

import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;

@Component
@Slf4j
public class PriorityBasedConsumer {

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    private final List<TopicConsumer> consumersInPriorityOrder = new ArrayList<>();
    
    @RequiredArgsConstructor(staticName = "of")
    @Getter
    private static class TopicConsumer {
        private final String topic;
        private final KafkaConsumer<String, String> kafkaConsumer;
        private final Consumer<ConsumerRecords<String, String>> consumerLogic;
    }

    private void highPriorityConsumer(ConsumerRecords<String, String> records) {
        // high priority processing...
    }

    private void mediumPriorityConsumer(ConsumerRecords<String, String> records) {
        // medium priority processing...
    }

    private void lowPriorityConsumer(ConsumerRecords<String, String> records) {
        // low priority processing...
    }

    @PostConstruct
    public void init() {
        Map<String, Consumer<ConsumerRecords<String, String>>> topicVsConsumerLogic = new HashMap<>();
        topicVsConsumerLogic.put("high_priority_queue", this::highPriorityConsumer);
        topicVsConsumerLogic.put("medium_priority_queue", this::mediumPriorityConsumer);
        topicVsConsumerLogic.put("low_priority_queue", this::lowPriorityConsumer);
        // if you're taking the topic names from external configuration, make sure to order it based on priority.
        for (String topic : Arrays.asList("high_priority_queue", "medium_priority_queue", "low_priority_queue")) {
            Properties consumerProperties = new Properties();
            consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            // add other properties.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
            consumer.subscribe(Collections.singletonList(topic));
            consumersInPriorityOrder.add(TopicConsumer.of(topic, consumer, topicVsConsumerLogic.get(topic)));
        }
    }

    @EventListener(ApplicationReadyEvent.class) // To execute once the application is ready.
    public void startConsumers() {
        // For illustration purposes, I just wrote this synchronous code. Use thread pools where ever 
        // necessary for high performance.
        while (true) { // poll infinitely
            try {
                // Consumers iterated based on priority.
                for (TopicConsumer topicConsumer : consumersInPriorityOrder) {
                    ConsumerRecords<String, String> records
                            = topicConsumer.getKafkaConsumer().poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        topicConsumer.getConsumerLogic().accept(records);
                        break;  // To start consuming again based on priority.
                    }
                }
            } catch (Exception e) {
                // on any unknown runtime exceptions, ignoring here. You can add your proper logic.
                log.error("Unknown exception occurred.", e);
            }
        }
    }
}