Receive pending messages in an Azure Service Bus queue with a Java program?
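I am struggling to receive the pending messages in an Azure Service Bus queue.
I created a queue (SampleQueue) in Azure Service Bus, and I can successfully send messages to that queue from POSTMAN using a SAS token generated by my Java program.
After hitting my Service Bus queue API URL, I also get a 201 Created status (image below).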
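For readers who have not generated such a SAS token before, here is a minimal sketch of how one might be built in Java. It assumes the usual Service Bus token layout (`SharedAccessSignature sr=...&sig=...&se=...&skn=...`) with an HMAC-SHA256 signature over the URL-encoded resource URI and the expiry time. The namespace, queue name, and key in `main` are hypothetical placeholders, and `SasTokenSketch` and `buildToken` are names made up for this illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class SasTokenSketch {

    // Builds a SAS token for resourceUri that expires at expiryEpochSeconds (Unix time).
    static String buildToken(String resourceUri, String keyName, String key, long expiryEpochSeconds)
            throws Exception {
        String encodedUri = URLEncoder.encode(resourceUri, "UTF-8");
        // The string to sign is the encoded URI, a newline, and the expiry.
        String stringToSign = encodedUri + "\n" + expiryEpochSeconds;

        Mac hmac = Mac.getInstance("HmacSHA256");
        hmac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        byte[] rawSignature = hmac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));
        String signature = URLEncoder.encode(Base64.getEncoder().encodeToString(rawSignature), "UTF-8");

        return "SharedAccessSignature sr=" + encodedUri
                + "&sig=" + signature
                + "&se=" + expiryEpochSeconds
                + "&skn=" + keyName;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical values: replace with your own namespace, queue, and key.
        String uri = "https://example.servicebus.windows.net/samplequeue";
        long expiry = System.currentTimeMillis() / 1000L + 3600; // valid for one hour
        System.out.println(buildToken(uri, "RootManageSharedAccessKey", "hypothetical-key", expiry));
    }
}
```

The resulting string would go into the `Authorization` header of the REST request sent from POSTMAN.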
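I want to receive the messages that are pending in my Service Bus queue. I went through some links about receiving messages (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues), but they do not explain how to receive and view those messages.
My Java code for receiving messages from the Service Bus queue is shown below [I am new to Java]: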
public class Test2 {
    public static void main(String[] args) throws ServiceException {
        String namespace = "SampleNamespace";
        String sharedKeyName = "RootManageSharedAccessKey";
        String sharedSecretKey = "t+U5ERMAnIyxgEUDUouGOKn6ADM/CuLWzEJZtauwVsc=";
        String queueName = "QueueName";

        // Azure Service Bus Service
        com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication(namespace, sharedKeyName, sharedSecretKey, ".servicebus.windows.net");
        ServiceBusContract service = ServiceBusService.create(config);

        // Receive and Delete Messages
        ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
        opts.setReceiveMode(ReceiveMode.RECEIVE_AND_DELETE);
        while (true) {
            ReceiveQueueMessageResult resultQM = service.receiveQueueMessage(queueName, opts);
            BrokeredMessage message = resultQM.getValue();
            if (message != null && message.getMessageId() != null) {
                System.out.println("Body: " + message.toString());
                System.out.println("MessageID: " + message.getMessageId());
            } else {
                System.out.println("No more messages.");
                break;
            }
        }
    }
}
But when I run this code, I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: javax/ws/rs/WebApplicationException
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructors(Class.java:1651)
at com.microsoft.windowsazure.core.DefaultBuilder.findInjectConstructor(DefaultBuilder.java:67)
at com.microsoft.windowsazure.core.DefaultBuilder.add(DefaultBuilder.java:94)
at com.microsoft.windowsazure.services.servicebus.Exports.register(Exports.java:34)
at com.microsoft.windowsazure.core.DefaultBuilder.create(DefaultBuilder.java:46)
at com.microsoft.windowsazure.Configuration.<init>(Configuration.java:80)
at com.microsoft.windowsazure.Configuration.load(Configuration.java:100)
at com.microsoft.windowsazure.Configuration.getInstance(Configuration.java:90)
at com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration.configureWithSASAuthentication(ServiceBusConfiguration.java:252)
at com.rocky.servicebus.queue.Test2.main(Test2.java:24)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.WebApplicationException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
Can anyone help me figure out what I am doing wrong?
Any help would be much appreciated.
Thanks,
Rudra
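Based on the tutorial for receiving messages, you need to create a queue client and register a message handler for it.
A) Get the connection string.
B) Code sample for sending and receiving messages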
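import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// The class name is arbitrary; it only wraps the methods so the sample compiles.
public class QueueSample {

    public static void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {
        queueClient.registerMessageHandler(
            new IMessageHandler() {
                // Called for every message delivered under peek-lock.
                public CompletableFuture<Void> onMessageAsync(IMessage message) {
                    if (message.getLabel() != null &&
                        message.getContentType() != null &&
                        message.getLabel().contentEquals("TestMessage") &&
                        message.getContentType().contentEquals("text/plain")) {
                        System.out.printf(
                            "\nMessage received: \n -->MessageId = %s\n -->ContentType = %s\n -->Content = %s\n",
                            message.getMessageId(),
                            message.getContentType(),
                            new String(message.getBody(), StandardCharsets.UTF_8)
                        );
                        // Settle the message so it is removed from the queue.
                        return queueClient.completeAsync(message.getLockToken());
                    }
                    // Release the lock so the message becomes available again.
                    return queueClient.abandonAsync(message.getLockToken());
                }

                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    // Pass the values as arguments so a '%' in the message cannot break printf.
                    System.out.printf("%s-%s%n", exceptionPhase, throwable.getMessage());
                }
            },
            // One concurrent call, no auto-complete, lock renewal for up to 10 seconds.
            new MessageHandlerOptions(1, false, Duration.ofSeconds(10)),
            executorService
        );
    }

    public static void sendMessages(QueueClient client) throws ServiceBusException, InterruptedException {
        for (int i = 0; i < 100; i++) {
            String messageId = Integer.toString(i);
            Message message = new Message("This is message " + i);
            message.setContentType("text/plain");
            message.setLabel("TestMessage");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(10));
            client.send(message);
            System.out.printf("Message sent: Id = %s \n", message.getMessageId());
        }
    }

    public static void main(String[] args) throws Exception {
        // Replace with your own connection string.
        String connectionString = "Endpoint=sb://j*9.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=V*=";
        // Replace with your own queue name.
        String queueName = "testQueue";

        QueueClient client = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
        sendMessages(client);
        client.close();

        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        registerReceiver(receiveClient, executorService);
        Thread.sleep(60 * 1000); // Wait for 60 seconds to receive all the messages.
        receiveClient.close();
        executorService.shutdown();
    }
}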
Result:
100 messages will be sent.
Message sent: Id = 0
Message sent: Id = 1
Message sent: Id = 2
Message sent: Id = 3
*
*
*
Message sent: Id = 99
Then the messages will start to be received.
Message received:
-->MessageId = 0
-->ContentType = text/plain
-->Content = This is message 0
Message received:
-->MessageId = 1
-->ContentType = text/plain
-->Content = This is message 1
Message received:
-->MessageId = 2
-->ContentType = text/plain
-->Content = This is message 2
*
*
*
Message received:
-->MessageId = 99
-->ContentType = text/plain
-->Content = This is message 99
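As a small aid for step A, the sketch below shows how a connection string of the form used in the sample can be split into its parts, which may help when checking that the right value was copied. It is plain Java with no Service Bus dependency. `ConnectionStringSketch` and `parse` are names made up for this illustration, and the connection string in `main` is a hypothetical placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectionStringSketch {

    // Splits "Key1=Value1;Key2=Value2" into an ordered map.
    // Only the first '=' of each pair separates key from value, because the
    // shared access key is Base64 text and usually ends with '=' itself.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return parts;
    }

    public static void main(String[] args) {
        // Hypothetical connection string: replace with your own.
        String cs = "Endpoint=sb://example.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=abc123=";
        for (Map.Entry<String, String> entry : parse(cs).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

If one of the three expected keys (`Endpoint`, `SharedAccessKeyName`, `SharedAccessKey`) is missing from the output, the connection string was probably truncated when it was copied.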
For anyone who lands here, below is working Java code for fetching the pending messages from an Azure SB queue:
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration;
import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
import com.microsoft.windowsazure.services.servicebus.ServiceBusService;
import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;

public class Test1 {
    //static StringWriter writer = new StringWriter();

    public static void main(String... s) throws Exception {
        com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication("Your_NameSpace", "RootManageSharedAccessKey", "Mkf1H3g9qg0LrNEP1QbZ/EJKSARmJZQdOI6ek6obalI=", ".servicebus.windows.net");
        ServiceBusContract service = ServiceBusService.create(config);
        ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
        // Peek-lock: the message stays in the queue until it is deleted explicitly.
        opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
        while (true) {
            ReceiveQueueMessageResult resultQM = service.receiveQueueMessage("Queue_Name", opts);
            BrokeredMessage message = resultQM.getValue();
            if (message != null && message.getMessageId() != null) {
                try {
                    // IOUtils.copy(message.getBody(), writer, encoding);
                    // "\\A" matches the start of input, so next() returns the whole stream.
                    Scanner s1 = new Scanner(message.getBody()).useDelimiter("\\A");
                    String result = s1.hasNext() ? s1.next() : "";
                    // The two lines above convert the InputStream into a String.
                    System.out.println("Body: " + message.toString());
                    System.out.println("MainBody : " + result);
                    System.out.println("MessageID: " + message.getMessageId());
                    System.out.println("Custom Property: " +
                        message.getProperty("TestProperty"));
                    // Remove message from queue
                    System.out.println("Deleting this message.");
                    service.deleteMessage(message);
                } catch (Exception ex) {
                    // Indicate a problem, unlock message in queue
                    System.out.println("Inner exception encountered!");
                    service.unlockMessage(message);
                }
            } else {
                // Added to handle no more messages in the queue.
                // Could instead wait for more messages to be added.
                System.out.println("Finishing up - no more messages.");
                break;
            }
        }
    }
}
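The `Scanner` idiom in the code above relies on the regular expression `\A` (start of input) as the delimiter, which has to be written as `"\\A"` in a Java string literal. Here is a minimal standalone sketch of that idiom, using an in-memory stream instead of a real message body. `StreamToStringSketch` and `readAll` are names made up for this illustration, and UTF-8 is an assumption about the body encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

class StreamToStringSketch {

    // Reads the whole stream into one String.
    // With "\\A" as delimiter there is only one token: the entire input.
    static String readAll(InputStream in) {
        try (Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            // hasNext() is false for an empty stream, hence the fallback.
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream(
                "This is message 0".getBytes(StandardCharsets.UTF_8));
        System.out.println("MainBody : " + readAll(body));
    }
}
```

Closing the `Scanner` also closes the underlying stream, so the body should be read only once.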
Make sure you have the Maven dependencies required for "BrokeredMessage".
Thanks,
Rudra
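The stack trace in the question shows that `javax.ws.rs.WebApplicationException` could not be loaded, which suggests that a JAX-RS API jar was missing from the classpath at run time. One way to check this before calling the Service Bus classes is sketched below. `ClasspathCheckSketch` and `isOnClasspath` are names made up for this illustration, and which Maven artifact supplies the class in your build is something you would need to confirm yourself.

```java
class ClasspathCheckSketch {

    // Returns true if the named class can be loaded by this class's class loader.
    // The class is located but not initialized, so no static code runs.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace from the question.
        String jaxRsClass = "javax.ws.rs.WebApplicationException";
        System.out.println(jaxRsClass + " available: " + isOnClasspath(jaxRsClass));
    }
}
```

If this prints `false`, the dependency that provides the class is not reaching the runtime classpath, even if the code compiles in the IDE.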