NATS Stream 创建失败

NATS Stream creation fail

当我尝试在本地 NATS 服务器上创建流时出现错误。连接建立成功,但 jsm.addStream(conf) 请求中断。

我正在使用 Maven 神器:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.12.0</version>
</dependency>

方法连接:

private Connection connection;

@SneakyThrows
public void openConnection() {
    connection = Nats.connect();
}

@SneakyThrows
public void openStream(String name, String sub) {
    JetStreamManagement jsm = connection.jetStreamManagement();
    StreamConfiguration conf = StreamConfiguration.builder()
            .name(name)
            .subjects(sub)
            .storageType(StorageType.Memory)
            .build();
    StreamInfo streamInfo = jsm.addStream(conf);
    JsonUtils.printFormatted(streamInfo);
}

通话:

@ResponseStatus(value = HttpStatus.OK)
@GetMapping("/connect")
public void connect() {
    natsProducer.openConnection();
    natsProducer.openStream("some-name", "com.demosub");
}

NATS 运行 inside docker conainer on localhost:

C:\Users\pavel>docker pull nats:latest
latest: Pulling from library/nats
d16cac695b49: Pull complete
c945b005f9f3: Pull complete
Digest: sha256:382506a4a72d887560f91eee19d3536c3694aa1750c7f9a172bcc09ebcac6273
Status: Downloaded newer image for nats:latest
docker.io/library/nats:latest

C:\Users\pavel>docker run -p 4222:4222 -ti nats:latest
[1] 2021/09/25 12:04:15.674548 [INF] Starting nats-server
[1] 2021/09/25 12:04:15.674585 [INF]   Version:  2.6.1
[1] 2021/09/25 12:04:15.674604 [INF]   Git:      [c91f0fe]
[1] 2021/09/25 12:04:15.674622 [INF]   Name:     NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674644 [INF]   ID:       NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674670 [INF] Using configuration file: nats-server.conf
[1] 2021/09/25 12:04:15.675538 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/09/25 12:04:15.675631 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/09/25 12:04:15.675871 [INF] Server is ready
[1] 2021/09/25 12:04:15.675904 [INF] Cluster name is tM3uBDWH0KKEqa2G2pVFv3
[1] 2021/09/25 12:04:15.675944 [WRN] Cluster name was dynamically generated, consider setting one
[1] 2021/09/25 12:04:15.675980 [INF] Listening for route connections on 0.0.0.0:6222

但是当我调用端点时出现错误:

java.io.IOException: Timeout or no response waiting for NATS JetStream server
    at io.nats.client.impl.NatsJetStreamImplBase.responseRequired(NatsJetStreamImplBase.java:84) ~[jnats-2.12.0.jar:2.12.0]
    at io.nats.client.impl.NatsJetStreamImplBase.makeRequestResponseRequired(NatsJetStreamImplBase.java:68) ~[jnats-2.12.0.jar:2.12.0]
    at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateStream(NatsJetStreamManagement.java:66) ~[jnats-2.12.0.jar:2.12.0]
    at io.nats.client.impl.NatsJetStreamManagement.addStream(NatsJetStreamManagement.java:47) ~[jnats-2.12.0.jar:2.12.0]
    at com.example.natsdemo.NatsProducer.openStream(NatsProducer.java:36) ~[classes/:na]
    at com.example.natsdemo.NatsController.connect(NatsController.java:25) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.10.jar:5.3.10]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.10.jar:5.3.10]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.10.jar:5.3.10]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.10.jar:5.3.10]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.10.jar:5.3.10]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

我的解决方案有什么问题?

看起来您没有在启用 JetStream 的情况下启动 nats-server,将 -js 添加到传递给 nats-server 的参数中,或者将以下部分添加到您的服务器配置文件中(调整服务器名称当然还有你喜欢的存储路径):

server_name: server1
jetstream {
        store_dir="/mystorage/server1"
}

问题是打开流的方式。正确解法:

Options options = new Options.Builder()
            .server(url)
            .connectionListener((conn, type) -> {
                        System.out.println(type);
                    }).build();
connection = Nats.connect(options);

并异步发送:

@SneakyThrows
void pushAsync(@NotNull String sub, @NotNull String msg) {
    JetStream stream = connection.jetStream();

    CompletableFuture<PublishAck> future = stream
            .publishAsync(sub, msg.getBytes(StandardCharsets.UTF_8));

    future.thenApplyAsync(ack -> {
        log.debug(ack);
        return ack;
    });
}