graphql-java - 如何在 spring 启动时使用订阅?

graphql-java - How to use subscriptions with spring boot?

在一个项目中我使用了graphql-java and spring boot with a postgreSQL Database. Now I would like to use the subscription feature发布的3.0.0版本。不幸的是,关于订阅功能应用的资料还不是很成熟。

使用 graphql-java 和订阅实现 实时功能 的方法如何?

我遇到了同样的问题,我在 lib 上加注以与 spring boot 集成。我找到了 graphql-java,但是,它似乎只支持模式级别的 'subscription',它不对此功能执行任何跨国支持。这意味着您可能需要自己实施它。

请参考https://github.com/graphql-java/graphql-java/blob/master/docs/schema.rst#subscription-support

从最近的 graphql-java 版本开始,完全支持订阅。订阅的 DataFetcher 必须 return 和 org.reactivestreams.Publisher,并且 graphql-java 将负责将查询函数映射到结果上。

该功能非常好 documented and there's a complete example 使用官方存储库中可用的网络套接字。

如果你有一个响应式数据源(例如 Mongo 和一个响应式驱动程序,或者任何 R2DBC supports), you're all set. Just use @Tailable 和 Spring 数据可能已经给你一个 Flux(它实现了Publisher),你不需要做任何其他事情。

至于更手动的 Spring 具体实现,我无法想象使用 Spring's own event mechanism (a nice tutorial here 也太难了) 作为 Publisher.[=28= 的基础]

每次有传入订阅时,创建并注册一个新的侦听器到应用程序上下文:context.addApplicationListener(listener),它将发布到正确的 Publisher。例如。在 DataFetcher:

// Somehow create a publisher, probably using Spring's Reactor project. Or RxJava.
Publisher<ResultObject> publisher = ...; 
//The listener reacts on application events and pushes new values through the publisher
ApplicationListener listener = createListener(publisher);
context.addApplicationListener(listener);
return publisher;

当网络套接字断开连接或您以某种方式知道事件流已完成时,您必须确保删除侦听器。

我还没有真正尝试过这些,请注意,我只是在大声思考。

另一种选择是直接使用 Reactor(有或没有 Spring WebFlux)。有一个使用 Reactor 和 WebSocket 的示例(通过 GraphQL SPQR Spring Boot Starter) here.

你创建一个 Publisher 这样的:

//This is really just a thread-safe wrapper around Map<String, Set<FluxSink<Task>>>
private final ConcurrentMultiRegistry<String, FluxSink<Task>> subscribers = new ConcurrentMultiRegistry<>();

@GraphQLSubscription
public Publisher<Task> taskStatusChanged(String taskId) {
    return Flux.create(subscriber -> subscribers.add(taskId, subscriber.onDispose(() -> subscribers.remove(taskId, subscriber))), FluxSink.OverflowStrategy.LATEST);
}

然后像这样从其他地方推送新值(可能是相关的突变或反应性存储):

subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));

例如

@GraphQLMutation
public Task updateTask(@GraphQLNonNull String taskId, @GraphQLNonNull Status status) {
    Task task = repo.byId(taskId); //find the task
    task.setStatus(status); //update the task
    repo.save(task); //persist the task
    //Notify all the subscribers following this task
    subscribers.get(taskId).forEach(subscriber -> subscriber.next(task));
    return task;
}

使用 SPQR Spring Starter,这就是让您获得与 Apollo 兼容的订阅实施所需的全部。

记录在案:这是另一个非常好的、紧凑的示例,它实现了 GraphQL 的基本功能查询、突变和订阅:https://github.com/npalm/blog-graphql-spring-service