publish/subscribe 在 Java 中使用 Azure Cosmos DB 的示例
Sample for publish/subscribe with Azure Cosmos DB in Java
我需要一个带有 Azure Cosmos DB 的 pub/sub 事件消息系统。我使用 Azure Cosmos DB Java SDK v4.
我尝试使用基于此示例的 ChangeFeedProcessor https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java,但它没有像预期的那样工作。
我的问题:
- 饲料 collection/container 继续成长。如何在所有活动节点都收到事件后删除事件?
- 活动的延迟似乎比较大。大约一分钟。
- 只有一个节点收到事件。这对于负载平衡来说似乎很有趣,但这不是我的用例。
使用 Java SDK 的 4.12.0 版,以下代码片段适用于我。但它使用来自驱动程序的 beta 代码。它可以在未来改变。
private static final String CHANNEL = "events";
private CosmosContainer collection;
private boolean stopped;
void start( String clientID ) {
CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
// delete all events after 60 seconds. All nodes should receive it in the meantime.
props.setDefaultTimeToLiveInSeconds( 60 );
collection = getOrCreateContainer( props );
Thread thread = new Thread( () -> {
String[] continuation = new String[1];
try {
while( !stopped ) {
// sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
Iterator<EventPOJO> it = collection //
.queryChangeFeed( options, EventPOJO.class ) //
.handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
.iterator();
while( it.hasNext() ) {
EventPOJO event = it.next();
if( event.client != clientID ) {
// filter the own events
onMessage( event );
}
}
// poll interval
Thread.sleep( 1000 );
}
} catch( Throwable th ) {
if( !stopped ) {
PersistenceLogger.LOGGER.error( th );
}
}
}, CHANNEL );
thread.setDaemon( true );
thread.start();
}
<T> void send( T event, String clientID ) {
EventPOJO evt = new EventPOJO();
evt.id = ...
evt.client = clientID;
evt.type = event.getClass().getName();
evt.message = ...
collection.createItem( evt );
}
我需要一个带有 Azure Cosmos DB 的 pub/sub 事件消息系统。我使用 Azure Cosmos DB Java SDK v4.
我尝试使用基于此示例的 ChangeFeedProcessor https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java,但它没有像预期的那样工作。
我的问题:
- 饲料 collection/container 继续成长。如何在所有活动节点都收到事件后删除事件?
- 活动的延迟似乎比较大。大约一分钟。
- 只有一个节点收到事件。这对于负载平衡来说似乎很有趣,但这不是我的用例。
使用 Java SDK 的 4.12.0 版,以下代码片段适用于我。但它使用来自驱动程序的 beta 代码。它可以在未来改变。
private static final String CHANNEL = "events";
private CosmosContainer collection;
private boolean stopped;
void start( String clientID ) {
CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
// delete all events after 60 seconds. All nodes should receive it in the meantime.
props.setDefaultTimeToLiveInSeconds( 60 );
collection = getOrCreateContainer( props );
Thread thread = new Thread( () -> {
String[] continuation = new String[1];
try {
while( !stopped ) {
// sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
Iterator<EventPOJO> it = collection //
.queryChangeFeed( options, EventPOJO.class ) //
.handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
.iterator();
while( it.hasNext() ) {
EventPOJO event = it.next();
if( event.client != clientID ) {
// filter the own events
onMessage( event );
}
}
// poll interval
Thread.sleep( 1000 );
}
} catch( Throwable th ) {
if( !stopped ) {
PersistenceLogger.LOGGER.error( th );
}
}
}, CHANNEL );
thread.setDaemon( true );
thread.start();
}
<T> void send( T event, String clientID ) {
EventPOJO evt = new EventPOJO();
evt.id = ...
evt.client = clientID;
evt.type = event.getClass().getName();
evt.message = ...
collection.createItem( evt );
}