How to send batched data with a Spring Kafka producer

Currently I have code like this:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for (Pet p : myData) {
    String json = objectWriter.writeValueAsString(p);
    kafkaTemplate.send(topic, json);
}

So each list item is sent one by one. How can I send the whole list at once?

So there is no direct way to send a batch of messages to Kafka using KafkaTemplate or KafkaProducer. Neither has a method that accepts a List of objects and sends them individually to different partitions.

How does the Kafka producer send messages to Kafka?

KafkaProducer

The Kafka producer creates a batch of records and then sends all of these records at once. For more information:

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

Asynchronous send

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64k or 10 ms). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade off a small amount of additional latency for better throughput.
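In practice this means the per-item send() loop from the question already batches under the hood. A minimal sketch with a plain KafkaProducer (the topic name "pets" and the jsonList variable are illustrative assumptions, not from the question):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // max bytes per batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);     // wait up to 10 ms for a batch to fill

Producer<String, String> producer = new KafkaProducer<>(props);
for (String json : jsonList) {
    // send() only appends the record to the in-memory buffer and returns immediately;
    // the background I/O thread groups buffered records into per-partition batches
    producer.send(new ProducerRecord<>("pets", json));
}
producer.close(); // close() sends whatever is still buffered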

Since you are using spring-kafka, you can send a List<Object>, but note that you are then sending one JSONArray of JSONObjects to the topic partition, rather than each JSONObject individually.

public KafkaTemplate<String, List<Object>> createTemplate() {

        Map<String, Object> senderProps = producerProps();
        ProducerFactory<String, List<Object>> pf =
                new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<String, List<Object>> template = new KafkaTemplate<>(pf);
        return template;
}

public Map<String, Object> producerProps() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
}

KafkaTemplate<String, List<Object>> kafkaTemplate;
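With that template the whole list goes out as a single record whose value is a JSON array. A minimal usage sketch (Pet, myData, and topic come from the question; the rest is an assumption):

KafkaTemplate<String, List<Object>> kafkaTemplate = createTemplate();

List<Object> payload = new ArrayList<>(myData);
// one send() call: JsonSerializer writes the list as one JSON array record
kafkaTemplate.send(topic, payload);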

Besides using KafkaTemplate, as already answered by Deadpool, there is another approach: serialize the objects into a byte array and send the whole thing as one record.

This is not the best approach, because it defeats Kafka's best practice of distributing and parallelizing as much as possible. In general, distribute the messages and let the producer parallelize them using its buffer pool and partitions. But sometimes a specific use case may require it...

// You can use a list of any objects. I'm just using String here, but this can be extended to any object (after all, a String is just an Object). If you are using a POJO, it can be converted to a JSON string and passed as a list of JSON strings (see the sketch after this method).
public byte[] searlizedByteArray(List<String> listObject) throws IOException {
    // try-with-resources ensures both streams are closed, even on failure
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(listObject);
        out.flush();
        return bos.toByteArray();
    }
}
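If your data is a list of POJOs like the Pet list from the question, a sketch of the JSON-string conversion mentioned above (using Jackson's ObjectMapper; Pet and myData come from the question):

ObjectMapper mapper = new ObjectMapper();
List<String> jsonList = new ArrayList<>();
for (Pet p : myData) {
    jsonList.add(mapper.writeValueAsString(p)); // one JSON string per Pet
}
byte[] payload = searlizedByteArray(jsonList);  // serialize the whole list at once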

Deserialize the byte[] array back into a list of objects

public List<String> desearlizedByteArray(byte[] byteArray) throws IOException, ClassNotFoundException {
    // try-with-resources ensures both streams are closed, even on failure
    try (ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
         ObjectInputStream in = new ObjectInputStream(bis)) {
        @SuppressWarnings("unchecked")
        List<String> returnList = (List<String>) in.readObject();
        for (String string : returnList) {
            System.out.println("===" + string);
        }
        return returnList;
    }
}
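A quick round trip through both helpers:

List<String> original = Arrays.asList("test1", "test2");
byte[] bytes = searlizedByteArray(original);
List<String> copy = desearlizedByteArray(bytes); // prints ===test1 and ===test2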

Note that we use ByteArraySerializer for VALUE_SERIALIZER_CLASS_CONFIG

public void publishMessage() throws Exception {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // You might need to increase buffer memory and max request size for large payloads:
        // properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "...");
        // properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "...");

        Producer<String, byte[]> producer = new KafkaProducer<>(properties);
        try {
            List<String> asl = new ArrayList<>();
            asl.add("test1");
            asl.add("test2");

            // the whole list travels as the value of a single record
            byte[] byteArray = searlizedByteArray(asl);
            ProducerRecord<String, byte[]> producerRecord =
                    new ProducerRecord<>("testlist", null, byteArray);

            producer.send(producerRecord).get();
        } finally {
            producer.flush();
            producer.close();
        }
    }

Finally, in the consumer, ByteArrayDeserializer will do the job

public void consumeMessage() throws Exception {

    Properties properties = new Properties();

    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());
    properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
    properties.setProperty("group.id", "grp_consumer");
    properties.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList("testlist"));

    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, byte[]> record : records) {
            // turn the byte[] value back into the original list
            List<String> list = desearlizedByteArray(record.value());
        }
    }
}

Generally, setting the property

 props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

is enough, and you can increase the size of the batch buffer with the property

 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

according to your requirements.

Note: make sure there are no flush() method calls in your code, since flushing defeats all the batching settings.
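Applied to the Spring Kafka code from the question, a minimal sketch (the property values are illustrative; Pet, myData, objectWriter, and topic come from the question) looks like this, and the per-item send() loop can stay exactly as it was:

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);     // give each batch up to 10 ms to fill
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // max batch size in bytes

DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);

for (Pet p : myData) {
    kafkaTemplate.send(topic, objectWriter.writeValueAsString(p));
    // no kafkaTemplate.flush() here: flushing would defeat the batching settings
}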