EventHub 'send' in Java SDK hangs, send never occurs
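I'm trying to send a simple event to an Azure EventHub using the sample code (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send). Everything seems to be configured correctly, but when I reach the
ehClient.sendSync(sendEvent);
part of the code, it just hangs there and never gets past the sendSync call. I'm on a personal computer with no firewall running. Do I have to do some network configuration in Azure to allow this simple send? Has anyone had any luck getting this to work?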
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
        .setNamespaceName("mynamespace")
        .setEventHubName("myeventhubname")
        .setSasKeyName("mysaskename")
        .setSasKey("mysaskey");
final Gson gson = new GsonBuilder().create();
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);
print("Event Hub Client Created");
try {
    for (int i = 0; i < 100; i++) {
        String payload = "Message " + Integer.toString(i);
        byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset());
        EventData sendEvent = EventData.create(payloadBytes);
        // HANGS HERE - NEVER GETS PAST THIS CALL
        ehClient.sendSync(sendEvent);
    }
} finally {
    ehClient.closeSync();
    executorService.shutdown();
}
Try a different executor service, for example a work-stealing pool:
final ExecutorService executorService = Executors.newWorkStealingPool();
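The answer above does not say why swapping the executor would help. One general way a single-thread executor can stall is sketched below: a task that blocks on other work queued to the same executor can never finish, because the only worker thread is the one doing the waiting. This is an illustration of that general pattern using only the JDK, not a confirmed diagnosis of the EventHub SDK; the class and method names (ExecutorStallDemo, completesNestedWork) and the pool size of 4 are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExecutorStallDemo {

    // Submits an outer task that queues an inner task on the SAME executor
    // and blocks until the inner task finishes. Returns true if the outer
    // task completed within one second, false if it stalled.
    public static boolean completesNestedWork(ExecutorService executor) throws Exception {
        try {
            Future<String> outer = executor.submit(() -> {
                Future<String> inner = executor.submit(() -> "sent");
                return inner.get(); // blocks the worker thread running the outer task
            });
            try {
                outer.get(1, TimeUnit.SECONDS);
                return true;
            } catch (TimeoutException stalled) {
                return false;
            }
        } finally {
            // Interrupt any stuck worker so the JVM can exit.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("single thread: "
                + completesNestedWork(Executors.newSingleThreadExecutor()));
        System.out.println("work stealing: "
                + completesNestedWork(Executors.newWorkStealingPool(4)));
    }
}
```

With the single-thread executor the inner task stays in the queue while the only worker waits on it, so the outer task times out; with several workers another thread can pick up the inner task. Whether the SDK version in the question schedules its own work this way is an assumption that would need checking against the SDK source.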
The code below should work for you.
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-spark_2.11</artifactId>
    <version>2.3.7</version>
</dependency>
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledExecutorService}
import com.google.gson.Gson
import com.microsoft.azure.eventhubs.{EventData, EventHubClient}
// Assumed import: the ConnectionStringBuilder() ... .build form matches the builder shipped in azure-eventhubs-spark.
import org.apache.spark.eventhubs.ConnectionStringBuilder

// PushMessage is the caller's own message type; it is not defined in this snippet.
object callToPushMessage {
  private var executorService: ScheduledExecutorService = null

  def writeMsgToSink(message: PushMessage): Unit = {
    val connStr = ConnectionStringBuilder()
      .setNamespaceName("namespace")
      .setEventHubName("name")
      .setSasKeyName("policyname")
      .setSasKey("policykey")
      .build
    // The Executor handles all asynchronous tasks and this is passed to the EventHubClient instance.
    // This enables the user to segregate their thread pool based on the work load.
    // This pool can then be shared across multiple EventHubClient instances.
    // The following code uses a single thread executor, as there is only one EventHubClient instance,
    // handling different flavors of ingestion to Event Hubs here.
    if (executorService == null) {
      executorService = Executors.newSingleThreadScheduledExecutor()
    }
    val ehclient = EventHubClient.createSync(connStr, executorService)
    try {
      val jsonMessage = new Gson().toJson(message, classOf[PushMessage])
      val eventData: EventData = EventData.create(jsonMessage.getBytes(StandardCharsets.UTF_8))
      ehclient.sendSync(eventData)
    } finally {
      // closeSync() waits for the client to close before the executor is shut down.
      ehclient.closeSync()
      executorService.shutdown()
      // Reset so the next call creates a fresh executor instead of reusing a shut-down one.
      executorService = null
    }
  }
}
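While chasing a hang like the one in the question, it can help to bound the wait so the stall shows up as an error instead of a frozen program. The sketch below uses only the JDK and a stand-in CompletableFuture that never completes. It assumes the client exposes a non-blocking send that returns a CompletableFuture, which the source does not show, and the names BoundedWaitDemo and awaitWithTimeout are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitDemo {

    // Waits for the pending operation for at most timeoutMillis and reports
    // what happened instead of blocking indefinitely.
    public static String awaitWithTimeout(CompletableFuture<Void> pending, long timeoutMillis)
            throws Exception {
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "timed out";
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a send that never finishes (hypothetical, no SDK involved).
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        // Stand-in for a send that has already succeeded.
        CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);

        System.out.println(awaitWithTimeout(neverCompletes, 200));
        System.out.println(awaitWithTimeout(alreadyDone, 200));
    }
}
```

A timeout does not fix the underlying problem, but it turns a silent hang into something that can be logged and retried, and it makes it easier to tell whether a change such as swapping the executor actually had an effect.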