Java 中的 Nats request/reply
Nats request/reply in Java
我以前有 Kafka 知识,我一直在玩 Nats.io 这似乎是消息传递的可靠选择。
特别是,我对记录良好的 Request/Reply 机制很感兴趣,但我在使用 Jnats 驱动程序 Java 中正确实施它时遇到了困难。
这是我的连接器:
// Single server nats connection
@PostConstruct
public void connect() throws ExternalServiceUnavailableException {
Options options = new Options.Builder()
.server(connectionString)
.maxReconnects(20)
.reconnectWait(Duration.ofSeconds(5))
.connectionTimeout(Duration.ofSeconds(5))
.connectionListener((conn, type) -> {
if (type == ConnectionListener.Events.CONNECTED) {
LOG.info("Connected to Nats Server");
} else if (type == ConnectionListener.Events.RECONNECTED) {
LOG.info("Reconnected to Nats Server");
} else if (type == ConnectionListener.Events.DISCONNECTED) {
LOG.error("Disconnected to Nats Server, reconnect attempt in seconds");
} else if (type == ConnectionListener.Events.CLOSED) {
LOG.info("Closed connection with Nats Server");
}
})
.build();
try {
connection = Nats.connect(options);
} catch (Exception e) {
LOG.error("Unable to connect to Nats Server");
throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
}
}
这是请求方法(出于测试目的等待时间非常长):
public Optional<String> asyncRequest(String topic, String message) throws ExternalServiceUnavailableException {
Future<Message> reply = natsConnector.getConnection().request(topic, message.getBytes());
try {
Message msg = reply.get(10L, TimeUnit.SECONDS);
LOG.info(new String(msg.getData()));
return Optional.of(new String(msg.getData(), StandardCharsets.UTF_8));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
LOG.error("Unable to retrieve response for the sent request: " + message);
throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
}
}
这是具有回复机制的响应处理程序:
@PostConstruct
private void init() {
Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
});
Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));
if (requestMessage.getString("requestType").equals("stock-status")) {
if (requestMessage.getString("of").equals("all")) {
JSONObject response = assetQuery.retrieveYesterdayStockStatus();
LOG.info("response ready");
natsOperation.publishEvent("assets-info", response);
LOG.info("message sent");
}
}
});
}
我的两个独立服务通过 dockerized Nats.io 进行通信,我可以通过 Nats Go 客户端正确检查两个服务已就同一主题发送消息。
不幸的是,即使 reply.get(...)
.
中的等待时间非常长,"Requestor" 在调用 asyncRequest
函数时也无法完全处理回复。
当我尝试在调试模式下评估 reply
对象时,它没有任何数据并显示 TimeoutException
.
在 msg.getData()
程序崩溃。
你们有什么提示吗?
谢谢!
您应该更改 "replyer" 代码以将原始消息发布到 replyTo 主题。
@PostConstruct
private void init() {
Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
});
Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));
if (requestMessage.getString("requestType").equals("stock-status")) {
if (requestMessage.getString("of").equals("all")) {
JSONObject response = assetQuery.retrieveYesterdayStockStatus();
LOG.info("response ready");
//See Change Here
natsOperation.publish(message.getReplyTo(), response);
LOG.info("message sent");
}
}
});
}
请求回复机制正在寻找对生成的 replyTo 主题的单一回复。
我以前有 Kafka 知识,我一直在玩 Nats.io 这似乎是消息传递的可靠选择。
特别是,我对记录良好的 Request/Reply 机制很感兴趣,但我在使用 Jnats 驱动程序 Java 中正确实施它时遇到了困难。
这是我的连接器:
// Single server nats connection
@PostConstruct
public void connect() throws ExternalServiceUnavailableException {
Options options = new Options.Builder()
.server(connectionString)
.maxReconnects(20)
.reconnectWait(Duration.ofSeconds(5))
.connectionTimeout(Duration.ofSeconds(5))
.connectionListener((conn, type) -> {
if (type == ConnectionListener.Events.CONNECTED) {
LOG.info("Connected to Nats Server");
} else if (type == ConnectionListener.Events.RECONNECTED) {
LOG.info("Reconnected to Nats Server");
} else if (type == ConnectionListener.Events.DISCONNECTED) {
LOG.error("Disconnected to Nats Server, reconnect attempt in seconds");
} else if (type == ConnectionListener.Events.CLOSED) {
LOG.info("Closed connection with Nats Server");
}
})
.build();
try {
connection = Nats.connect(options);
} catch (Exception e) {
LOG.error("Unable to connect to Nats Server");
throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
}
}
这是请求方法(出于测试目的等待时间非常长):
public Optional<String> asyncRequest(String topic, String message) throws ExternalServiceUnavailableException {
Future<Message> reply = natsConnector.getConnection().request(topic, message.getBytes());
try {
Message msg = reply.get(10L, TimeUnit.SECONDS);
LOG.info(new String(msg.getData()));
return Optional.of(new String(msg.getData(), StandardCharsets.UTF_8));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
LOG.error("Unable to retrieve response for the sent request: " + message);
throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
}
}
这是具有回复机制的响应处理程序:
@PostConstruct
private void init() {
Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
});
Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));
if (requestMessage.getString("requestType").equals("stock-status")) {
if (requestMessage.getString("of").equals("all")) {
JSONObject response = assetQuery.retrieveYesterdayStockStatus();
LOG.info("response ready");
natsOperation.publishEvent("assets-info", response);
LOG.info("message sent");
}
}
});
}
我的两个独立服务通过 dockerized Nats.io 进行通信,我可以通过 Nats Go 客户端正确检查两个服务已就同一主题发送消息。
不幸的是,即使 reply.get(...)
.
asyncRequest
函数时也无法完全处理回复。
当我尝试在调试模式下评估 reply
对象时,它没有任何数据并显示 TimeoutException
.
在 msg.getData()
程序崩溃。
你们有什么提示吗? 谢谢!
您应该更改 "replyer" 代码以将原始消息发布到 replyTo 主题。
@PostConstruct
private void init() {
Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
});
Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));
if (requestMessage.getString("requestType").equals("stock-status")) {
if (requestMessage.getString("of").equals("all")) {
JSONObject response = assetQuery.retrieveYesterdayStockStatus();
LOG.info("response ready");
//See Change Here
natsOperation.publish(message.getReplyTo(), response);
LOG.info("message sent");
}
}
});
}
请求回复机制正在寻找对生成的 replyTo 主题的单一回复。