在一个 Kafka Topic 下发送两个 Serialized Java 对象
Send two Serialized Java objects under one Kafka Topic
我想实现能够发送和接收 Java 对象的 Kafka 消费者和生产者(完整源码)。我尝试了以下代码:
生产者:
/**
 * Producer-side Kafka wiring: producer factories, plain templates and the
 * request/reply template used to send a {@link SaleRequestFactory} and wait
 * for a {@link SaleResponseFactory}.
 */
@Configuration
public class KafkaProducerConfig {

    /** Kafka bootstrap servers, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Producer factory for {@link SaleRequestFactory} values: String keys, values
     * encoded by {@link SaleRequestFactorySerializer}.
     *
     * @return the producer factory for sale requests
     */
    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Producer factory for plain String keys and values.
     *
     * @return the String/String producer factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // key.serializer and value.serializer are required producer settings with no
        // default; they were missing here, so no producer could be created from this factory.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Template for sending {@link SaleRequestFactory} values.
     *
     * @return a template backed by {@link #saleRequestFactoryProducerFactory()}
     */
    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    /**
     * Template for sending plain String values.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Request/reply template. Replies are consumed by a container created from the
     * given listener container factory on topic {@code "tp-sale"} with consumer
     * group id {@code "tp-sale.reply"}.
     *
     * @param producerFactory factory used to send the requests
     * @param factory listener container factory used to create the reply container
     * @return the replying template
     */
    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(
            ProducerFactory<String, SaleRequestFactory> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
        // NOTE(review): the reply container listens on "tp-sale", the same topic the
        // controller sends requests to - confirm that sharing one topic is intended.
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer =
                factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    }
}
发送对象:
/**
 * REST endpoint under {@code /checkout} that stores a transaction, sends a
 * {@link SaleRequestFactory} to Kafka and waits for the {@link SaleResponseFactory} reply.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    /** Topic the sale request is sent to. */
    private static final String TOPIC = "tp-sale";

    /** Seconds to wait for the send confirmation and, separately, for the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    private final TransactionService transactionService;
    private final KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;

    /**
     * Creates the controller with its collaborators.
     *
     * @param validationMessage accepted for wiring compatibility; not used by this class
     * @param transactionService service used to persist the transaction
     * @param saleRequestFactoryKafkaTemplate plain template for sale requests
     * @param requestReplyKafkaTemplate request/reply template for sale requests
     */
    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
            KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
            ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate) {
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Saves an in-progress transaction, sends a sale request with id 100 and prints
     * the unique id of the reply.
     *
     * @throws ExecutionException if the send or the reply completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException if the send or the reply takes longer than the timeout
     */
    // Public rather than private: handler methods are conventionally public, and a
    // private handler presumably stops working if the controller is ever proxied.
    @PostMapping("test")
    public void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
        transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);
        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>(TOPIC, obj);

        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture =
                requestReplyKafkaTemplate.sendAndReceive(record);
        // Wait for the send itself to complete before waiting for the reply.
        replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord =
                replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}
消费者:
/**
 * Consumer-side Kafka wiring: the consumer factory and the listener container
 * factory built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Kafka bootstrap servers, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /** Consumer group id applied to consumers created by {@link #consumerFactory()}. */
    private String groupId = "test";

    /**
     * Consumer factory with String keys and values decoded by
     * {@link SaleResponseFactoryDeserializer}.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Listener container factory that creates its consumers from {@link #consumerFactory()}.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, SaleResponseFactory> source = consumerFactory();
        listenerFactory.setConsumerFactory(source);
        return listenerFactory;
    }
}
接收对象
/**
 * Listener on topic {@code "tp-sale"} that prints the id of each incoming
 * {@link SaleRequestFactory} and returns a {@link SaleResponseFactory}.
 */
@Component
public class ProcessingSaleListener {

    private static String topic = "tp-sale";

