Ignite Write 内部结构
Ignite Write Behind Internals
我正在使用 Ignite 1.7.0 并正在测试 Apache Ignite 的后写功能。问这个问题的目的是为了更好地了解在 Apache Ignite 中启用 write behind 功能时幕后发生的事情。
我有一个 Ignite 客户端程序,它将 20 个条目插入测试缓存(称之为 "test_cache")。
Ignite 服务器 运行 在同一台机器上,但在不同的 JVM 上。
Ignite Cache 具有以下配置设置:
- Read through、Write Through 和 Write behind 已启用。
- 同花大小为 13
- 刷新线程数为 1
所有其他属性都设置为默认值。
除此之外还有一个为缓存配置的缓存存储,代码如下:
package com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
我特意按顺序在 writeAll() 方法中调用了 Thread.sleep() 方法,以模拟缓慢的数据库写入。
将数据加载到缓存中的 Ignite 客户端代码如下:
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
执行如下:
首先启动 Ignite 服务器。
加载数据的Ignite Client在服务端启动后
因为在 writeAll() 方法上定义了 60 秒的休眠,Ignite 客户端在写入第 20 个条目时卡住了。
此外,我可以在服务器日志中看到为两个线程调用了 writeAll() 方法,其中 Flush 线程已收到 15 个要写入存储的条目和一个系统线程已收到 1 个写入商店的条目。 Ignite 服务器日志如下:
在线程 flusher-0-#66%test_grid%
中写入为 [ 15 ] 个条目调用的所有方法
在线程 sys-#22%test_grid%
中写入为 [ 1 ] 个条目调用的所有方法
我可以理解 Ignite Client put 在写入 20 条目时卡住了,因为 Write Behind 缓存已满并且所有 Flush 线程也都在忙于写入数据。
以下是我需要弄清楚的几点:
为什么客户端在插入第20个条目时被阻塞,它应该在插入第14个条目时被阻塞(基于13个条目的最大缓存大小)
为什么Flush线程只调用了15个条目而不是所有19个条目,因为我没有设置批处理大小,它默认为512。
使用 writeAll() 方法调用的系统线程是否与处理来自 Ignite 客户端的请求以放置第 20 个条目的线程相同。
考虑到我的缓存已启用后写且写入顺序模式为 PRIMARY_SYNC(默认)并且缓存中没有备份,应阻止对缓存的任何 put 调用,直到主节点能够提交写入。这是否也意味着能够将条目放入 Write Behind 缓存中。
在服务器中存储条目的情况下,Ignite Server 是否会制作条目的两份副本,一份用于存储,一份用于后写缓存。或者是否使用了相同条目的引用。
感谢您耐心看完问题。如果问题太长,我深表歉意,但内容对于向相关听众详细说明情况至关重要。
后写存储在引擎盖下有背压控制。这意味着如果系统无法处理所有异步操作,可以即时将其转换为同步操作。
如果底层write-behind cache的大小超过临界大小(flushSize * 1.5),将使用正在执行写操作的线程而不是flusherThread。
这就是您在日志中看到这些线程的原因:
- flusher-0-#66%test_grid%(普通冲洗器螺纹)
- sys-#22%test_grid%(背压控制为运行,操作使用客户端线程)
Considering my Cache has write behind enabled and Write Order Mode is
PRIMARY_SYNC ( default ) and there are no backups in the cache, any
put call to the cache should be blocked until the primary node is able
to commit the write. Does this also mean able to put the entry in the
Write Behind cache.
是的,确实如此。
In case of storing an entry in the server, does Ignite Server makes
two copies of the entry one for storage and one for the write behind
cache. Or is the same entry's reference used.
应使用同一条目的引用。
让我们逐步考虑这种情况:
客户端线程已上传 14 个条目。 GridCacheWriteBehindStore
检测到底层缓存中的条目数量超过刷新大小并发送信号以唤醒刷新线程。
请参阅 GridCacheWriteBehindStore#updateCache()
flusher 线程唤醒并尝试通过 write-behind-cache.entrySet().iterator()
从后写缓存(即 ConcurrentLinkedHashMap
)获取数据。
此迭代器提供弱一致性遍历,即不保证它反映构造后的任何修改。
重要的是客户端线程并行放置新条目。
客户端线程放入最后一个值 [key=Key:20, val=Value: 20]
。同时flusher线程被writeAll()
方法中的Thread.sleep()
阻塞。
GridCacheWriteBehindStore
检测到write-behind cache的当前大小超过了临界大小(flush size * 1.5),因此应该使用背压机制。
GridCacheWriteBehindStore
调用 flushSingleValue()
方法以从后写缓存中刷新最旧的值(当然,此值不应被刷新线程之前获取)。
flushSingleValue()
方法在客户端线程的上下文中被调用。
之后,flusher 线程唤醒并处理剩余的条目。
希望对理解后写存储实现有所帮助。
谢谢!
我正在使用 Ignite 1.7.0 并正在测试 Apache Ignite 的后写功能。问这个问题的目的是为了更好地了解在 Apache Ignite 中启用 write behind 功能时幕后发生的事情。
我有一个 Ignite 客户端程序,它将 20 个条目插入测试缓存(称之为 "test_cache")。
Ignite 服务器 运行 在同一台机器上,但在不同的 JVM 上。
Ignite Cache 具有以下配置设置:
- Read through、Write Through 和 Write behind 已启用。
- 同花大小为 13
- 刷新线程数为 1
所有其他属性都设置为默认值。
除此之外还有一个为缓存配置的缓存存储,代码如下:
package com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
我特意按顺序在 writeAll() 方法中调用了 Thread.sleep() 方法,以模拟缓慢的数据库写入。
将数据加载到缓存中的 Ignite 客户端代码如下:
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
执行如下:
首先启动 Ignite 服务器。
加载数据的Ignite Client在服务端启动后
因为在 writeAll() 方法上定义了 60 秒的休眠,Ignite 客户端在写入第 20 个条目时卡住了。
此外,我可以在服务器日志中看到为两个线程调用了 writeAll() 方法,其中 Flush 线程已收到 15 个要写入存储的条目和一个系统线程已收到 1 个写入商店的条目。 Ignite 服务器日志如下:
在线程 flusher-0-#66%test_grid%
中写入为 [ 15 ] 个条目调用的所有方法在线程 sys-#22%test_grid%
中写入为 [ 1 ] 个条目调用的所有方法
我可以理解 Ignite Client put 在写入 20 条目时卡住了,因为 Write Behind 缓存已满并且所有 Flush 线程也都在忙于写入数据。
以下是我需要弄清楚的几点:
为什么客户端在插入第20个条目时被阻塞,它应该在插入第14个条目时被阻塞(基于13个条目的最大缓存大小)
为什么Flush线程只调用了15个条目而不是所有19个条目,因为我没有设置批处理大小,它默认为512。
使用 writeAll() 方法调用的系统线程是否与处理来自 Ignite 客户端的请求以放置第 20 个条目的线程相同。
考虑到我的缓存已启用后写且写入顺序模式为 PRIMARY_SYNC(默认)并且缓存中没有备份,应阻止对缓存的任何 put 调用,直到主节点能够提交写入。这是否也意味着能够将条目放入 Write Behind 缓存中。
在服务器中存储条目的情况下,Ignite Server 是否会制作条目的两份副本,一份用于存储,一份用于后写缓存。或者是否使用了相同条目的引用。
感谢您耐心看完问题。如果问题太长,我深表歉意,但内容对于向相关听众详细说明情况至关重要。
后写存储在引擎盖下有背压控制。这意味着如果系统无法处理所有异步操作,可以即时将其转换为同步操作。
如果底层write-behind cache的大小超过临界大小(flushSize * 1.5),将使用正在执行写操作的线程而不是flusherThread。
这就是您在日志中看到这些线程的原因:
- flusher-0-#66%test_grid%(普通冲洗器螺纹)
- sys-#22%test_grid%(背压控制为运行,操作使用客户端线程)
Considering my Cache has write behind enabled and Write Order Mode is PRIMARY_SYNC ( default ) and there are no backups in the cache, any put call to the cache should be blocked until the primary node is able to commit the write. Does this also mean able to put the entry in the Write Behind cache.
是的,确实如此。
In case of storing an entry in the server, does Ignite Server makes two copies of the entry one for storage and one for the write behind cache. Or is the same entry's reference used.
应使用同一条目的引用。
让我们逐步考虑这种情况:
客户端线程已上传 14 个条目。
GridCacheWriteBehindStore
检测到底层缓存中的条目数量超过刷新大小并发送信号以唤醒刷新线程。 请参阅GridCacheWriteBehindStore#updateCache()
flusher 线程唤醒并尝试通过
write-behind-cache.entrySet().iterator()
从后写缓存(即ConcurrentLinkedHashMap
)获取数据。 此迭代器提供弱一致性遍历,即不保证它反映构造后的任何修改。 重要的是客户端线程并行放置新条目。客户端线程放入最后一个值
[key=Key:20, val=Value: 20]
。同时flusher线程被writeAll()
方法中的Thread.sleep()
阻塞。GridCacheWriteBehindStore
检测到write-behind cache的当前大小超过了临界大小(flush size * 1.5),因此应该使用背压机制。GridCacheWriteBehindStore
调用flushSingleValue()
方法以从后写缓存中刷新最旧的值(当然,此值不应被刷新线程之前获取)。flushSingleValue()
方法在客户端线程的上下文中被调用。之后,flusher 线程唤醒并处理剩余的条目。
希望对理解后写存储实现有所帮助。
谢谢!