NATS 持久消息 Java 客户端
NATS persistent message Java Client
有没有人有将 NATS 流媒体服务器与 Java 客户端结合使用的经验?具体来说,我不知道如何使用 Java 客户端检索订阅者离线时发送的消息。
我可以看到使用 Go client that I can publish a message and later add a subscription to retrieve all published messages. This is in the NATS Streaming Getting Started 文档并且它像宣传的那样工作。
Publish several messages. For each publication you should get a
result.
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'
Run the subscriber client.
Use the --all flag to receive all published messages.
$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196
我正在尝试使用 NATS Java client 执行此操作。我不知道是不是我没有找到类似的方法调用,还是 Java 客户端中不存在该功能。
这是我试过的方法
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class NatsTest2 {
private static final SecureRandom random = new SecureRandom();
public static void main(String... args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory(Constants.DEFAULT_URL);
try (final Connection conn = factory.createConnection()) {
// Simple Async Subscriber
final String expectMessage = "Yum, cookies " + System.currentTimeMillis();
works(conn, expectMessage);
broken(conn, expectMessage);
}
}
private static void works(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
conn.publish(queue, expectMessage.getBytes());
subscribe(subscription);
}
}
private static void broken(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
conn.publish(queue, expectMessage.getBytes());
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
subscribe(subscription);
}
}
private static void subscribe(SyncSubscription subscription) throws IOException, TimeoutException {
final Message message = subscription.nextMessage(1, TimeUnit.SECONDS);
System.out.println(new String(message.getData()));
}
}
这给出了输出
-8522002637987832314=>Yum, cookies 1473462495040
-3024385525006291780=>Exception in thread "main" java.util.concurrent.TimeoutException: Channel timed out waiting for items
如果您使用 nats-streaming-server
,则需要使用 java-nats-streaming 客户端。您正在寻找的功能(订阅历史消息)仅存在于该客户端。
无论如何,这就是为什么你看到了你对 jnats
客户端所做的事情:
nats-streaming-server
当前嵌入了一个 NATS 服务器 (gnatsd
),因此允许常规 NATS 客户端使用标准 NATS 功能,这就是您所看到的。
在您的示例代码中,works()
恰好有效,因为您的订阅在您发布消息之前已经创建(换句话说,您的 try-with-resources
块保证订阅在其他任何事情之前已经处于活动状态发生)。因此,您实际上并没有收到过去发布的消息;您收到的消息是在订阅开始后发布的。
broken()
示例不工作,因为它实际上是在创建订阅之前发布消息,并且该消息被服务器丢弃,因为(还)没有兴趣。
有没有人有将 NATS 流媒体服务器与 Java 客户端结合使用的经验?具体来说,我不知道如何使用 Java 客户端检索订阅者离线时发送的消息。
我可以看到使用 Go client that I can publish a message and later add a subscription to retrieve all published messages. This is in the NATS Streaming Getting Started 文档并且它像宣传的那样工作。
Publish several messages. For each publication you should get a result.
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'
Run the subscriber client. Use the --all flag to receive all published messages.
$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196
我正在尝试使用 NATS Java client 执行此操作。我不知道是不是我没有找到类似的方法调用,还是 Java 客户端中不存在该功能。
这是我试过的方法
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class NatsTest2 {
private static final SecureRandom random = new SecureRandom();
public static void main(String... args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory(Constants.DEFAULT_URL);
try (final Connection conn = factory.createConnection()) {
// Simple Async Subscriber
final String expectMessage = "Yum, cookies " + System.currentTimeMillis();
works(conn, expectMessage);
broken(conn, expectMessage);
}
}
private static void works(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
conn.publish(queue, expectMessage.getBytes());
subscribe(subscription);
}
}
private static void broken(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
conn.publish(queue, expectMessage.getBytes());
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
subscribe(subscription);
}
}
private static void subscribe(SyncSubscription subscription) throws IOException, TimeoutException {
final Message message = subscription.nextMessage(1, TimeUnit.SECONDS);
System.out.println(new String(message.getData()));
}
}
这给出了输出
-8522002637987832314=>Yum, cookies 1473462495040
-3024385525006291780=>Exception in thread "main" java.util.concurrent.TimeoutException: Channel timed out waiting for items
如果您使用 nats-streaming-server
,则需要使用 java-nats-streaming 客户端。您正在寻找的功能(订阅历史消息)仅存在于该客户端。
无论如何,这就是为什么你看到了你对 jnats
客户端所做的事情:
nats-streaming-server
当前嵌入了一个 NATS 服务器 (gnatsd
),因此允许常规 NATS 客户端使用标准 NATS 功能,这就是您所看到的。
在您的示例代码中,works()
恰好有效,因为您的订阅在您发布消息之前已经创建(换句话说,您的 try-with-resources
块保证订阅在其他任何事情之前已经处于活动状态发生)。因此,您实际上并没有收到过去发布的消息;您收到的消息是在订阅开始后发布的。
broken()
示例不工作,因为它实际上是在创建订阅之前发布消息,并且该消息被服务器丢弃,因为(还)没有兴趣。