Mutiny - Kafka 按顺序写入
Mutiny - Kafka writes happening sequentially
我是 Quarkus 的新手。我正在尝试使用接收输入的 quarkus reactive 编写 REST 端点,进行一些验证,将输入转换为列表,然后将消息写入 kafka。我的理解是将所有内容都转换为 Uni/Multi,将导致在 I/O 线程上以异步方式执行。在 intelliJ 日志中,我可以看到代码在执行程序线程中以顺序方式执行。 kafka 写入顺序发生在它自己的网络线程中,这会增加延迟。
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
//deflateMessage() converts input to a list of inputSample
Multi<InputSample> keys = Multi.createFrom().item(inputSample)
.onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.concatenate();
return keys.onItem().transformToUniAndMerge(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
}
@Inject
@Channel("write")
Emitter<String> emitter;
Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
.onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
.onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
}
我已经用了几天了。不确定是否做错了什么。任何帮助,将不胜感激。
谢谢
mutiny 与任何其他反应式库一样,主要围绕数据流控制设计。
也就是说,从本质上讲,它将提供一组功能(通常通过一些运算符)来控制流执行和调度。这意味着除非您指示 munity 对象进行异步操作,否则它们将简单地以顺序(旧)方式执行。
执行调度使用两个运算符控制:
runSubscriptionOn
:这将导致生成项目的代码片段(通常称为上游)在指定 Executor
的线程上执行
emitOn
:这将导致订阅代码(通常称为下游)在指定的线程上执行 Executor
然后您可以按如下方式更新您的代码,使通货紧缩变为异步:
Multi<InputSample> keys = Multi.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom()
.iterable(deflateMessage.deflateMessage(array)))
.runSubscriptionOn(Infrastructure.getDefaultExecutor()) // items will be transformed on a separate thread
.concatenate();
编辑:下游在单独的线程上
为了在单独的线程上完成完整的下游、转换和写入 Kafka 队列,您可以使用 emitOn
运算符,如下所示:
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
return Uni.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.emitOn(Executors.newFixedThreadPool(5)) // items will be emitted on a separate thread after transformation
.onItem()
.transformToUniAndConcatenate(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return Uni.createFrom().<OutputSample>nothing();
});
}
当您有一个持续发出项目直到发出完成事件的源时,将使用 Multi,但您的情况并非如此。
来自兵变docs:
A Multi represents a stream of data. A stream can emit 0, 1, n, or an
infinite number of items.
You will rarely create instances of Multi yourself but instead use a
reactive client that exposes a Mutiny API.
您要查找的是 Uni<List<OutputSample>>
因为您的 API 您 return 1 且只有 1 项具有完整的结果列表。
所以你需要的是将每条消息发送到 Kafka 而不是立即等待它们 return 而是收集生成的 Unis 然后收集到单个 Uni.
@POST
public Uni<List<OutputSample>> send(InputSample inputSample) {
// This could be injected directly inside your producer
ObjectMapper mapper = new ObjectMapper();
// Send each item to Kafka and collect resulting Unis
List<Uni<OutputSample>> uniList = deflateMessage(inputSample).stream()
.map(input -> producer.writeToKafka(input, mapper))
.collect(Collectors.toList());
// Transform a list of Unis to a single Uni of a list
@SuppressWarnings("unchecked") // Mutiny API fault...
Uni<List<OutputSample>> result = Uni.combine().all().unis(uniList)
.combinedWith(list -> (List<OutputSample>) list);
return result;
}
我是 Quarkus 的新手。我正在尝试使用接收输入的 quarkus reactive 编写 REST 端点,进行一些验证,将输入转换为列表,然后将消息写入 kafka。我的理解是将所有内容都转换为 Uni/Multi,将导致在 I/O 线程上以异步方式执行。在 intelliJ 日志中,我可以看到代码在执行程序线程中以顺序方式执行。 kafka 写入顺序发生在它自己的网络线程中,这会增加延迟。
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
//deflateMessage() converts input to a list of inputSample
Multi<InputSample> keys = Multi.createFrom().item(inputSample)
.onItem().transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.concatenate();
return keys.onItem().transformToUniAndMerge(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
});
}
@Inject
@Channel("write")
Emitter<String> emitter;
Uni<OutputSample> writeToKafka(InputSample kafkaPayload, ObjectMapper mapper) throws JsonProcessingException {
String inputSampleJson = mapper.writeValueAsString(kafkaPayload);
return Uni.createFrom().completionStage(emitter.send(inputSampleJson))
.onItem().transform(ignored -> new OutputSample("id", 200, "OK"))
.onFailure().recoverWithItem(new OutputSample("id", 500, "INTERNAL_SERVER_ERROR"));
}
我已经用了几天了。不确定是否做错了什么。任何帮助,将不胜感激。 谢谢
mutiny 与任何其他反应式库一样,主要围绕数据流控制设计。
也就是说,从本质上讲,它将提供一组功能(通常通过一些运算符)来控制流执行和调度。这意味着除非您指示 munity 对象进行异步操作,否则它们将简单地以顺序(旧)方式执行。
执行调度使用两个运算符控制:
runSubscriptionOn
:这将导致生成项目的代码片段(通常称为上游)在指定Executor
的线程上执行
emitOn
:这将导致订阅代码(通常称为下游)在指定的线程上执行Executor
然后您可以按如下方式更新您的代码,使通货紧缩变为异步:
Multi<InputSample> keys = Multi.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom()
.iterable(deflateMessage.deflateMessage(array)))
.runSubscriptionOn(Infrastructure.getDefaultExecutor()) // items will be transformed on a separate thread
.concatenate();
编辑:下游在单独的线程上
为了在单独的线程上完成完整的下游、转换和写入 Kafka 队列,您可以使用 emitOn
运算符,如下所示:
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Multi<OutputSample> send(InputSample inputSample) {
ObjectMapper mapper = new ObjectMapper();
return Uni.createFrom()
.item(inputSample)
.onItem()
.transformToMulti(array -> Multi.createFrom().iterable(deflateMessage.deflateMessage(array)))
.emitOn(Executors.newFixedThreadPool(5)) // items will be emitted on a separate thread after transformation
.onItem()
.transformToUniAndConcatenate(payload -> {
try {
return producer.writeToKafka(payload, mapper);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return Uni.createFrom().<OutputSample>nothing();
});
}
当您有一个持续发出项目直到发出完成事件的源时,将使用 Multi,但您的情况并非如此。
来自兵变docs:
A Multi represents a stream of data. A stream can emit 0, 1, n, or an infinite number of items.
You will rarely create instances of Multi yourself but instead use a reactive client that exposes a Mutiny API.
您要查找的是 Uni<List<OutputSample>>
因为您的 API 您 return 1 且只有 1 项具有完整的结果列表。
所以你需要的是将每条消息发送到 Kafka 而不是立即等待它们 return 而是收集生成的 Unis 然后收集到单个 Uni.
@POST
public Uni<List<OutputSample>> send(InputSample inputSample) {
// This could be injected directly inside your producer
ObjectMapper mapper = new ObjectMapper();
// Send each item to Kafka and collect resulting Unis
List<Uni<OutputSample>> uniList = deflateMessage(inputSample).stream()
.map(input -> producer.writeToKafka(input, mapper))
.collect(Collectors.toList());
// Transform a list of Unis to a single Uni of a list
@SuppressWarnings("unchecked") // Mutiny API fault...
Uni<List<OutputSample>> result = Uni.combine().all().unis(uniList)
.combinedWith(list -> (List<OutputSample>) list);
return result;
}