Spring 批处理:一个 Reader,复合处理器(两个 类 具有不同的实体)和两个 kafkaItemWriter
Spring Batch : One Reader, composite processor (two classes with different entities) and two kafkaItemWriter
ItemReader
正在从 DB2 读取数据并给 java 对象 ClaimDto
。现在 ClaimProcessor
接收 ClaimDto
和 return 的对象 CompositeClaimRecord
对象,其中包含 claimRecord1
和 claimRecord2
将被发送到两个不同的对象卡夫卡主题。如何将claimRecord1
和claimRecord2
分别写到topic1和topic2。
您可以使用一个 ClassifierCompositeItemWriter 和两个 KafkaItemWriter
作为代表(每个主题一个)。
Classifier
会根据项目的类型(claimRecord1
或 claimRecord2
)对项目进行分类,并将它们路由到相应的 kafka 项目编写器(topic1
或 topic2
).
只需编写一个自定义 ItemWriter
即可。
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}
或者不是一次写入 1 条记录,而是将单个列表转换为 2 个列表并传递。但是这样的错误处理可能有点挑战。 \
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}
ItemReader
正在从 DB2 读取数据并给 java 对象 ClaimDto
。现在 ClaimProcessor
接收 ClaimDto
和 return 的对象 CompositeClaimRecord
对象,其中包含 claimRecord1
和 claimRecord2
将被发送到两个不同的对象卡夫卡主题。如何将claimRecord1
和claimRecord2
分别写到topic1和topic2。
您可以使用一个 ClassifierCompositeItemWriter 和两个 KafkaItemWriter
作为代表(每个主题一个)。
Classifier
会根据项目的类型(claimRecord1
或 claimRecord2
)对项目进行分类,并将它们路由到相应的 kafka 项目编写器(topic1
或 topic2
).
只需编写一个自定义 ItemWriter
即可。
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
for (CompositeClaimRecord record : items) {
writer1.write(Collections.singletonList(record.claimRecord1));
writer2.write(Collections.singletonList(record.claimRecord2));
}
}
}
或者不是一次写入 1 条记录,而是将单个列表转换为 2 个列表并传递。但是这样的错误处理可能有点挑战。 \
public class YourItemWriter implements ItemWriter<CompositeClaimRecord>` {
private final ItemWriter<Record1> writer1;
private final ItemWriter<Record2> writer2;
public YourItemWriter(ItemWriter<Record1> writer1, ItemWriter<Record2> writer2>) {
this.writer1=writer1;
this.writer2=writer2;
}
public void write(List<CompositeClaimRecord> items) throws Exception {
List<ClaimRecord1> record1List = items.stream().map(it -> it.claimRecord1).collect(Collectors.toList());
List<ClaimRecord2> record2List = items.stream().map(it -> it.claimRecord2).collect(Collectors.toList());
writer1.write(record1List);
writer2.write(record2List);
}
}