    /**
     * Handles one sale request.
     *
     * @param tf the request payload
     * @param headers the message headers (not read by this method)
     * @return a response whose unique id is {@code "123123"}
     * @throws Exception declared by the signature; nothing here throws a checked exception
     */
    // NOTE(review): presumably the returned value is only forwarded as a reply when the
    // method also carries @SendTo - confirm against the Spring for Apache Kafka docs.
    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers)
            throws Exception {
        final int requestId = tf.getId();
        System.out.println(requestId);

        SaleResponseFactory reply = new SaleResponseFactory();
        reply.setUnique_id("123123");
        return reply;
    }
}
自定义对象
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable request payload carrying a single integer id. Accessors,
 * constructors and the builder are generated by Lombok.
 */
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class SaleRequestFactory implements Serializable {

    /** Stream version identifier used by Java serialization. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Request identifier. */
    private int id;
}
序列化器
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * Kafka value serializer that encodes a {@link SaleRequestFactory} with Java
 * native serialization.
 */
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    /**
     * Serializes the given request into a byte array.
     *
     * @param topic topic the record is sent to (not used)
     * @param data the request to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws RuntimeException wrapping the {@link IOException} if encoding fails
     */
    @Override
    public byte[] serialize(String topic, SaleRequestFactory data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the ObjectOutputStream before
        // the buffer is read; previously only the byte buffer itself was closed.
        try (ObjectOutputStream outputStream = new ObjectOutputStream(out)) {
            outputStream.writeObject(data);
        } catch (IOException e) {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}
响应对象
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable response payload carrying a single unique id string. Accessors,
 * constructors and the builder are generated by Lombok.
 */
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class SaleResponseFactory implements Serializable {

    /** Stream version identifier used by Java serialization. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Unique identifier of the response. */
    private String unique_id;
}
回复Class
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
/**
 * Kafka value deserializer that decodes Java-serialized bytes.
 *
 * <p>NOTE(review): despite its name, this class is declared as
 * {@code Deserializer<SaleRequestFactory>} and casts the decoded object to
 * {@link SaleRequestFactory}. Bytes that hold a different class fail with a
 * {@link ClassCastException}. Confirm which type is intended before changing it.
 *
 * <p>NOTE(review): Java native deserialization should not be applied to bytes from
 * untrusted producers; consider a deserialization filter or another format.
 */
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    /**
     * Decodes one record value.
     *
     * @param topic topic the record came from (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} if decoding fails
     */
    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data) {
        // A null payload used to fail with a NullPointerException inside ByteArrayInputStream.
        if (data == null) {
            return null;
        }
        // try-with-resources closes the stream even when readObject throws.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SaleRequestFactory) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Unhandled", e);
        }
    }
}
我想根据对象类型发送和接收不同的序列化 Java 对象。例如,有时发送 SaleRequestFactory 并接收 SaleResponseFactory,有时发送 AuthRequestFactory 并接收 AuthResponseFactory。是否可以使用同一个主题发送和接收不同的 Java 对象?
完整示例code
这是可能的,但是每个 object 类型需要两个单独的生产者工厂。或者使用 ByteArraySerializer 并自己序列化 objects(等同于 Gary 的回答)
如果您确实想要正确地反序列化对象,那么消费者端也是同样的道理。否则,您可以使用 ByteArrayDeserializer(同样,相当于 Gary 展示的反序列化器);然后,假设 Java 无法从字节中确定对象类型(而序列化的 Object 流其实是可以的),您就需要在记录中包含额外的元数据,例如 headers,或者一个可供解析的特定键,用来确定如何反序列化数据,然后自己调用相应的反序列化方法。
总的来说,我建议 re-evaluating 为什么您需要将不同类型的记录放在一个主题中,或者查看其他消息格式,包括类似 CloudEvents 规范的内容,或者使用 Avro / Protobuf / 多态 JSON 类型,这将更好地与 Kafka 以外的客户端一起使用
使用 Object 作为值类型——下面是一个使用 Spring Boot 自动配置的基础设施 bean 的示例……
/**
 * Demo application: on startup it sends one {@link Foo} and one {@link Bar} to
 * topic {@code "so65866763"}, which it also declares as a bean.
 */
