是否可以将 Reactor Context 传递给从 Sinks 创建的 Flux?

Is it possible to pass the Reactor Context to a Flux created from a Sinks?

我们目前正在使用 ReactiveSecurityContextHolder,它获取我们正确的 Auth 详细信息并在 Flux 流中使用。

现在我们要解耦一些东西。第一次迭代是我们使用 Sinks 作为中间体 'event-hub'。因此,我们从端点向 Sinks.Many.

生成一些项目

侦听器正在使用来自此接收器的事件并执行繁重的工作。现在,在这个消费者中,我想使用生产站点上可用的上下文。我知道可以 deferContextual 将当前上下文传递给另一个 Flux。但是是否可以将上下文传递给从接收器生成的 Flux?

提前致谢。

亚历克斯

目前没有 API 在任意 Sinks 上公开它。 Sinks 的挑战在于它们中的很多正在多播到多个 Subscriber,并且 Context 是在每个 Subscriber.

上定义的

虽然有一个 hack:Sinks.Many<T>Scannable,并且大多数具体实现 应该 通过 [=17= 公开他们当前的订阅者集合] 方法。在单播接收器的情况下,scan(Attr.ACTUAL) 也可以。

两大注意事项:

  • 这些 API 仅公开 Scannable,不允许直接访问 Context
  • 如果实现的内部订阅者不是 Scannable,它在流中被替换为 Scannable#NOT_SCANNABLE 常量

大部分 reactor-core CoreSubscribers 都是 Scannable,但是如果您连接了一个不可扫描的自定义订阅者,即使它有 Context,您也看不到它。

reactor-core 中的多播接收器倾向于将下游订阅者包装在它们自己的内部 Scannable 内部跟踪器中,这将使这种方法起作用。

单播接收器有点不同,因为它们直接连接到下游 Subscriber。因此,如果它是 CoreSubscriber 但不知何故不是 Scannable,您将无法将其视为 CoreSubscriber 并访问其 Context.

方法总结:

  1. 致电sink.inners()获得Stream<Scannable>
  2. 确保值是 CoreSubscriber 的实例(这是可能出错的部分)
  3. 将值转换为 CoreSubscriber 并调用 currentContext()
  4. 以某种方式协调各种 Context 以提取相关的键值对