GemFire CacheWriter 中的并发线程

Concurrent threads in GemFire CacheWriter

我们目前使用 Cassandra 作为 NoSQL 数据库,使用 GemFire 作为内存数据库。我们一直在使用 GemFire CacheWriter 在 Cassandra 中插入记录。关于在 CacheWriter 中使用并发线程到 insert/Update 记录是否是一个好的工程实践,我希望得到您的反馈。您对此的反馈将不胜感激。

public class GenericWriter<K, V> extends CacheWriterAdapter<K, V> implements Declarable {

    private static Logger log = LoggerFactory.getLogger(GenericWriter.class);

    @Autowired
    private CassandraOperations cassandraOperations;

    ExecutorService executor = null;

    @Override
    public void beforeCreate(EntryEvent<K, V> e) {

    executor = Executors.newSingleThreadExecutor();

        executor.submit(() -> {
            if (eventOperation.equals("CREATE") || eventOperation.equalsIgnoreCase("PUTALL_CREATE")) {
                try {
                    cassandraOperations.insert(e.getNewValue());
                } catch (CassandraConnectionFailureException | CassandraWriteTimeoutException
                        | CassandraInternalException cassException) {
                } catch (Exception ex) {
                    log.error("Exception in GenericCacheWriter->" + ExceptionUtils.getStackTrace(ex));
                    throw ex;
                }
            }
        });
        executor.shutdown();
    }

    @Override
    public void init(Properties arg0) {
        // TODO Auto-generated method stub

    }
}

CacheWriter 处理程序被同步调用,因此应用程序不会继续,直到处理程序 returns。因此,不建议在该监听器内部执行 long-运行 操作。如果需要 long-运行 操作,请考虑通过 AsyncEventListener 异步处理操作。

使用 ExecutorService 将执行委托给不同的线程是可能的,但它是一种反模式,因为它不再实现快速失败 属性,并且处理事件不再是同步的,因此相对于应用程序完成事件的时间无法保证。

您可以在 Geode Wiki 中阅读有关此主题的更多信息,特别是在 CacheWrite and CacheListener Best Practices

希望这对您有所帮助。

此致。

是的,这是一个很好的模式,但删除执行器并对数据进行分区,以便对 GemFire 的所有更新都转到一个且仅一个节点。以相同的方式对 Cassandra 进行分区。在 Cassandra 更新周围放置一个写锁。仅当您的吞吐量较低时才使用它。

如果您需要高吞吐量,请使用 AsyncEventListener 并保证对您的用户的最终一致性。如果您必须在 AEL 中使用执行器,请以在主线程中抛出异常的方式使用它们。如果在多次尝试后更新失败,您将失败的条目写入另一个区域,并在几秒或一分钟后到期。到期后,重试该操作。继续这样做直到成功,然后并且只有这样,删除过期的条目。

无论更新顺序对您是否重要,您都需要跟踪版本号和正在更新的内容,查看旧值/新值。