Spring Boot 批处理:读取文件并将数据发送到 Kafka
Spring-boot batch file read and send data to Kafka
我无法将 CSV 文件中的数据发送到 Kafka。下面是我用于批处理的 writer.java 代码:
import java.util.List;
import javax.persistence.criteria.CriteriaBuilder.In;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.repository.*;
import java.util.*;
import com.codenotfound.kafka.Car;
import com.codenotfound.kafka.producer.Sender;
/**
 * Spring Batch item writer that passes each chunk of {@link Car} items to the
 * injected {@link Repository}.
 */
public class Writer implements ItemWriter<Car> {

    /** Repository that receives every chunk handed to {@link #write(List)}. */
    private final Repository repo;

    /**
     * Creates a writer backed by the given repository.
     *
     * @param repo repository used to store the items
     */
    public Writer(Repository repo) {
        this.repo = repo;
    }

    /**
     * Hands the whole chunk to the repository in a single {@code save} call.
     *
     * @param car the items of the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract; anything
     *                   thrown by the repository call propagates to the caller
     */
    @Override
    public void write(List<? extends Car> car) throws Exception {
        final List<? extends Car> chunk = car;
        repo.save(chunk);
    }
}
也就是说,我不想调用 repo.save(car),而是希望把这个 Car 类对象的详细信息发送到 Kafka。
下面分别是我的 Car 类和 Repository 接口:
/**
 * JPA entity mapped to the {@code Car} table, holding a make, a manufacturer
 * and a generated identifier.
 */
@Entity
@Table(name = "Car")
public class Car {

    private String make;

    private String manufacturer;

    /** Identifier; its value is generated with the {@code AUTO} strategy. */
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private long id;

    /** No-argument constructor; leaves every field at its default value. */
    public Car() {
    }

    /**
     * Creates a car with the given make and manufacturer; the id is left unset.
     *
     * @param make         the make of the car
     * @param manufacturer the manufacturer of the car
     */
    public Car(String make, String manufacturer) {
        this.make = make;
        this.manufacturer = manufacturer;
    }

    /** @return the make */
    public String getMake() {
        return this.make;
    }

    /** @param make the new make */
    public void setMake(String make) {
        this.make = make;
    }

    /** @return the manufacturer */
    public String getManufacturer() {
        return this.manufacturer;
    }

    /** @param manufacturer the new manufacturer */
    public void setManufacturer(String manufacturer) {
        this.manufacturer = manufacturer;
    }

    /** @return the identifier */
    public long getId() {
        return this.id;
    }

    /** @param id the new identifier */
    public void setId(long id) {
        this.id = id;
    }

    /**
     * Renders the entity as {@code Car [make=..., manufacturer=..., id=...]}.
     *
     * @return the textual form of this car
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("Car [make=").append(make);
        text.append(", manufacturer=").append(manufacturer);
        text.append(", id=").append(id);
        text.append("]");
        return text.toString();
    }
}
以及 Repository 接口:
/**
 * Repository for {@link Car} entities with {@code Long} identifiers. It adds no
 * methods of its own; it only combines {@code CrudRepository<Car, Long>} with
 * {@code CustomRepository}.
 */
public interface Repository
        extends CrudRepository<Car, Long>, CustomRepository {
}
我的 Kafka Sender(发送端)文件如下:
package com.codenotfound.kafka.producer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.Car;
/**
 * Sends {@link Car} payloads to the Kafka topic whose name is configured under
 * the {@code topic.json} property.
 */
public class Sender {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    /** Target topic name, injected from {@code ${topic.json}}. */
    @Value("${topic.json}")
    private String jsonTopic;

    /** Template used to send the payloads; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, Car> kafkaTemplate;

    /**
     * Logs the car at INFO level and then sends it to the configured topic.
     *
     * @param car the payload to send
     */
    public void send(Car car) {
        final String description = car.toString();
        LOGGER.info("sending car='{}'", description);
        kafkaTemplate.send(jsonTopic, car);
    }
}
请给我推荐一种把 CSV 文件中的数据发送到 Kafka 的方法。
看来所有的"拼图"您都已经备齐了。您需要做的是参照 Sender 类的写法修改您的 ItemWriter,最终会得到类似下面的代码:
/**
 * Spring Batch item writer that publishes each {@link Car} of a chunk to the
 * Kafka topic whose name is configured under the {@code topic.json} property.
 */
@Component
public class Writer implements ItemWriter<Car> {