@SpringBootApplication
public class So65866763Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(So65866763Application.class, args);
    }

    /**
     * Runner that sends a {@link Foo} and then a {@link Bar} to the demo topic.
     *
     * @param template template used to send the two objects
     * @return the startup runner
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            Foo first = new Foo();
            template.send("so65866763", first);
            Bar second = new Bar();
            template.send("so65866763", second);
        };
    }

    /**
     * Declares the demo topic with one partition and one replica.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic() {
        TopicBuilder definition = TopicBuilder.name("so65866763")
                .partitions(1)
                .replicas(1);
        return definition.build();
    }
}
/** Empty serializable payload type used to demonstrate type-based listener dispatch. */
class Foo implements Serializable {

    // Explicit version id so the stream format does not depend on the compiler-derived
    // default. Note: bytes written by the previous, undeclared-UID version will no
    // longer deserialize.
    private static final long serialVersionUID = 1L;
}
/** Empty serializable payload type used to demonstrate type-based listener dispatch. */
class Bar implements Serializable {

    // Explicit version id so the stream format does not depend on the compiler-derived
    // default. Note: bytes written by the previous, undeclared-UID version will no
    // longer deserialize.
    private static final long serialVersionUID = 1L;
}
/**
 * Class-level listener on topic {@code "so65866763"} with one handler method per
 * payload type.
 */
@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener {

    /**
     * Handles {@link Foo} payloads by printing them.
     *
     * @param foo the received payload
     */
    @KafkaHandler
    void fooListener(Foo foo) {
        String line = "In fooListener: " + foo;
        System.out.println(line);
    }

    /**
     * Handles {@link Bar} payloads by printing them.
     *
     * @param bar the received payload
     */
    @KafkaHandler
    void barListener(Bar bar) {
        String line = "In barListener: " + bar;
        System.out.println(line);
    }
}
/**
 * Kafka value serializer that encodes any object with Java native serialization.
 */
public class JavaSerializer implements Serializer<Object> {

    /**
     * Serializes the given object into a byte array.
     *
     * <p>This overload previously returned {@code null}, silently dropping the payload
     * for any caller that did not use the headers-aware overload.
     *
     * @param topic topic the record is sent to (not used)
     * @param data the object to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws UncheckedIOException if encoding fails
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Flush explicitly: the buffer is read before try-with-resources closes the stream.
            oos.flush();
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Headers-aware overload; the headers are not used and the call is delegated to
     * {@link #serialize(String, Object)}.
     *
     * @param topic topic the record is sent to (not used)
     * @param headers record headers (not used)
     * @param data the object to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws UncheckedIOException if encoding fails
     */
    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return serialize(topic, data);
    }
}
/**
 * Kafka value deserializer that decodes Java-serialized bytes into an object.
 *
 * <p>NOTE(review): Java native deserialization should not be applied to bytes from
 * untrusted producers; consider a deserialization filter or another format.
 */
public class JavaDeserializer implements Deserializer<Object> {

    /**
     * Decodes one record value.
     *
     * <p>This overload previously always returned {@code null}, discarding the payload
     * for any caller that did not use the headers-aware overload.
     *
     * @param topic topic the record came from (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     * @throws UncheckedIOException if the bytes cannot be read
     * @throws IllegalStateException if the class of the serialized object cannot be found
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        // A null payload used to fail with a NullPointerException inside ByteArrayInputStream.
        if (data == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Headers-aware overload; the headers are not used and the call is delegated to
     * {@link #deserialize(String, byte[])}.
     *
     * @param topic topic the record came from (not used)
     * @param headers record headers (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288
我想实现能够发送和接收 Java 对象的 Kafka 消费者和生产者(完整源码)。我尝试了以下代码:
生产者:
/**
 * Producer-side Kafka wiring: producer factories, plain templates and the
 * request/reply template used to send a {@link SaleRequestFactory} and wait
 * for a {@link SaleResponseFactory}.
 */
@Configuration
public class KafkaProducerConfig {

