Apache Ignite 缓存存储 + HikariCP 数据源
Apache Ignite Cache Store + HikariCP DataSource
我正在尝试使用 PostgreSQL 作为外部存储设置 Apache Ignite 缓存存储。
public class MyCacheStore extends CacheStoreAdapter<String, MyCache> {
private static final String GET_QUERY= "SELECT * FROM ..";
private static final String UPDATE_QUERY = "UPDATE ...";
private static final String DELETE_QUERY = "DELETE FROM ..";
@CacheStoreSessionResource
private CacheStoreSession session;
@Override
public MyCache load(String key) throws CacheLoaderException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(GET_QUERY)) {
// some stuff
}
}
@Override
public void loadCache(IgniteBiInClosure<String, MyCache> clo, Object... args) {
super.loadCache(clo, args);
}
@Override
public void write(Cache.Entry<? extends String, ? extends MyCache> entry) throws CacheWriterException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_QUERY)) {
// some stuff
}
}
@Override
public void delete(Object key) throws CacheWriterException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(DELETE_QUERY)) {
// some stuff
}
}
}
MyCache
是标准 class:
public class MyCache implements Serializable {
@QuerySqlField(index = true, name = "id")
private String id;
public MyCache() {
}
public MyCache(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
这里是一个配置class
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
@Configuration
public class ServiceConfig {
// no problems here
@Bean
@ConfigurationProperties(prefix = "postgre")
DataSource dataSource() {
return DataSourceBuilder
.create()
.build();
}
@Bean
public Ignite igniteInstance(IgniteConfiguration igniteConfiguration) {
return Ignition.start(igniteConfiguration);
}
@Bean
public IgniteConfiguration igniteCfg () {
// some other stuff here
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(true);
CacheConfiguration myCacheConfiguration = new CacheConfiguration("MY_CACHE")
.setIndexedTypes(String.class, MyCache.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setReadThrough(true)
.setReadThrough(true)
.setCacheStoreSessionListenerFactories(new MyCacheStoreSessionListenerFactory(dataSource))
.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStore.class));
cfg.setCacheConfiguration(myCacheConfiguration);
return cfg;
}
private static class MyCacheStoreSessionListenerFactory implements Factory {
DataSource dataSource;
MyCacheStoreSessionListenerFactory(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public CacheStoreSessionListener create() {
// Data Source
CacheJdbcStoreSessionListener listener = new CacheJdbcStoreSessionListener();
listener.setDataSource(dataSource);
return listener;
}
}
}
这是我在日志中得到的:
...
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to validate cache configuration
(make sure all objects in cache configuration are serializable): MyCache
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4766)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4743)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.withBinaryContext(GridCacheProcessor.java:4788)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.cloneCheckSerializable(GridCacheProcessor.java:4743)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.addCacheOnJoin(GridCacheProcessor.java:818)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.addCacheOnJoinFromConfig(GridCacheProcessor.java:891)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCachesOnStart(GridCacheProcessor.java:753)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.start(GridCacheProcessor.java:795)
at org.apache.ignite.internal.IgniteKernal.startProcessor(IgniteKernal.java:1700)
... 77 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to serialize object: CacheConfiguration [name=MyCache, grpName=null, memPlcName=null, storeConcurrentLoadAllThreshold=5, rebalancePoolSize=2, rebalanceTimeout=10000, evictPlc=null, evictPlcFactory=null, onheapCache=false, sqlOnheapCache=false, sqlOnheapCacheMaxSize=0, evictFilter=null, eagerTtl=true, dfltLockTimeout=0, nearCfg=null, writeSync=null, storeFactory=javax.cache.configuration.FactoryBuilder$ClassFactory@d87782a1, storeKeepBinary=false, loadPrevVal=false, aff=null, cacheMode=PARTITIONED, atomicityMode=TRANSACTIONAL, backups=0, invalidate=false, tmLookupClsName=null, rebalanceMode=ASYNC, rebalanceOrder=0, rebalanceBatchSize=524288, rebalanceBatchesPrefetchCnt=2, maxConcurrentAsyncOps=500, sqlIdxMaxInlineSize=-1, writeBehindEnabled=false, writeBehindFlushSize=10240, writeBehindFlushFreq=5000, writeBehindFlushThreadCnt=1, writeBehindBatchSize=512, writeBehindCoalescing=true, maxQryIterCnt=1024, affMapper=null, rebalanceDelay=0, rebalanceThrottle=0, interceptor=null, longQryWarnTimeout=3000, qryDetailMetricsSz=0, readFromBackup=true, nodeFilter=null, sqlSchema=null, sqlEscapeAll=false, cpOnRead=true, topValidator=null, partLossPlc=IGNORE, qryParallelism=1, evtsDisabled=false, encryptionEnabled=false]
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:103)
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:70)
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:117)
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:58)
at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10250)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4762)
... 85 more
Caused by: java.io.NotSerializableException: com.zaxxer.hikari.HikariDataSource
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
我已经阅读了有关它的所有官方文档并检查了许多其他示例,但无法做到 运行。
HikariCP
是最流行的连接池库,我不明白为什么Ignite
会抛出无法序列化的异常DataSource
。
如有任何建议或想法,我们将不胜感激,谢谢!
由于您的缓存存储不可序列化,因此您不应使用 Factory.factoryOf
(这是一个无操作包装器),而是提供一个真正的可序列化工厂实现,它将在节点上获取本地 HikariCP,然后构造缓存存储。
我正在尝试使用 PostgreSQL 作为外部存储设置 Apache Ignite 缓存存储。
public class MyCacheStore extends CacheStoreAdapter<String, MyCache> {
private static final String GET_QUERY= "SELECT * FROM ..";
private static final String UPDATE_QUERY = "UPDATE ...";
private static final String DELETE_QUERY = "DELETE FROM ..";
@CacheStoreSessionResource
private CacheStoreSession session;
@Override
public MyCache load(String key) throws CacheLoaderException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(GET_QUERY)) {
// some stuff
}
}
@Override
public void loadCache(IgniteBiInClosure<String, MyCache> clo, Object... args) {
super.loadCache(clo, args);
}
@Override
public void write(Cache.Entry<? extends String, ? extends MyCache> entry) throws CacheWriterException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(UPDATE_QUERY)) {
// some stuff
}
}
@Override
public void delete(Object key) throws CacheWriterException {
Connection connection = session.attachment();
try (PreparedStatement preparedStatement = connection.prepareStatement(DELETE_QUERY)) {
// some stuff
}
}
}
MyCache
是标准 class:
public class MyCache implements Serializable {
@QuerySqlField(index = true, name = "id")
private String id;
public MyCache() {
}
public MyCache(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
这里是一个配置class
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
@Configuration
public class ServiceConfig {
// no problems here
@Bean
@ConfigurationProperties(prefix = "postgre")
DataSource dataSource() {
return DataSourceBuilder
.create()
.build();
}
@Bean
public Ignite igniteInstance(IgniteConfiguration igniteConfiguration) {
return Ignition.start(igniteConfiguration);
}
@Bean
public IgniteConfiguration igniteCfg () {
// some other stuff here
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setClientMode(true);
CacheConfiguration myCacheConfiguration = new CacheConfiguration("MY_CACHE")
.setIndexedTypes(String.class, MyCache.class)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setReadThrough(true)
.setReadThrough(true)
.setCacheStoreSessionListenerFactories(new MyCacheStoreSessionListenerFactory(dataSource))
.setCacheStoreFactory(FactoryBuilder.factoryOf(MyCacheStore.class));
cfg.setCacheConfiguration(myCacheConfiguration);
return cfg;
}
private static class MyCacheStoreSessionListenerFactory implements Factory {
DataSource dataSource;
MyCacheStoreSessionListenerFactory(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public CacheStoreSessionListener create() {
// Data Source
CacheJdbcStoreSessionListener listener = new CacheJdbcStoreSessionListener();
listener.setDataSource(dataSource);
return listener;
}
}
}
这是我在日志中得到的:
...
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to validate cache configuration
(make sure all objects in cache configuration are serializable): MyCache
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4766)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4743)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.withBinaryContext(GridCacheProcessor.java:4788)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.cloneCheckSerializable(GridCacheProcessor.java:4743)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.addCacheOnJoin(GridCacheProcessor.java:818)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.addCacheOnJoinFromConfig(GridCacheProcessor.java:891)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.startCachesOnStart(GridCacheProcessor.java:753)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.start(GridCacheProcessor.java:795)
at org.apache.ignite.internal.IgniteKernal.startProcessor(IgniteKernal.java:1700)
... 77 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to serialize object: CacheConfiguration [name=MyCache, grpName=null, memPlcName=null, storeConcurrentLoadAllThreshold=5, rebalancePoolSize=2, rebalanceTimeout=10000, evictPlc=null, evictPlcFactory=null, onheapCache=false, sqlOnheapCache=false, sqlOnheapCacheMaxSize=0, evictFilter=null, eagerTtl=true, dfltLockTimeout=0, nearCfg=null, writeSync=null, storeFactory=javax.cache.configuration.FactoryBuilder$ClassFactory@d87782a1, storeKeepBinary=false, loadPrevVal=false, aff=null, cacheMode=PARTITIONED, atomicityMode=TRANSACTIONAL, backups=0, invalidate=false, tmLookupClsName=null, rebalanceMode=ASYNC, rebalanceOrder=0, rebalanceBatchSize=524288, rebalanceBatchesPrefetchCnt=2, maxConcurrentAsyncOps=500, sqlIdxMaxInlineSize=-1, writeBehindEnabled=false, writeBehindFlushSize=10240, writeBehindFlushFreq=5000, writeBehindFlushThreadCnt=1, writeBehindBatchSize=512, writeBehindCoalescing=true, maxQryIterCnt=1024, affMapper=null, rebalanceDelay=0, rebalanceThrottle=0, interceptor=null, longQryWarnTimeout=3000, qryDetailMetricsSz=0, readFromBackup=true, nodeFilter=null, sqlSchema=null, sqlEscapeAll=false, cpOnRead=true, topValidator=null, partLossPlc=IGNORE, qryParallelism=1, evtsDisabled=false, encryptionEnabled=false]
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:103)
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:70)
at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:117)
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:58)
at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10250)
at org.apache.ignite.internal.processors.cache.GridCacheProcessor.applyx(GridCacheProcessor.java:4762)
... 85 more
Caused by: java.io.NotSerializableException: com.zaxxer.hikari.HikariDataSource
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
我已经阅读了有关它的所有官方文档并检查了许多其他示例,但无法做到 运行。
HikariCP
是最流行的连接池库,我不明白为什么Ignite
会抛出无法序列化的异常DataSource
。
如有任何建议或想法,我们将不胜感激,谢谢!
由于您的缓存存储不可序列化,因此您不应使用 Factory.factoryOf
(这是一个无操作包装器),而是提供一个真正的可序列化工厂实现,它将在节点上获取本地 HikariCP,然后构造缓存存储。