    /** Target topic name, injected from {@code ${topic.json}}. */
    @Value("${topic.json}")
    private String jsonTopic;

    /** Template used to send the records; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, Car> kafkaTemplate;

    /**
     * Sends every item of the chunk and waits for the outcome of each send.
     *
     * <p>{@code KafkaTemplate.send} is asynchronous and reports failures only
     * through the future it returns. If those futures are discarded, a chunk is
     * treated as written even when delivery failed. Waiting on each future
     * makes a failed delivery surface as an exception from this method.
     *
     * @param cars the items of the current chunk
     * @throws Exception if waiting for a send fails, e.g. an
     *                   {@code ExecutionException} wrapping the delivery error
     *                   or an {@code InterruptedException}
     */
    @Override
    public void write(List<? extends Car> cars) throws Exception {
        // Fully qualified names keep this snippet independent of extra imports.
        List<java.util.concurrent.Future<?>> pending = new java.util.ArrayList<>(cars.size());
        for (Car car : cars) {
            pending.add(kafkaTemplate.send(jsonTopic, car));
        }
        // Push buffered records out now instead of waiting for the producer's batching delay.
        kafkaTemplate.flush();
        for (java.util.concurrent.Future<?> ack : pending) {
            ack.get();
        }
    }
}
另外,在您的作业(Job)配置中,需要像下面这样自动注入它(以下代码经过简化,仅用于展示如何在 Step 中声明 writer):
/**
 * Simplified Spring Batch configuration showing how the injected {@link Writer}
 * bean is plugged into a chunk-oriented step.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /**
     * Writer instance supplied by the container rather than created with
     * {@code new}, so that its own injected fields are populated.
     */
    @Autowired
    private Writer carWriter;

    /**
     * Builds the step named {@code myStep}, which processes {@link Car} items
     * in chunks of 10 and hands them to {@link #carWriter}.
     *
     * @return the configured step
     */
    @Bean
    public Step myStep() {
        // NOTE(review): reader() is not shown in this simplified snippet; it is
        // assumed to be defined in this class and to supply the reader for the CSV file.
        // The built Step must be returned; without the return the method does not compile.
        return this.stepBuilderFactory.get("myStep")
                .<Car, Car>chunk(10)
                .reader(reader())
                .writer(carWriter) // don't use new here
                .build();
    }
}
我无法将 CSV 文件中的数据发送到 Kafka。下面是我用于批处理的 writer.java 代码:
import java.util.List;
import javax.persistence.criteria.CriteriaBuilder.In;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.repository.*;
import java.util.*;
import com.codenotfound.kafka.Car;
import com.codenotfound.kafka.producer.Sender;
/**
 * Spring Batch item writer that passes each chunk of {@link Car} items to the
 * injected {@link Repository}.
 */
public class Writer implements ItemWriter<Car> {

    /** Repository that receives every chunk handed to {@link #write(List)}. */
    private final Repository repo;

    /**
     * Creates a writer backed by the given repository.
     *
     * @param repo repository used to store the items
     */
    public Writer(Repository repo) {
        this.repo = repo;
    }

    /**
     * Hands the whole chunk to the repository in a single {@code save} call.
     *
     * @param car the items of the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract; anything
     *                   thrown by the repository call propagates to the caller
     */
    @Override
    public void write(List<? extends Car> car) throws Exception {
        final List<? extends Car> chunk = car;
        repo.save(chunk);
    }
}
也就是说,我不想调用 repo.save(car),而是希望把这个 Car 类对象的详细信息发送到 Kafka。下面分别是我的 Car 类和 Repository 接口:
/**
 * JPA entity mapped to the {@code Car} table, holding a make, a manufacturer
 * and a generated identifier.
 */
@Entity
@Table(name = "Car")
public class Car {

    private String make;

    private String manufacturer;

    /** Identifier; its value is generated with the {@code AUTO} strategy. */
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private long id;

    /** No-argument constructor; leaves every field at its default value. */
    public Car() {
    }

    /**
     * Creates a car with the given make and manufacturer; the id is left unset.
     *
     * @param make         the make of the car
     * @param manufacturer the manufacturer of the car
     */
    public Car(String make, String manufacturer) {
        this.make = make;
        this.manufacturer = manufacturer;
    }

    /** @return the make */
    public String getMake() {
        return this.make;
    }

    /** @param make the new make */
    public void setMake(String make) {
        this.make = make;
    }

    /** @return the manufacturer */
    public String getManufacturer() {
        return this.manufacturer;
    }

