向 Azure 事件中心发送消息
Sending Message to Azure Event Hub
我正在尝试按照 here 所述连接到 Azure 事件中心。但出现以下错误:
Exception in thread "main" java.lang.NoSuchMethodError: reactor.core.publisher.Flux.retryWhen(Ljava/util/function/Function;)Lreactor/core/publisher/Flux;
at com.azure.core.amqp.implementation.RetryUtil.withRetry(RetryUtil.java:58)
at com.azure.core.amqp.implementation.ReactorConnection.getClaimsBasedSecurityNode(ReactorConnection.java:142)
at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.createSession(EventHubReactorAmqpConnection.java:148)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession(ReactorConnection.java:203)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession(ReactorConnection.java:197)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at com.azure.core.amqp.implementation.AmqpChannelProcessor$ChannelSubscriber.onNext(AmqpChannelProcessor.java:310)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onNext[=10=](AmqpChannelProcessor.java:87)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.onNext(AmqpChannelProcessor.java:87)
at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:85)
at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1871)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2118)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:257)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:210)
at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
at reactor.core.publisher.Mono.block(Mono.java:1726)
at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
at com.sample.eventhub.eventhub.EventhubDemoApplication.main(EventhubDemoApplication.java:63)
抛出错误的行(EventhubDemoApplication.java:63)有“EventDataBatch batch = producer.createBatch();”
我的Pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.sample.eventhub</groupId>
<artifactId>eventhub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>eventhub-demo</name>
<description>Demo project for eevnt hub</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
尝试更改 Spring 引导版本 (2.3.7),然后出现以下错误:
Exception in thread "restartedMain" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity '************************' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:"*********", SystemTracker:**********************, Timestamp:2021-01-11T09:40:57, errorContext[NAMESPACE: ******************, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]
at com.azure.core.amqp.implementation.ExceptionUtil.distinguishNotFound(ExceptionUtil.java:114)
at com.azure.core.amqp.implementation.ExceptionUtil.amqpResponseCodeToException(ExceptionUtil.java:101)
at com.azure.core.amqp.implementation.RequestResponseChannel.settleMessage(RequestResponseChannel.java:274)
at com.azure.core.amqp.implementation.RequestResponseChannel.lambda$new[=12=](RequestResponseChannel.java:124)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:95)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:82)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
at reactor.core.publisher.Mono.block(Mono.java:1709)
at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
at com.example.Sender.SenderApplication.main(SenderApplication.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
尝试查看答案 ,但没有成功。
另外,我只尝试发送微软官方文档中提到的部分。这是导致问题的原因吗?
在这方面的任何输入都会很棒。
NoSuchMethodError
是冲突依赖项的结果,它解决了一个与 azure-messaging-eventhubs 中使用的版本不匹配的项目反应器版本。问题是它正在解析“reactor-core”版本 3.4.1
,其中 azure-messaging-eventhubs 使用版本 3.3.0.RELEASE
。在这些版本之间,Flux.retryWhen
的方法更改为使用 Retry
class 而不是函数和发布者。
你可以看到这个依赖冲突,如果你执行:
.\mvnw.cmd org.apache.maven.plugins:maven-dependency-plugin:2.10:tree -Dverbose=true -Dincludes=*reactor*
一种解决方案是将您的依赖项升级到 5.3.1。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.1</version>
</dependency>
或者,您可以将 spring-boot 的版本降级到适用于 reactor-core 3.3 的版本。0.RELEASE(如果您想继续使用 azure-messaging-eventhubs 5.0.1 ).或者在你的 pom.xml 中明确地 select 一个版本的 reactor-core... 不过,你可能会遇到其他 NoSuchMethodError
s.
谢谢@Connie 的回复。
发现使用了旧版事件中心。按照 this Microsoft 文档帮助我连接到事件中心。
我正在尝试按照 here 所述连接到 Azure 事件中心。但出现以下错误:
Exception in thread "main" java.lang.NoSuchMethodError: reactor.core.publisher.Flux.retryWhen(Ljava/util/function/Function;)Lreactor/core/publisher/Flux;
at com.azure.core.amqp.implementation.RetryUtil.withRetry(RetryUtil.java:58)
at com.azure.core.amqp.implementation.ReactorConnection.getClaimsBasedSecurityNode(ReactorConnection.java:142)
at com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection.createSession(EventHubReactorAmqpConnection.java:148)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession(ReactorConnection.java:203)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession(ReactorConnection.java:197)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784)
at com.azure.core.amqp.implementation.AmqpChannelProcessor$ChannelSubscriber.onNext(AmqpChannelProcessor.java:310)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onNext[=10=](AmqpChannelProcessor.java:87)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.onNext(AmqpChannelProcessor.java:87)
at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:85)
at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1871)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2118)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:257)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:210)
at reactor.core.publisher.Mono.subscribe(Mono.java:4046)
at reactor.core.publisher.Mono.block(Mono.java:1726)
at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
at com.sample.eventhub.eventhub.EventhubDemoApplication.main(EventhubDemoApplication.java:63)
抛出错误的行(EventhubDemoApplication.java:63)有“EventDataBatch batch = producer.createBatch();”
我的Pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.sample.eventhub</groupId>
<artifactId>eventhub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>eventhub-demo</name>
<description>Demo project for eevnt hub</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
尝试更改 Spring 引导版本 (2.3.7),然后出现以下错误:
Exception in thread "restartedMain" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: com.azure.core.amqp.exception.AmqpException: The messaging entity '************************' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:"*********", SystemTracker:**********************, Timestamp:2021-01-11T09:40:57, errorContext[NAMESPACE: ******************, PATH: $cbs, REFERENCE_ID: cbs:receiver, LINK_CREDIT: 0]
at com.azure.core.amqp.implementation.ExceptionUtil.distinguishNotFound(ExceptionUtil.java:114)
at com.azure.core.amqp.implementation.ExceptionUtil.amqpResponseCodeToException(ExceptionUtil.java:101)
at com.azure.core.amqp.implementation.RequestResponseChannel.settleMessage(RequestResponseChannel.java:274)
at com.azure.core.amqp.implementation.RequestResponseChannel.lambda$new[=12=](RequestResponseChannel.java:124)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:95)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:82)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:139)
at reactor.core.publisher.Mono.block(Mono.java:1709)
at com.azure.messaging.eventhubs.EventHubProducerClient.createBatch(EventHubProducerClient.java:127)
at com.example.Sender.SenderApplication.main(SenderApplication.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
尝试查看答案
另外,我只尝试发送微软官方文档中提到的部分。这是导致问题的原因吗? 在这方面的任何输入都会很棒。
NoSuchMethodError
是冲突依赖项的结果,它解决了一个与 azure-messaging-eventhubs 中使用的版本不匹配的项目反应器版本。问题是它正在解析“reactor-core”版本 3.4.1
,其中 azure-messaging-eventhubs 使用版本 3.3.0.RELEASE
。在这些版本之间,Flux.retryWhen
的方法更改为使用 Retry
class 而不是函数和发布者。
你可以看到这个依赖冲突,如果你执行:
.\mvnw.cmd org.apache.maven.plugins:maven-dependency-plugin:2.10:tree -Dverbose=true -Dincludes=*reactor*
一种解决方案是将您的依赖项升级到 5.3.1。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.1</version>
</dependency>
或者,您可以将 spring-boot 的版本降级到适用于 reactor-core 3.3 的版本。0.RELEASE(如果您想继续使用 azure-messaging-eventhubs 5.0.1 ).或者在你的 pom.xml 中明确地 select 一个版本的 reactor-core... 不过,你可能会遇到其他 NoSuchMethodError
s.
谢谢@Connie 的回复。
发现使用了旧版事件中心。按照 this Microsoft 文档帮助我连接到事件中心。