IllegalStateException: 不能 Subscrie.Processor 已经终止
IllegalStateException: Cannot Subscrie.Processor is already terminated
我创建了一个新的 eventhub 并尝试将消息发布到 eventHubA。当我尝试向 eventhub 发送消息时,出现以下错误:
java.lang.IllegalStateException: namespace[xxxxx] entityPath[xxxxx]:
Cannot subscribe. Processor is already terminated at
com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:217)
下面是我正在使用的代码片段:
public void send(Response response) {
String responseInString = JsonHandlingUtil.objectToJsonString(response);
EventData eventData = new EventData(responseInString);
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
我已经将 eventhubProducerClient 定义为一个 Bean。
@Bean
public EventHubProducerClient eventHubProducerClient() {
return new EventHubClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(connectionString, eventHubName)
.buildProducerClient();
}
下面是我的 gradle 依赖项
> //eventhub
> implementation 'com.azure:azure-messaging-eventhubs:5.7.0'
> implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.6'
从 follow-up 问题看来,根本原因似乎已被确认为 send
方法中的 producer.close()
调用。
由于应用程序似乎将生产者作为单例进行管理,因此缓解措施是在事件不再发布时调用 close
,例如当应用程序正在关闭。
我创建了一个新的 eventhub 并尝试将消息发布到 eventHubA。当我尝试向 eventhub 发送消息时,出现以下错误:
java.lang.IllegalStateException: namespace[xxxxx] entityPath[xxxxx]: Cannot subscribe. Processor is already terminated at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:217)
下面是我正在使用的代码片段:
public void send(Response response) {
String responseInString = JsonHandlingUtil.objectToJsonString(response);
EventData eventData = new EventData(responseInString);
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData)) {
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
}
}
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}
producer.close();
}
我已经将 eventhubProducerClient 定义为一个 Bean。
@Bean
public EventHubProducerClient eventHubProducerClient() {
return new EventHubClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(connectionString, eventHubName)
.buildProducerClient();
}
下面是我的 gradle 依赖项
> //eventhub
> implementation 'com.azure:azure-messaging-eventhubs:5.7.0'
> implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.6'
从 follow-up 问题看来,根本原因似乎已被确认为 send
方法中的 producer.close()
调用。
由于应用程序似乎将生产者作为单例进行管理,因此缓解措施是在事件不再发布时调用 close
,例如当应用程序正在关闭。