    /** Kafka bootstrap servers, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Producer factory for {@link SaleRequestFactory} values: String keys, values
     * encoded by {@link SaleRequestFactorySerializer}.
     *
     * @return the producer factory for sale requests
     */
    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Producer factory for plain String keys and values.
     *
     * @return the String/String producer factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // key.serializer and value.serializer are required producer settings with no
        // default; they were missing here, so no producer could be created from this factory.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Template for sending {@link SaleRequestFactory} values.
     *
     * @return a template backed by {@link #saleRequestFactoryProducerFactory()}
     */
    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    /**
     * Template for sending plain String values.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Request/reply template. Replies are consumed by a container created from the
     * given listener container factory on topic {@code "tp-sale"} with consumer
     * group id {@code "tp-sale.reply"}.
     *
     * @param producerFactory factory used to send the requests
     * @param factory listener container factory used to create the reply container
     * @return the replying template
     */
    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(
            ProducerFactory<String, SaleRequestFactory> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
        // NOTE(review): the reply container listens on "tp-sale", the same topic the
        // controller sends requests to - confirm that sharing one topic is intended.
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer =
                factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    }
}
发送对象:
/**
 * REST endpoint under {@code /checkout} that stores a transaction, sends a
 * {@link SaleRequestFactory} to Kafka and waits for the {@link SaleResponseFactory} reply.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    /** Topic the sale request is sent to. */
    private static final String TOPIC = "tp-sale";

    /** Seconds to wait for the send confirmation and, separately, for the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    private final TransactionService transactionService;
    private final KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;

    /**
     * Creates the controller with its collaborators.
     *
     * @param validationMessage accepted for wiring compatibility; not used by this class
     * @param transactionService service used to persist the transaction
     * @param saleRequestFactoryKafkaTemplate plain template for sale requests
     * @param requestReplyKafkaTemplate request/reply template for sale requests
     */
    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
            KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
            ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate) {
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Saves an in-progress transaction, sends a sale request with id 100 and prints
     * the unique id of the reply.
     *
     * @throws ExecutionException if the send or the reply completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws TimeoutException if the send or the reply takes longer than the timeout
     */
    // Public rather than private: handler methods are conventionally public, and a
    // private handler presumably stops working if the controller is ever proxied.
    @PostMapping("test")
    public void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
        transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);
        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>(TOPIC, obj);

        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture =
                requestReplyKafkaTemplate.sendAndReceive(record);
        // Wait for the send itself to complete before waiting for the reply.
        replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord =
                replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}
消费者:
/**
 * Consumer-side Kafka wiring: the consumer factory and the listener container
 * factory built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Kafka bootstrap servers, injected from the {@code kafka.bootstrapAddress} property. */
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /** Consumer group id applied to consumers created by {@link #consumerFactory()}. */
    private String groupId = "test";

    /**
     * Consumer factory with String keys and values decoded by
     * {@link SaleResponseFactoryDeserializer}.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        consumerSettings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerSettings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        consumerSettings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerSettings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }

    /**
     * Listener container factory that creates its consumers from {@link #consumerFactory()}.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, SaleResponseFactory> source = consumerFactory();
        listenerFactory.setConsumerFactory(source);
        return listenerFactory;
    }
}
接收对象
/**
 * Listener on topic {@code "tp-sale"} that prints the id of each incoming
 * {@link SaleRequestFactory} and returns a {@link SaleResponseFactory}.
 */
@Component
public class ProcessingSaleListener {

    private static String topic = "tp-sale";

