IgniteCache 不调用 loadCache()
IgniteCache does not call loadCache()
我已经关注 ignite documentation 并创建了我的商店适配器:
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.my.calendar.dao.jpa.ChannelDetailsJpaRepository;
import com.my.calendar.entity.ChannelDetails;
@Service
public class CacheChannelStoreAdapter extends CacheStoreAdapter<Long, ChannelDetails> implements Serializable {
private static final Logger LOGGER = LogManager.getLogger(CacheChannelStoreAdapter.class);
private static ChannelDetailsJpaRepository channelDetailsJpaRepository;
// Will be automatically injected. !!! apacheignite info, but it DOES NOT
@IgniteInstanceResource
private Ignite ignite;
public Ignite getIgnite() {
return ignite;
}
public void setIgnite(Ignite ignite) {
this.ignite = ignite;
}
@Autowired
public CacheChannelStoreAdapter(ChannelDetailsJpaRepository channelDetailsJpaRepository) {
this.channelDetailsJpaRepository = channelDetailsJpaRepository;
}
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("load cache...");
super.loadCache(clo, args);
}
@Override
public Map<Long, ChannelDetails> loadAll(Iterable<? extends Long> keys) {
LOGGER.trace("load all. keys: {}", keys);
return super.loadAll(keys);
}
@Override
public ChannelDetails load(Long key) throws CacheLoaderException {
LOGGER.debug("object to load: {}", key);
return channelDetailsJpaRepository.findOne(key);
}
@Override
public void write(Cache.Entry<? extends Long, ? extends ChannelDetails> entry) throws CacheWriterException {
LOGGER.debug("object to save: {}", entry);
Optional.ofNullable(entry)
.map(Cache.Entry::getValue)
.ifPresent(channelDetailsJpaRepository::save);
}
@Override
public void delete(Object key) throws CacheWriterException {
LOGGER.debug("object to delete: {}", key);
Optional.ofNullable(key)
.filter(Long.class::isInstance)
.map(Long.class::cast )
.ifPresent(channelDetailsJpaRepository::delete);
}
}
缓存适配器已注入到缓存配置中 cacheConfiguration.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(cacheStoreAdapter));
我同时遇到了几个问题:
- 没有看到
CacheStore.loadCache()
的呼叫 | IgniteCache.loadCache()
(应调用以下文档 @Override public ChannelDetails load(Long key)
)
- 没有看到 ignite 注入的
@IgniteInstanceResource private Ignite ignite;
有任何修复或检查的想法吗? ...可能是我错过了重要的部分,我应该在 startTime 或其他什么地方直接调用缓存加载?
添加更多详细信息
所有缓存数据都在缓存时保存到我的数据库中。在服务器启动服务器上,没有从数据库加载数据。
我的适配器的缓存配置部分:
cacheConfiguration.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(cacheStoreAdapter));
// Configure cache to use store.
cacheConfiguration.setReadThrough(true);
cacheConfiguration.setWriteThrough(true);
// Enable database batching.
cacheConfiguration.setWriteBehindEnabled(true);
Ignite 从网络服务器开始(在同一个 jvm
,我知道它很丑,但这不是我测试失败的原因)。
Ignite 不会隐式调用 IgniteCache#loadCache
方法。你应该自己做,否则只能通过 get 操作从商店加载条目。
我还检查并确保 Ignite 将本地实例注入商店。你如何检查它?请注意,集群中所有节点上的存储实例不能相同。 Ignite 是分布式系统,这个对象将在所有节点上被序列化和反序列化,而这个对象(cacheStoreAdapter)可能会被归档 null
。当 store 被 Ignite 调用时,你应该看到这个注入。
感谢尼古拉的建议。他没有提供代码,所以我想知道我的情况是如何解决的。我有完整的解决方案。
在 Ignite
配置部分我们应该调用
igniteConcreteCache.loadCache(
(IgniteBiPredicate) (key, val) -> {
System.out.println(" =========>>>> Loading [key=" + key + ", val=" + val + ']');
return true;
});
要从存储中加载数据,我们需要覆盖 StoreAdapter
中的相应方法:
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("load cache...");
super.loadCache(clo, args);
}
应该是:
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("Load all cache data from db. this part should be used if Ignite starts.");
if (args == null || args.length == 0) {
channelDetailsJpaRepository.findAll()
.forEach(channelDetails -> clo.apply(channelDetails.getId(), channelDetails));
} else {
throw new NotImplementedException("Handle to load objects by id collection.");
}
}
这个解决方案并不完美(它有很多地方需要改进)但它有效,我们可以使用它。
我已经关注 ignite documentation 并创建了我的商店适配器:
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.my.calendar.dao.jpa.ChannelDetailsJpaRepository;
import com.my.calendar.entity.ChannelDetails;
@Service
public class CacheChannelStoreAdapter extends CacheStoreAdapter<Long, ChannelDetails> implements Serializable {
private static final Logger LOGGER = LogManager.getLogger(CacheChannelStoreAdapter.class);
private static ChannelDetailsJpaRepository channelDetailsJpaRepository;
// Will be automatically injected. !!! apacheignite info, but it DOES NOT
@IgniteInstanceResource
private Ignite ignite;
public Ignite getIgnite() {
return ignite;
}
public void setIgnite(Ignite ignite) {
this.ignite = ignite;
}
@Autowired
public CacheChannelStoreAdapter(ChannelDetailsJpaRepository channelDetailsJpaRepository) {
this.channelDetailsJpaRepository = channelDetailsJpaRepository;
}
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("load cache...");
super.loadCache(clo, args);
}
@Override
public Map<Long, ChannelDetails> loadAll(Iterable<? extends Long> keys) {
LOGGER.trace("load all. keys: {}", keys);
return super.loadAll(keys);
}
@Override
public ChannelDetails load(Long key) throws CacheLoaderException {
LOGGER.debug("object to load: {}", key);
return channelDetailsJpaRepository.findOne(key);
}
@Override
public void write(Cache.Entry<? extends Long, ? extends ChannelDetails> entry) throws CacheWriterException {
LOGGER.debug("object to save: {}", entry);
Optional.ofNullable(entry)
.map(Cache.Entry::getValue)
.ifPresent(channelDetailsJpaRepository::save);
}
@Override
public void delete(Object key) throws CacheWriterException {
LOGGER.debug("object to delete: {}", key);
Optional.ofNullable(key)
.filter(Long.class::isInstance)
.map(Long.class::cast )
.ifPresent(channelDetailsJpaRepository::delete);
}
}
缓存适配器已注入到缓存配置中 cacheConfiguration.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(cacheStoreAdapter));
我同时遇到了几个问题:
- 没有看到
CacheStore.loadCache()
的呼叫 |IgniteCache.loadCache()
(应调用以下文档@Override public ChannelDetails load(Long key)
) - 没有看到 ignite 注入的
@IgniteInstanceResource private Ignite ignite;
有任何修复或检查的想法吗? ...可能是我错过了重要的部分,我应该在 startTime 或其他什么地方直接调用缓存加载?
添加更多详细信息
所有缓存数据都在缓存时保存到我的数据库中。在服务器启动服务器上,没有从数据库加载数据。
我的适配器的缓存配置部分:
cacheConfiguration.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(cacheStoreAdapter));
// Configure cache to use store.
cacheConfiguration.setReadThrough(true);
cacheConfiguration.setWriteThrough(true);
// Enable database batching.
cacheConfiguration.setWriteBehindEnabled(true);
Ignite 从网络服务器开始(在同一个 jvm
,我知道它很丑,但这不是我测试失败的原因)。
Ignite 不会隐式调用 IgniteCache#loadCache
方法。你应该自己做,否则只能通过 get 操作从商店加载条目。
我还检查并确保 Ignite 将本地实例注入商店。你如何检查它?请注意,集群中所有节点上的存储实例不能相同。 Ignite 是分布式系统,这个对象将在所有节点上被序列化和反序列化,而这个对象(cacheStoreAdapter)可能会被归档 null
。当 store 被 Ignite 调用时,你应该看到这个注入。
感谢尼古拉的建议。他没有提供代码,所以我想知道我的情况是如何解决的。我有完整的解决方案。
在 Ignite
配置部分我们应该调用
igniteConcreteCache.loadCache(
(IgniteBiPredicate) (key, val) -> {
System.out.println(" =========>>>> Loading [key=" + key + ", val=" + val + ']');
return true;
});
要从存储中加载数据,我们需要覆盖 StoreAdapter
中的相应方法:
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("load cache...");
super.loadCache(clo, args);
}
应该是:
@Override
public void loadCache(IgniteBiInClosure<Long, ChannelDetails> clo, Object... args) {
LOGGER.trace("Load all cache data from db. this part should be used if Ignite starts.");
if (args == null || args.length == 0) {
channelDetailsJpaRepository.findAll()
.forEach(channelDetails -> clo.apply(channelDetails.getId(), channelDetails));
} else {
throw new NotImplementedException("Handle to load objects by id collection.");
}
}
这个解决方案并不完美(它有很多地方需要改进)但它有效,我们可以使用它。