NATS Stream 创建失败
NATS Stream creation fail
当我尝试在本地 NATS 服务器上创建流时出现错误。连接建立成功,但 jsm.addStream(conf)
请求中断。
我正在使用 Maven 神器:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.12.0</version>
</dependency>
方法连接:
private Connection connection;
@SneakyThrows
public void openConnection() {
connection = Nats.connect();
}
@SneakyThrows
public void openStream(String name, String sub) {
JetStreamManagement jsm = connection.jetStreamManagement();
StreamConfiguration conf = StreamConfiguration.builder()
.name(name)
.subjects(sub)
.storageType(StorageType.Memory)
.build();
StreamInfo streamInfo = jsm.addStream(conf);
JsonUtils.printFormatted(streamInfo);
}
通话:
@ResponseStatus(value = HttpStatus.OK)
@GetMapping("/connect")
public void connect() {
natsProducer.openConnection();
natsProducer.openStream("some-name", "com.demosub");
}
NATS 运行 inside docker conainer on localhost
:
C:\Users\pavel>docker pull nats:latest
latest: Pulling from library/nats
d16cac695b49: Pull complete
c945b005f9f3: Pull complete
Digest: sha256:382506a4a72d887560f91eee19d3536c3694aa1750c7f9a172bcc09ebcac6273
Status: Downloaded newer image for nats:latest
docker.io/library/nats:latest
C:\Users\pavel>docker run -p 4222:4222 -ti nats:latest
[1] 2021/09/25 12:04:15.674548 [INF] Starting nats-server
[1] 2021/09/25 12:04:15.674585 [INF] Version: 2.6.1
[1] 2021/09/25 12:04:15.674604 [INF] Git: [c91f0fe]
[1] 2021/09/25 12:04:15.674622 [INF] Name: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674644 [INF] ID: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674670 [INF] Using configuration file: nats-server.conf
[1] 2021/09/25 12:04:15.675538 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/09/25 12:04:15.675631 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/09/25 12:04:15.675871 [INF] Server is ready
[1] 2021/09/25 12:04:15.675904 [INF] Cluster name is tM3uBDWH0KKEqa2G2pVFv3
[1] 2021/09/25 12:04:15.675944 [WRN] Cluster name was dynamically generated, consider setting one
[1] 2021/09/25 12:04:15.675980 [INF] Listening for route connections on 0.0.0.0:6222
但是当我调用端点时出现错误:
java.io.IOException: Timeout or no response waiting for NATS JetStream server
at io.nats.client.impl.NatsJetStreamImplBase.responseRequired(NatsJetStreamImplBase.java:84) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamImplBase.makeRequestResponseRequired(NatsJetStreamImplBase.java:68) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateStream(NatsJetStreamManagement.java:66) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamManagement.addStream(NatsJetStreamManagement.java:47) ~[jnats-2.12.0.jar:2.12.0]
at com.example.natsdemo.NatsProducer.openStream(NatsProducer.java:36) ~[classes/:na]
at com.example.natsdemo.NatsController.connect(NatsController.java:25) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.10.jar:5.3.10]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.10.jar:5.3.10]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
我的解决方案有什么问题?
看起来您没有在启用 JetStream 的情况下启动 nats-server,将 -js
添加到传递给 nats-server
的参数中,或者将以下部分添加到您的服务器配置文件中(调整服务器名称当然还有你喜欢的存储路径):
server_name: server1
jetstream {
store_dir="/mystorage/server1"
}
问题是打开流的方式。正确解法:
Options options = new Options.Builder()
.server(url)
.connectionListener((conn, type) -> {
System.out.println(type);
}).build();
connection = Nats.connect(options);
并异步发送:
@SneakyThrows
void pushAsync(@NotNull String sub, @NotNull String msg) {
JetStream stream = connection.jetStream();
CompletableFuture<PublishAck> future = stream
.publishAsync(sub, msg.getBytes(StandardCharsets.UTF_8));
future.thenApplyAsync(ack -> {
log.debug(ack);
return ack;
});
}
当我尝试在本地 NATS 服务器上创建流时出现错误。连接建立成功,但 jsm.addStream(conf)
请求中断。
我正在使用 Maven 神器:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.12.0</version>
</dependency>
方法连接:
private Connection connection;
@SneakyThrows
public void openConnection() {
connection = Nats.connect();
}
@SneakyThrows
public void openStream(String name, String sub) {
JetStreamManagement jsm = connection.jetStreamManagement();
StreamConfiguration conf = StreamConfiguration.builder()
.name(name)
.subjects(sub)
.storageType(StorageType.Memory)
.build();
StreamInfo streamInfo = jsm.addStream(conf);
JsonUtils.printFormatted(streamInfo);
}
通话:
@ResponseStatus(value = HttpStatus.OK)
@GetMapping("/connect")
public void connect() {
natsProducer.openConnection();
natsProducer.openStream("some-name", "com.demosub");
}
NATS 运行 inside docker conainer on localhost
:
C:\Users\pavel>docker pull nats:latest
latest: Pulling from library/nats
d16cac695b49: Pull complete
c945b005f9f3: Pull complete
Digest: sha256:382506a4a72d887560f91eee19d3536c3694aa1750c7f9a172bcc09ebcac6273
Status: Downloaded newer image for nats:latest
docker.io/library/nats:latest
C:\Users\pavel>docker run -p 4222:4222 -ti nats:latest
[1] 2021/09/25 12:04:15.674548 [INF] Starting nats-server
[1] 2021/09/25 12:04:15.674585 [INF] Version: 2.6.1
[1] 2021/09/25 12:04:15.674604 [INF] Git: [c91f0fe]
[1] 2021/09/25 12:04:15.674622 [INF] Name: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674644 [INF] ID: NBJL25OYJYSFE6WOS4F3ZBXUQFVJFOAVGWH2XTBYLUPOHGKVLZI3TQJ4
[1] 2021/09/25 12:04:15.674670 [INF] Using configuration file: nats-server.conf
[1] 2021/09/25 12:04:15.675538 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2021/09/25 12:04:15.675631 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2021/09/25 12:04:15.675871 [INF] Server is ready
[1] 2021/09/25 12:04:15.675904 [INF] Cluster name is tM3uBDWH0KKEqa2G2pVFv3
[1] 2021/09/25 12:04:15.675944 [WRN] Cluster name was dynamically generated, consider setting one
[1] 2021/09/25 12:04:15.675980 [INF] Listening for route connections on 0.0.0.0:6222
但是当我调用端点时出现错误:
java.io.IOException: Timeout or no response waiting for NATS JetStream server
at io.nats.client.impl.NatsJetStreamImplBase.responseRequired(NatsJetStreamImplBase.java:84) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamImplBase.makeRequestResponseRequired(NatsJetStreamImplBase.java:68) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamManagement.addOrUpdateStream(NatsJetStreamManagement.java:66) ~[jnats-2.12.0.jar:2.12.0]
at io.nats.client.impl.NatsJetStreamManagement.addStream(NatsJetStreamManagement.java:47) ~[jnats-2.12.0.jar:2.12.0]
at com.example.natsdemo.NatsProducer.openStream(NatsProducer.java:36) ~[classes/:na]
at com.example.natsdemo.NatsController.connect(NatsController.java:25) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.10.jar:5.3.10]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.10.jar:5.3.10]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:655) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.10.jar:5.3.10]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764) ~[tomcat-embed-core-9.0.53.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.10.jar:5.3.10]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.10.jar:5.3.10]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1726) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.53.jar:9.0.53]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
我的解决方案有什么问题?
看起来您没有在启用 JetStream 的情况下启动 nats-server,将 -js
添加到传递给 nats-server
的参数中,或者将以下部分添加到您的服务器配置文件中(调整服务器名称当然还有你喜欢的存储路径):
server_name: server1
jetstream {
store_dir="/mystorage/server1"
}
问题是打开流的方式。正确解法:
Options options = new Options.Builder()
.server(url)
.connectionListener((conn, type) -> {
System.out.println(type);
}).build();
connection = Nats.connect(options);
并异步发送:
@SneakyThrows
void pushAsync(@NotNull String sub, @NotNull String msg) {
JetStream stream = connection.jetStream();
CompletableFuture<PublishAck> future = stream
.publishAsync(sub, msg.getBytes(StandardCharsets.UTF_8));
future.thenApplyAsync(ack -> {
log.debug(ack);
return ack;
});
}