    /** @param manufacturer the new manufacturer */
    public void setManufacturer(String manufacturer) {
        this.manufacturer = manufacturer;
    }

    /** @return the identifier */
    public long getId() {
        return this.id;
    }

    /** @param id the new identifier */
    public void setId(long id) {
        this.id = id;
    }

    /**
     * Renders the entity as {@code Car [make=..., manufacturer=..., id=...]}.
     *
     * @return the textual form of this car
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("Car [make=").append(make);
        text.append(", manufacturer=").append(manufacturer);
        text.append(", id=").append(id);
        text.append("]");
        return text.toString();
    }
}
以及 Repository 接口:
/**
 * Repository for {@link Car} entities with {@code Long} identifiers. It adds no
 * methods of its own; it only combines {@code CrudRepository<Car, Long>} with
 * {@code CustomRepository}.
 */
public interface Repository
        extends CrudRepository<Car, Long>, CustomRepository {
}
我的 Kafka Sender(发送端)文件如下:
package com.codenotfound.kafka.producer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.Car;
/**
 * Sends {@link Car} payloads to the Kafka topic whose name is configured under
 * the {@code topic.json} property.
 */
public class Sender {

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    /** Target topic name, injected from {@code ${topic.json}}. */
    @Value("${topic.json}")
    private String jsonTopic;

    /** Template used to send the payloads; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, Car> kafkaTemplate;

    /**
     * Logs the car at INFO level and then sends it to the configured topic.
     *
     * @param car the payload to send
     */
    public void send(Car car) {
        final String description = car.toString();
        LOGGER.info("sending car='{}'", description);
        kafkaTemplate.send(jsonTopic, car);
    }
}
请给我推荐一种把 CSV 文件中的数据发送到 Kafka 的方法。
看来所有的"拼图"您都已经备齐了。您需要做的是参照 Sender 类的写法修改您的 ItemWriter,最终会得到类似下面的代码:
/**
 * Spring Batch item writer that publishes each {@link Car} of a chunk to the
 * Kafka topic whose name is configured under the {@code topic.json} property.
 */
@Component
public class Writer implements ItemWriter<Car> {

    /** Target topic name, injected from {@code ${topic.json}}. */
    @Value("${topic.json}")
    private String jsonTopic;

    /** Template used to send the records; injected by Spring. */
    @Autowired
    private KafkaTemplate<String, Car> kafkaTemplate;

    /**
     * Sends every item of the chunk and waits for the outcome of each send.
     *
     * <p>{@code KafkaTemplate.send} is asynchronous and reports failures only
     * through the future it returns. If those futures are discarded, a chunk is
     * treated as written even when delivery failed. Waiting on each future
     * makes a failed delivery surface as an exception from this method.
     *
     * @param cars the items of the current chunk
     * @throws Exception if waiting for a send fails, e.g. an
     *                   {@code ExecutionException} wrapping the delivery error
     *                   or an {@code InterruptedException}
     */
    @Override
    public void write(List<? extends Car> cars) throws Exception {
        // Fully qualified names keep this snippet independent of extra imports.
        List<java.util.concurrent.Future<?>> pending = new java.util.ArrayList<>(cars.size());
        for (Car car : cars) {
            pending.add(kafkaTemplate.send(jsonTopic, car));
        }
        // Push buffered records out now instead of waiting for the producer's batching delay.
        kafkaTemplate.flush();
        for (java.util.concurrent.Future<?> ack : pending) {
            ack.get();
        }
    }
}
另外,在您的作业(Job)配置中,需要像下面这样自动注入它(以下代码经过简化,仅用于展示如何在 Step 中声明 writer):
/**
 * Simplified Spring Batch configuration showing how the injected {@link Writer}
 * bean is plugged into a chunk-oriented step.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /**
     * Writer instance supplied by the container rather than created with
     * {@code new}, so that its own injected fields are populated.
     */
    @Autowired
    private Writer carWriter;

    /**
     * Builds the step named {@code myStep}, which processes {@link Car} items
     * in chunks of 10 and hands them to {@link #carWriter}.
     *
     * @return the configured step
     */
    @Bean
    public Step myStep() {
        // NOTE(review): reader() is not shown in this simplified snippet; it is
        // assumed to be defined in this class and to supply the reader for the CSV file.
        // The built Step must be returned; without the return the method does not compile.
        return this.stepBuilderFactory.get("myStep")
                .<Car, Car>chunk(10)
                .reader(reader())
                .writer(carWriter) // don't use new here
                .build();
    }
}