Spring Boot 批处理:读取文件并向 Kafka 发送数据

Spring-boot batch file read and send data to Kafka

我无法将 CSV 文件中的数据发送到 Kafka。下面是我用于批处理的 Writer.java 代码:


import java.util.List;

import javax.persistence.criteria.CriteriaBuilder.In;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

import com.codenotfound.kafka.repository.*;

import java.util.*;

import com.codenotfound.kafka.Car;
import com.codenotfound.kafka.producer.Sender;


/**
 * Spring Batch item writer that persists each chunk of {@link Car} items
 * through the injected {@link Repository}.
 */
public class Writer implements ItemWriter<Car> {

    /** Repository that receives every chunk handed to {@link #write(List)}. */
    private final Repository carRepository;

    /**
     * Creates a writer backed by the given repository.
     *
     * @param repo repository used to save the cars
     */
    public Writer(Repository repo) {
        carRepository = repo;
    }

    /**
     * Saves the whole chunk with a single repository call.
     *
     * @param car the cars contained in the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract
     */
    @Override
    public void write(List<? extends Car> car) throws Exception {
        final List<? extends Car> chunk = car;
        carRepository.save(chunk);
    }
}

所以我不想调用 repo.save(car),而是希望把 Car 对象的详细信息发送到 Kafka。下面分别是我的 Car 类和 Repository 接口:


/**
 * JPA entity mapped to the {@code Car} table. Holds a car's make and
 * manufacturer together with a generated numeric identifier.
 */
@Entity
@Table(name = "Car")
public class Car {

  /** Make of the car. */
  private String make;

  /** Manufacturer of the car. */
  private String manufacturer;

  /** Primary key, generated with {@code GenerationType.AUTO}. */
  @Id
  @GeneratedValue(strategy = GenerationType.AUTO)
  private long id;

  /** No-argument constructor; leaves every field at its default value. */
  public Car() {
  }

  /**
   * Creates a car with the given make and manufacturer; the id is left at
   * its default value.
   *
   * @param make the make of the car
   * @param manufacturer the manufacturer of the car
   */
  public Car(String make, String manufacturer) {
    this.make = make;
    this.manufacturer = manufacturer;
  }

  /** @return the make of the car */
  public String getMake() {
    return this.make;
  }

  /** @param make the make to store */
  public void setMake(String make) {
    this.make = make;
  }

  /** @return the manufacturer of the car */
  public String getManufacturer() {
    return this.manufacturer;
  }

  /** @param manufacturer the manufacturer to store */
  public void setManufacturer(String manufacturer) {
    this.manufacturer = manufacturer;
  }

  /** @return the identifier of this car */
  public long getId() {
    return this.id;
  }

  /** @param id the identifier to store */
  public void setId(long id) {
    this.id = id;
  }

  /**
   * Renders the car as {@code Car [make=..., manufacturer=..., id=...]}.
   *
   * @return the textual form of this car
   */
  @Override
  public String toString() {
    return new StringBuilder("Car [make=")
        .append(make)
        .append(", manufacturer=")
        .append(manufacturer)
        .append(", id=")
        .append(id)
        .append("]")
        .toString();
  }
}

以及 Repository 接口:


/**
 * Repository for {@link Car} entities identified by a {@code Long} key.
 * Combines the operations inherited from {@code CrudRepository} with those
 * declared by {@code CustomRepository} and adds no methods of its own.
 */
public interface Repository
        extends CrudRepository<Car, Long>, CustomRepository {
}

我的 Kafka 发送端(Sender)类如下:


package com.codenotfound.kafka.producer;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

import com.codenotfound.kafka.Car;

/**
 * Publishes {@link Car} messages to the Kafka topic configured under the
 * {@code topic.json} property.
 */
public class Sender {

  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

  /** Destination topic name, resolved from the {@code topic.json} property. */
  @Value("${topic.json}")
  private String jsonTopic;

  /** Template used to hand records to Kafka; injected by Spring. */
  @Autowired
  private KafkaTemplate<String, Car> kafkaTemplate;

  /**
   * Logs the car at INFO level and then sends it to the configured topic.
   *
   * @param car the car to publish; its {@code toString()} is invoked for the
   *     log line, so it must not be {@code null}
   */
  public void send(Car car) {
    final String description = car.toString();
    LOGGER.info("sending car='{}'", description);
    kafkaTemplate.send(jsonTopic, car);
  }
}

请向我推荐一种将数据从 CSV 文件发送到我的 Kafka 的方法。

看来您已经准备好了所有的组成部分。您只需要把 Sender 类中的发送逻辑放进 ItemWriter 里,得到类似下面的代码:

/**
 * Spring Batch item writer that publishes every {@link Car} of a chunk to the
 * Kafka topic configured under the {@code topic.json} property.
 */
@Component
public class Writer implements ItemWriter<Car> {

    /** Destination topic name, resolved from the {@code topic.json} property. */
    @Value("${topic.json}")
    private String jsonTopic;

    /** Template used to hand records to Kafka; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, Car> kafkaTemplate;

    /**
     * Sends each car of the chunk and then waits for every send to finish.
     *
     * <p>{@code KafkaTemplate.send} is asynchronous and reports failures only
     * through the future it returns. Waiting on those futures lets a failed
     * send surface as an exception from this method instead of being lost,
     * so the failure is visible to the caller of {@code write}.
     *
     * @param cars the cars contained in the current chunk
     * @throws Exception if waiting on any send fails, e.g. an
     *     {@code ExecutionException} wrapping the producer error or an
     *     {@code InterruptedException}
     */
    @Override
    public void write(List<? extends Car> cars) throws Exception {
        // Fully qualified names keep this snippet free of extra imports.
        List<java.util.concurrent.Future<?>> pending =
                new java.util.ArrayList<>(cars.size());

        // Dispatch everything first so the producer can still batch records.
        for (Car car : cars) {
            pending.add(kafkaTemplate.send(jsonTopic, car));
        }

        // Then block until each record is acknowledged or has failed.
        for (java.util.concurrent.Future<?> result : pending) {
            result.get();
        }
    }
}

然后在作业配置中,像下面这样自动装配它(以下代码经过简化,仅用于展示如何在 step 中声明 writer):

/**
 * Batch configuration showing how the Spring-managed {@link Writer} bean is
 * wired into a chunk-oriented step.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    /** Factory used to create step builders; injected by Spring. */
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /** The writer bean; injected rather than instantiated so its own fields get wired. */
    @Autowired
    private Writer carWriter;

    /**
     * Builds the step named {@code myStep}, which processes {@link Car} items
     * in chunks of 10 and hands each chunk to the injected writer.
     *
     * @return the configured step
     */
    @Bean
    public Step myStep() {
        // A @Bean method must return the object it builds; without the
        // return statement this method does not compile.
        return this.stepBuilderFactory.get("myStep")
            .<Car, Car>chunk(10)
            .reader(reader()) // reader() is not part of this simplified snippet
            .writer(carWriter) // don't use new here
            .build();
    }
}