如何通过 REST 将传入数据转发到 Quarkus 中的 SSE 流
How to forward incoming data via REST to an SSE stream in Quarkus
在我的设置中,我想通过 SSE 通道(服务器发送的事件)转发某些状态更改。状态更改是通过调用 REST 端点启动的。因此,我需要将传入的状态更改转发到 SSE 流。
在 Quarkus 中 best/simplest 实现此目的的方法是什么。
我能想到的一种解决方案是使用 EventBus (https://quarkus.io/guides/reactive-messaging)。 SSE 端点将订阅状态更改并将其推送到 SSE 通道。状态更改端点发布适当的事件。
这是可行的解决方案吗?还有其他(更简单的)解决方案吗?在任何情况下我都需要使用反应性的东西来完成这个吗?
非常感谢任何帮助!
最简单的方法是使用 rxjava 作为流提供程序。首先你需要添加 rxjava 依赖。它可以来自 quarkus 中的反应依赖项,例如 kafka,也可以直接使用它(如果您不需要任何流媒体库):
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.19</version>
</dependency>
下面是关于如何每秒发送随机双精度值的示例:
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/plain")
public Publisher<Double> stream() {
return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
}
我们创建了一个新的 Flowable,它将每秒触发一次,并且在每个 tick 上我们生成下一个随机双精度。调查关于如何创建 Flowable 的任何其他选项,例如 Flowable.fromFuture()
以适应您的特定代码逻辑。
P.S 上面的代码将在您每次查询此端点时生成新的 Flowable,我将其保存起来 space,在您的情况下,我假设您将拥有单一的事件源您可以构建一次并在每次端点查询时使用相同的实例
Dmytro,感谢您为我指明了正确的方向。
我选择了与 Kotlin 相关的 Mutiny。我的代码现在看起来像这样:
data class DeviceStatus(var status: Status = Status.OFFLINE) {
enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}
@ApplicationScoped
class DeviceStatusService {
var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
fun pushDeviceStatus(deviceStatus: DeviceStatus) {
deviceStatusProcessor.onNext(deviceStatus)
}
fun getStream(): Multi<DeviceStatus> {
return Multi.createFrom().publisher(deviceStatusQueue)
}
}
@Path("/deviceStatus")
class DeviceStatusResource {
private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")
@Inject
@field: Default
lateinit var deviceStatusService: DeviceStatusService
@POST
@Consumes(MediaType.APPLICATION_JSON)
fun status(status: DeviceStatus): Response {
LOGGER.info("POST /deviceStatus " + status.status);
deviceStatusService.pushDeviceStatus(status)
return Response.ok().build();
}
@GET
@Path("/eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
fun stream(): Multi<DeviceStatus>? {
return deviceStatusService.getStream()
}
}
作为最小设置,服务可以直接使用 deviceStatusProcessor 作为发布者。然而,Flowable 增加了缓冲。
欢迎对实施提出意见。
在我的设置中,我想通过 SSE 通道(服务器发送的事件)转发某些状态更改。状态更改是通过调用 REST 端点启动的。因此,我需要将传入的状态更改转发到 SSE 流。
在 Quarkus 中 best/simplest 实现此目的的方法是什么。
我能想到的一种解决方案是使用 EventBus (https://quarkus.io/guides/reactive-messaging)。 SSE 端点将订阅状态更改并将其推送到 SSE 通道。状态更改端点发布适当的事件。
这是可行的解决方案吗?还有其他(更简单的)解决方案吗?在任何情况下我都需要使用反应性的东西来完成这个吗?
非常感谢任何帮助!
最简单的方法是使用 rxjava 作为流提供程序。首先你需要添加 rxjava 依赖。它可以来自 quarkus 中的反应依赖项,例如 kafka,也可以直接使用它(如果您不需要任何流媒体库):
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.19</version>
</dependency>
下面是关于如何每秒发送随机双精度值的示例:
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType("text/plain")
public Publisher<Double> stream() {
return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
}
我们创建了一个新的 Flowable,它将每秒触发一次,并且在每个 tick 上我们生成下一个随机双精度。调查关于如何创建 Flowable 的任何其他选项,例如 Flowable.fromFuture()
以适应您的特定代码逻辑。
P.S 上面的代码将在您每次查询此端点时生成新的 Flowable,我将其保存起来 space,在您的情况下,我假设您将拥有单一的事件源您可以构建一次并在每次端点查询时使用相同的实例
Dmytro,感谢您为我指明了正确的方向。 我选择了与 Kotlin 相关的 Mutiny。我的代码现在看起来像这样:
data class DeviceStatus(var status: Status = Status.OFFLINE) {
enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}
@ApplicationScoped
class DeviceStatusService {
var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
fun pushDeviceStatus(deviceStatus: DeviceStatus) {
deviceStatusProcessor.onNext(deviceStatus)
}
fun getStream(): Multi<DeviceStatus> {
return Multi.createFrom().publisher(deviceStatusQueue)
}
}
@Path("/deviceStatus")
class DeviceStatusResource {
private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")
@Inject
@field: Default
lateinit var deviceStatusService: DeviceStatusService
@POST
@Consumes(MediaType.APPLICATION_JSON)
fun status(status: DeviceStatus): Response {
LOGGER.info("POST /deviceStatus " + status.status);
deviceStatusService.pushDeviceStatus(status)
return Response.ok().build();
}
@GET
@Path("/eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
fun stream(): Multi<DeviceStatus>? {
return deviceStatusService.getStream()
}
}
作为最小设置,服务可以直接使用 deviceStatusProcessor 作为发布者。然而,Flowable 增加了缓冲。 欢迎对实施提出意见。