    /**
     * Handles one sale request.
     *
     * @param tf the request payload
     * @param headers the message headers (not read by this method)
     * @return a response whose unique id is {@code "123123"}
     * @throws Exception declared by the signature; nothing here throws a checked exception
     */
    // NOTE(review): presumably the returned value is only forwarded as a reply when the
    // method also carries @SendTo - confirm against the Spring for Apache Kafka docs.
    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers)
            throws Exception {
        final int requestId = tf.getId();
        System.out.println(requestId);

        SaleResponseFactory reply = new SaleResponseFactory();
        reply.setUnique_id("123123");
        return reply;
    }
}
自定义对象
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable request payload carrying a single integer id. Accessors,
 * constructors and the builder are generated by Lombok.
 */
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class SaleRequestFactory implements Serializable {

    /** Stream version identifier used by Java serialization. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Request identifier. */
    private int id;
}
序列化器
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * Kafka value serializer that encodes a {@link SaleRequestFactory} with Java
 * native serialization.
 */
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    /**
     * Serializes the given request into a byte array.
     *
     * @param topic topic the record is sent to (not used)
     * @param data the request to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws RuntimeException wrapping the {@link IOException} if encoding fails
     */
    @Override
    public byte[] serialize(String topic, SaleRequestFactory data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the ObjectOutputStream before
        // the buffer is read; previously only the byte buffer itself was closed.
        try (ObjectOutputStream outputStream = new ObjectOutputStream(out)) {
            outputStream.writeObject(data);
        } catch (IOException e) {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}
响应对象
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable response payload carrying a single unique id string. Accessors,
 * constructors and the builder are generated by Lombok.
 */
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class SaleResponseFactory implements Serializable {

    /** Stream version identifier used by Java serialization. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Unique identifier of the response. */
    private String unique_id;
}
回复Class
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
/**
 * Kafka value deserializer that decodes Java-serialized bytes.
 *
 * <p>NOTE(review): despite its name, this class is declared as
 * {@code Deserializer<SaleRequestFactory>} and casts the decoded object to
 * {@link SaleRequestFactory}. Bytes that hold a different class fail with a
 * {@link ClassCastException}. Confirm which type is intended before changing it.
 *
 * <p>NOTE(review): Java native deserialization should not be applied to bytes from
 * untrusted producers; consider a deserialization filter or another format.
 */
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    /**
     * Decodes one record value.
     *
     * @param topic topic the record came from (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} if decoding fails
     */
    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data) {
        // A null payload used to fail with a NullPointerException inside ByteArrayInputStream.
        if (data == null) {
            return null;
        }
        // try-with-resources closes the stream even when readObject throws.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SaleRequestFactory) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Unhandled", e);
        }
    }
}
我想根据对象类型发送和接收不同的序列化 Java 对象。例如,有时发送 SaleRequestFactory 并接收 SaleResponseFactory,有时发送 AuthRequestFactory 并接收 AuthResponseFactory。是否可以使用同一个主题发送和接收不同的 Java 对象?
完整示例code
这是可能的,但是每个 object 类型需要两个单独的生产者工厂。或者使用 ByteArraySerializer 并自己序列化 objects(等同于 Gary 的回答)
如果您确实想要正确地反序列化对象,那么消费者端也是同样的道理。否则,您可以使用 ByteArrayDeserializer(同样,相当于 Gary 展示的反序列化器);然后,假设 Java 无法从字节中确定对象类型(而序列化的 Object 流其实是可以的),您就需要在记录中包含额外的元数据,例如 headers,或者一个可供解析的特定键,用来确定如何反序列化数据,然后自己调用相应的反序列化方法。
总的来说,我建议 re-evaluating 为什么您需要将不同类型的记录放在一个主题中,或者查看其他消息格式,包括类似 CloudEvents 规范的内容,或者使用 Avro / Protobuf / 多态 JSON 类型,这将更好地与 Kafka 以外的客户端一起使用
使用 Object 作为值类型——下面是一个使用 Spring Boot 自动配置的基础设施 bean 的示例……
/**
 * Demo application: on startup it sends one {@link Foo} and one {@link Bar} to
 * topic {@code "so65866763"}, which it also declares as a bean.
 */
