GemFire CacheWriter 中的并发线程
Concurrent threads in GemFire CacheWriter
我们目前使用 Cassandra 作为 NoSQL 数据库,使用 GemFire 作为内存数据库。我们一直在使用 GemFire CacheWriter 在 Cassandra 中插入记录。关于在 CacheWriter 中使用并发线程到 insert/Update 记录是否是一个好的工程实践,我希望得到您的反馈。您对此的反馈将不胜感激。
public class GenericWriter<K, V> extends CacheWriterAdapter<K, V> implements Declarable {
private static Logger log = LoggerFactory.getLogger(GenericWriter.class);
@Autowired
private CassandraOperations cassandraOperations;
ExecutorService executor = null;
@Override
public void beforeCreate(EntryEvent<K, V> e) {
executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
if (eventOperation.equals("CREATE") || eventOperation.equalsIgnoreCase("PUTALL_CREATE")) {
try {
cassandraOperations.insert(e.getNewValue());
} catch (CassandraConnectionFailureException | CassandraWriteTimeoutException
| CassandraInternalException cassException) {
} catch (Exception ex) {
log.error("Exception in GenericCacheWriter->" + ExceptionUtils.getStackTrace(ex));
throw ex;
}
}
});
executor.shutdown();
}
@Override
public void init(Properties arg0) {
// TODO Auto-generated method stub
}
}
CacheWriter
处理程序被同步调用,因此应用程序不会继续,直到处理程序 returns。因此,不建议在该监听器内部执行 long-运行 操作。如果需要 long-运行 操作,请考虑通过 AsyncEventListener
异步处理操作。
使用 ExecutorService
将执行委托给不同的线程是可能的,但它是一种反模式,因为它不再实现快速失败 属性,并且处理事件不再是同步的,因此相对于应用程序完成事件的时间无法保证。
您可以在 Geode Wiki 中阅读有关此主题的更多信息,特别是在 CacheWrite and CacheListener Best Practices。
希望这对您有所帮助。
此致。
是的,这是一个很好的模式,但删除执行器并对数据进行分区,以便对 GemFire 的所有更新都转到一个且仅一个节点。以相同的方式对 Cassandra 进行分区。在 Cassandra 更新周围放置一个写锁。仅当您的吞吐量较低时才使用它。
如果您需要高吞吐量,请使用 AsyncEventListener 并保证对您的用户的最终一致性。如果您必须在 AEL 中使用执行器,请以在主线程中抛出异常的方式使用它们。如果在多次尝试后更新失败,您将失败的条目写入另一个区域,并在几秒或一分钟后到期。到期后,重试该操作。继续这样做直到成功,然后并且只有这样,删除过期的条目。
无论更新顺序对您是否重要,您都需要跟踪版本号和正在更新的内容,查看旧值/新值。
我们目前使用 Cassandra 作为 NoSQL 数据库,使用 GemFire 作为内存数据库。我们一直在使用 GemFire CacheWriter 在 Cassandra 中插入记录。关于在 CacheWriter 中使用并发线程到 insert/Update 记录是否是一个好的工程实践,我希望得到您的反馈。您对此的反馈将不胜感激。
public class GenericWriter<K, V> extends CacheWriterAdapter<K, V> implements Declarable {
private static Logger log = LoggerFactory.getLogger(GenericWriter.class);
@Autowired
private CassandraOperations cassandraOperations;
ExecutorService executor = null;
@Override
public void beforeCreate(EntryEvent<K, V> e) {
executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
if (eventOperation.equals("CREATE") || eventOperation.equalsIgnoreCase("PUTALL_CREATE")) {
try {
cassandraOperations.insert(e.getNewValue());
} catch (CassandraConnectionFailureException | CassandraWriteTimeoutException
| CassandraInternalException cassException) {
} catch (Exception ex) {
log.error("Exception in GenericCacheWriter->" + ExceptionUtils.getStackTrace(ex));
throw ex;
}
}
});
executor.shutdown();
}
@Override
public void init(Properties arg0) {
// TODO Auto-generated method stub
}
}
CacheWriter
处理程序被同步调用,因此应用程序不会继续,直到处理程序 returns。因此,不建议在该监听器内部执行 long-运行 操作。如果需要 long-运行 操作,请考虑通过 AsyncEventListener
异步处理操作。
使用 ExecutorService
将执行委托给不同的线程是可能的,但它是一种反模式,因为它不再实现快速失败 属性,并且处理事件不再是同步的,因此相对于应用程序完成事件的时间无法保证。
您可以在 Geode Wiki 中阅读有关此主题的更多信息,特别是在 CacheWrite and CacheListener Best Practices。
希望这对您有所帮助。
此致。
是的,这是一个很好的模式,但删除执行器并对数据进行分区,以便对 GemFire 的所有更新都转到一个且仅一个节点。以相同的方式对 Cassandra 进行分区。在 Cassandra 更新周围放置一个写锁。仅当您的吞吐量较低时才使用它。
如果您需要高吞吐量,请使用 AsyncEventListener 并保证对您的用户的最终一致性。如果您必须在 AEL 中使用执行器,请以在主线程中抛出异常的方式使用它们。如果在多次尝试后更新失败,您将失败的条目写入另一个区域,并在几秒或一分钟后到期。到期后,重试该操作。继续这样做直到成功,然后并且只有这样,删除过期的条目。
无论更新顺序对您是否重要,您都需要跟踪版本号和正在更新的内容,查看旧值/新值。