@SpringBootApplication
public class So65866763Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication.run(So65866763Application.class, args);
    }

    /**
     * Runner that sends a {@link Foo} and then a {@link Bar} to the demo topic.
     *
     * @param template template used to send the two objects
     * @return the startup runner
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            Foo first = new Foo();
            template.send("so65866763", first);
            Bar second = new Bar();
            template.send("so65866763", second);
        };
    }

    /**
     * Declares the demo topic with one partition and one replica.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic() {
        TopicBuilder definition = TopicBuilder.name("so65866763")
                .partitions(1)
                .replicas(1);
        return definition.build();
    }
}
/** Empty serializable payload type used to demonstrate type-based listener dispatch. */
class Foo implements Serializable {

    // Explicit version id so the stream format does not depend on the compiler-derived
    // default. Note: bytes written by the previous, undeclared-UID version will no
    // longer deserialize.
    private static final long serialVersionUID = 1L;
}
/** Empty serializable payload type used to demonstrate type-based listener dispatch. */
class Bar implements Serializable {

    // Explicit version id so the stream format does not depend on the compiler-derived
    // default. Note: bytes written by the previous, undeclared-UID version will no
    // longer deserialize.
    private static final long serialVersionUID = 1L;
}
/**
 * Class-level listener on topic {@code "so65866763"} with one handler method per
 * payload type.
 */
@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener {

    /**
     * Handles {@link Foo} payloads by printing them.
     *
     * @param foo the received payload
     */
    @KafkaHandler
    void fooListener(Foo foo) {
        String line = "In fooListener: " + foo;
        System.out.println(line);
    }

    /**
     * Handles {@link Bar} payloads by printing them.
     *
     * @param bar the received payload
     */
    @KafkaHandler
    void barListener(Bar bar) {
        String line = "In barListener: " + bar;
        System.out.println(line);
    }
}
/**
 * Kafka value serializer that encodes any object with Java native serialization.
 */
public class JavaSerializer implements Serializer<Object> {

    /**
     * Serializes the given object into a byte array.
     *
     * <p>This overload previously returned {@code null}, silently dropping the payload
     * for any caller that did not use the headers-aware overload.
     *
     * @param topic topic the record is sent to (not used)
     * @param data the object to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws UncheckedIOException if encoding fails
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Flush explicitly: the buffer is read before try-with-resources closes the stream.
            oos.flush();
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Headers-aware overload; the headers are not used and the call is delegated to
     * {@link #serialize(String, Object)}.
     *
     * @param topic topic the record is sent to (not used)
     * @param headers record headers (not used)
     * @param data the object to encode
     * @return the Java-serialized bytes of {@code data}
     * @throws UncheckedIOException if encoding fails
     */
    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        return serialize(topic, data);
    }
}
/**
 * Kafka value deserializer that decodes Java-serialized bytes into an object.
 *
 * <p>NOTE(review): Java native deserialization should not be applied to bytes from
 * untrusted producers; consider a deserialization filter or another format.
 */
public class JavaDeserializer implements Deserializer<Object> {

    /**
     * Decodes one record value.
     *
     * <p>This overload previously always returned {@code null}, discarding the payload
     * for any caller that did not use the headers-aware overload.
     *
     * @param topic topic the record came from (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     * @throws UncheckedIOException if the bytes cannot be read
     * @throws IllegalStateException if the class of the serialized object cannot be found
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        // A null payload used to fail with a NullPointerException inside ByteArrayInputStream.
        if (data == null) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Headers-aware overload; the headers are not used and the call is delegated to
     * {@link #deserialize(String, byte[])}.
     *
     * @param topic topic the record came from (not used)
     * @param headers record headers (not used)
     * @param data the serialized bytes; may be null
     * @return the decoded object, or {@code null} when {@code data} is null
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288