GemFire 缓存中如何执行删除(DELETE)查询?
Delete query in gemfire cache?
当我在 gfsh 控制台中运行 SELECT 查询时,它按预期工作:
query --query="SELECT * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"
但类似的 DELETE 查询失败:
query --query="DELETE * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"
Result : false
startCount : 0
endCount : 20
Message : Query is invalid due for error : <Syntax error in query: unexpected token: FROM>
NEXT_STEP_NAME : END
gemfire 支持删除吗?
遗憾的是,Geode / GemFire 的 OQL 不支持 DELETE 语句。您只能遍历结果集,然后“手动”逐条删除。
在 gfsh 提示符下,您可以使用如下删除命令
removing specific keys
data with String keys
remove --region=RegionName --key=abc
键为其他对象类型的数据必须使用 --key-class 选项,
如下所示:
remove --region=RegionName --key-class=java.lang.Integer --key=1
对于大量要删除的记录,我的做法是先用 SELECT 查询找出要删除的键列表,然后用上述 remove 命令生成一个脚本,再一次性运行它们。
用于删除所有区域数据
remove --region=RegionName --all
否则,您需要编写一个 Java 程序,使用 GemFire API 来删除区域数据。
我们使用 GemFire,最后我写了一个函数来清除整个区域。这比一个简单的客户端循环一次删除一个条目要快得多,因为函数是分布式的,每个节点只清除该节点本地的条目。
并且函数可以从 gfsh 执行,所以非常容易使用。
如果有用的话,我可以分享这个函数的源代码。
这是我编写的一个 GemFire 函数,可以清除一个区域。如果您去掉我们从未使用过的多余功能和额外功能,这段代码可能会短很多。同样事后看来,我不需要 "synchronize" 清除区域列表,因为两个人在同一微秒内调用该函数的几率几乎为零。
我们在 GemFire 7 和 GemFire 8 集群中都使用它。将其放入 jar 并安装后,您可以从 gfsh 调用此函数来清除区域。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.distributed.DistributedSystem;
/**
 * GemFire function that removes every entry from the region it is executed on.
 *
 * <p>For a non-partitioned region the whole region is cleared with {@code Region.clear()}.
 * For a partitioned region each member removes only the entries it holds locally, so the
 * work is spread across the members the function runs on.
 *
 * <p>The function takes an optional {@code Boolean} argument. When it is {@code true} the
 * call is a dry run: nothing is removed and the result is {@code true} if no clear of the
 * region is currently in progress on this member, {@code false} otherwise. When the argument
 * is {@code null} or {@code false} the region is cleared; the result is {@code true} once the
 * clear has been accepted, or {@code false} if a clear of the same region was already running.
 */
public class ClearRegionFunction implements Function, Declarable {
    private static final long serialVersionUID = 1L;
    private static LogWriter log;

    // Names of the regions this member is currently clearing. Every read and write
    // must hold the list's own monitor.
    private static final List<String> clearingRegionList = new ArrayList<String>();

    static {
        DistributedSystem ds = CacheFactory.getAnyInstance().getDistributedSystem();
        log = ds.getLogWriter();
    }

    /**
     * Clears the region the function was invoked on, or reports whether a clear is in progress.
     *
     * @param fc the function context; it is cast to {@link RegionFunctionContext}, so the
     *           function must be executed on a region
     * @throws RuntimeException any runtime failure raised while clearing; it is logged first
     *                          and then rethrown unchanged
     */
    @Override
    public void execute(FunctionContext fc) {
        RegionFunctionContext rfc = (RegionFunctionContext) fc;
        Region region = rfc.getDataSet();
        String regionName = region.getName();

        // If passed a flag of "true", that says to simulate the clear, but don't actually clear.
        // This is used to test if a clear is already in progress, in which case we'd return false.
        Boolean simulate = (Boolean) rfc.getArguments();
        log.fine("Argument passed = " + simulate);
        if (simulate == null) {
            simulate = false;
        }
        if (simulate) {
            boolean idle;
            synchronized (clearingRegionList) {
                idle = !clearingRegionList.contains(regionName);
            }
            rfc.getResultSender().lastResult(idle);
            return;
        }

        log.warning("Clearing region: " + regionName); // Used "warning" because clearing a region is serious.

        // Protect against the same region being cleared twice at the same time. The marker is
        // claimed BEFORE the try block so that the cleanup below only ever runs for the
        // invocation that actually owns the marker.
        boolean alreadyClearing;
        synchronized (clearingRegionList) {
            alreadyClearing = clearingRegionList.contains(regionName);
            if (!alreadyClearing) {
                clearingRegionList.add(regionName);
            }
        }
        if (alreadyClearing) {
            log.error("Clear of region " + regionName + " is already in progress. Aborting.");
            // Let the client know we ignored their "clear" request.
            rfc.getResultSender().lastResult(false);
            return;
        }

        try {
            if (!PartitionRegionHelper.isPartitionedRegion(region)) {
                region.clear();
                rfc.getResultSender().lastResult(true);
            } else {
                clearLocalEntries(rfc, regionName);
            }
            log.warning("Region cleared: " + regionName);
        } catch (RuntimeException rex) {
            // Log AND throw is bad, but from my experience, a RuntimeException CAN get sent all
            // the way back to the client and never show up in gemfire.log. (If the exception
            // happens before last result.)
            log.error(rex.toString(), rex);
            throw rex;
        } finally {
            // Always release the marker, including on Errors, so the region can be cleared again.
            synchronized (clearingRegionList) {
                clearingRegionList.remove(regionName);
            }
        }
    }

    /**
     * Removes the entries of a partitioned region that are held by this member.
     *
     * @param rfc        the context of the running function; used to find the local data and
     *                   to send the result
     * @param regionName name of the region, used only in log messages
     */
    private void clearLocalEntries(RegionFunctionContext rfc, String regionName) {
        // We are going to clear the region in a partitioned manner, each node only clearing
        // the data in its own node. So we need to get the "local" region for the node.
        Region<Object, Object> localRegion = PartitionRegionHelper.getLocalDataForContext(rfc);

        // Beware, keySet() is a reference to the actual LIVE key set in memory. Copy the keys
        // first, otherwise we would loop through a live set and potentially delete items that
        // were added after the delete started.
        List<Object> keyList = new ArrayList<Object>(localRegion.keySet());

        // Once we have the keys, send the last result right away to unblock the caller, because
        // the removal could take a while. (The caller doesn't actually unblock until ALL nodes
        // have returned "true".)
        rfc.getResultSender().lastResult(true);

        int regionSize = keyList.size();
        log.info("Region " + regionName + " has " + regionSize + " entries to clear.");

        int count = 0;
        for (Object key : keyList) {
            // The "remove" method returns the object removed. This is bad because it (sometimes?)
            // causes GemFire to try and deserialize the object, and that fails because we don't
            // have the class on our server classpath. But if we invalidate first, it destroys the
            // entry object without deserializing it. Then "remove" cleans up the key.
            try {
                localRegion.invalidate(key);
                localRegion.remove(key);
            } catch (EntryNotFoundException enfe) {
                // The entry has disappeared (or expired) by the time we tried to remove it.
                // That is okay: it is gone either way.
                log.warning("Entry not found for key = " + key.toString(), enfe);
            }
            count++;
            // Every 10000 is frequent enough to give you a quick pulse, but
            // not so frequent as to spam your log.
            if (count % 10000 == 0) {
                log.info("Cleared " + count + "/" + regionSize + " entries for region " + regionName);
            }
        }
    }

    /** Returns the id under which this function is registered and invoked. */
    @Override
    public String getId() {
        return "clear-region-function";
    }

    /** No configuration is needed; the properties are ignored. */
    @Override
    public void init(Properties arg0) { }

    /** The function always sends a single Boolean result. */
    @Override
    public boolean hasResult() { return true; }

    @Override
    public boolean isHA() { return true; }

    @Override
    public boolean optimizeForWrite() { return true; }
}
您需要使用 GemFire 函数(Function)来清除区域,这是从 GemFire 区域中删除记录的一种非常快速且经过优化的方法。
详细代码请参见以下 GitHub 仓库:
POM:
<!-- Maven build descriptor for the GemFireRemoveAllDataFunction module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates: group id and artifact id are the same name. -->
<groupId>GemFireRemoveAllDataFunction</groupId>
<artifactId>GemFireRemoveAllDataFunction</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<!-- Sources live directly under src/ rather than Maven's default src/main/java. -->
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<!-- Compile with Java 1.8 source and target levels. -->
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- GemFire 9.6.0 core library. -->
<!-- NOTE(review): no <repositories> section is declared here; presumably this artifact is
     resolved from a repository configured elsewhere (e.g. settings.xml). Confirm. -->
<dependency>
<groupId>io.pivotal.gemfire</groupId>
<artifactId>geode-core</artifactId>
<version>9.6.0</version>
</dependency>
</dependencies>
</project>
函数:
package com.khan.viquar.gemfire;
import java.util.ArrayList;
import java.util.List;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.partition.PartitionRegionHelper;
/**
 * Function that removes all entries from the region it is executed on, using
 * {@code Region.removeAll} in batches of at most {@code batchSize} keys.
 *
 * <p>For a partitioned region only the data local to the executing member is removed;
 * for any other region the region itself is passed to the removal routine.
 *
 * <p>The result is the string {@code "Success"}, an error string when the function is not
 * executed on a region, or the thrown exception (sent through the result sender) on failure.
 */
@SuppressWarnings("rawtypes")
public class ClearRegionRemoveAllDataFunction implements Function, Declarable {
    private static final long serialVersionUID = 11L;

    // Maximum number of keys handed to a single removeAll call.
    private static final int batchSize = 30000;

    /**
     * Removes the region's entries and reports the outcome through the result sender.
     *
     * @param ctx the function context; must be a {@link RegionFunctionContext} for any data
     *            to be removed
     */
    @SuppressWarnings("unchecked")
    public void execute(final FunctionContext ctx) {
        if (!(ctx instanceof RegionFunctionContext)) {
            ctx.getResultSender().lastResult("ERROR: The function must be executed on region!");
            return;
        }
        final RegionFunctionContext rfc = (RegionFunctionContext) ctx;
        try {
            final Region<Object, Object> region = rfc.getDataSet();
            if (PartitionRegionHelper.isPartitionedRegion(region)) {
                clear(PartitionRegionHelper.getLocalDataForContext(rfc));
            } else {
                clear(region);
            }
            ctx.getResultSender().lastResult("Success");
        } catch (final Throwable t) {
            // Hand the failure to the caller as a result instead of letting it propagate.
            rfc.getResultSender().sendException(t);
        }
    }

    /**
     * Removes every key currently in {@code localRegion}.
     *
     * <p>Up to {@code batchSize} entries are removed with one call; larger regions are
     * removed in chunks of {@code batchSize} keys.
     *
     * @param localRegion the region (or local data view of a partitioned region) to empty
     */
    private void clear(final Region<Object, Object> localRegion) {
        if (localRegion.keySet().size() <= batchSize) {
            localRegion.removeAll(localRegion.keySet());
            return;
        }
        final List<Object> buffer = new ArrayList<Object>(batchSize);
        for (final Object k : localRegion.keySet()) {
            buffer.add(k);
            if (buffer.size() == batchSize) {
                localRegion.removeAll(buffer);
                buffer.clear();
            }
        }
        // Flush the final partial batch; skip the call when the last batch was exactly full.
        if (!buffer.isEmpty()) {
            localRegion.removeAll(buffer);
        }
    }

    /** The function always sends a result (or an exception) back to the caller. */
    public boolean hasResult() {
        return true;
    }

    /**
     * Returns the function id, which is this class's simple name.
     *
     * <p>The original referenced {@code ClearRegionRemoveAllFunction}, a class that is not
     * declared in this file, so the class did not compile as shown.
     */
    public String getId() {
        return ClearRegionRemoveAllDataFunction.class.getSimpleName();
    }

    public boolean optimizeForWrite() {
        return true;
    }

    public boolean isHA() {
        return true;
    }
}
当我在 gfsh 控制台中运行 SELECT 查询时,它按预期工作:
query --query="SELECT * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"
但类似的 DELETE 查询失败:
query --query="DELETE * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"
Result : false
startCount : 0
endCount : 20
Message : Query is invalid due for error : <Syntax error in query: unexpected token: FROM>
NEXT_STEP_NAME : END
gemfire 支持删除吗?
遗憾的是,Geode / GemFire 的 OQL 不支持 DELETE 语句。您只能遍历结果集,然后“手动”逐条删除。
在 gfsh 提示符下,您可以使用如下删除命令
removing specific keys
data with String keys
remove --region=RegionName --key=abc
键为其他对象类型的数据必须使用 --key-class 选项,
如下所示:
remove --region=RegionName --key-class=java.lang.Integer --key=1
对于大量要删除的记录,我的做法是先用 SELECT 查询找出要删除的键列表,然后用上述 remove 命令生成一个脚本,再一次性运行它们。
用于删除所有区域数据
remove --region=RegionName --all
否则,您需要编写一个 Java 程序,使用 GemFire API 来删除区域数据。
我们使用 GemFire,最后我写了一个函数来清除整个区域。这比一个简单的客户端循环一次删除一个条目要快得多,因为函数是分布式的,每个节点只清除该节点本地的条目。
并且函数可以从 gfsh 执行,所以非常容易使用。
如果有用的话,我可以分享这个函数的源代码。
这是我编写的一个 GemFire 函数,可以清除一个区域。如果您去掉我们从未使用过的多余功能和额外功能,这段代码可能会短很多。同样事后看来,我不需要 "synchronize" 清除区域列表,因为两个人在同一微秒内调用该函数的几率几乎为零。
我们在 GemFire 7 和 GemFire 8 集群中都使用它。将其放入 jar 并安装后,您可以从 gfsh 调用此函数来清除区域。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.distributed.DistributedSystem;
/**
 * GemFire function that removes every entry from the region it is executed on.
 *
 * <p>For a non-partitioned region the whole region is cleared with {@code Region.clear()}.
 * For a partitioned region each member removes only the entries it holds locally, so the
 * work is spread across the members the function runs on.
 *
 * <p>The function takes an optional {@code Boolean} argument. When it is {@code true} the
 * call is a dry run: nothing is removed and the result is {@code true} if no clear of the
 * region is currently in progress on this member, {@code false} otherwise. When the argument
 * is {@code null} or {@code false} the region is cleared; the result is {@code true} once the
 * clear has been accepted, or {@code false} if a clear of the same region was already running.
 */
public class ClearRegionFunction implements Function, Declarable {
    private static final long serialVersionUID = 1L;
    private static LogWriter log;

    // Names of the regions this member is currently clearing. Every read and write
    // must hold the list's own monitor.
    private static final List<String> clearingRegionList = new ArrayList<String>();

    static {
        DistributedSystem ds = CacheFactory.getAnyInstance().getDistributedSystem();
        log = ds.getLogWriter();
    }

    /**
     * Clears the region the function was invoked on, or reports whether a clear is in progress.
     *
     * @param fc the function context; it is cast to {@link RegionFunctionContext}, so the
     *           function must be executed on a region
     * @throws RuntimeException any runtime failure raised while clearing; it is logged first
     *                          and then rethrown unchanged
     */
    @Override
    public void execute(FunctionContext fc) {
        RegionFunctionContext rfc = (RegionFunctionContext) fc;
        Region region = rfc.getDataSet();
        String regionName = region.getName();

        // If passed a flag of "true", that says to simulate the clear, but don't actually clear.
        // This is used to test if a clear is already in progress, in which case we'd return false.
        Boolean simulate = (Boolean) rfc.getArguments();
        log.fine("Argument passed = " + simulate);
        if (simulate == null) {
            simulate = false;
        }
        if (simulate) {
            boolean idle;
            synchronized (clearingRegionList) {
                idle = !clearingRegionList.contains(regionName);
            }
            rfc.getResultSender().lastResult(idle);
            return;
        }

        log.warning("Clearing region: " + regionName); // Used "warning" because clearing a region is serious.

        // Protect against the same region being cleared twice at the same time. The marker is
        // claimed BEFORE the try block so that the cleanup below only ever runs for the
        // invocation that actually owns the marker.
        boolean alreadyClearing;
        synchronized (clearingRegionList) {
            alreadyClearing = clearingRegionList.contains(regionName);
            if (!alreadyClearing) {
                clearingRegionList.add(regionName);
            }
        }
        if (alreadyClearing) {
            log.error("Clear of region " + regionName + " is already in progress. Aborting.");
            // Let the client know we ignored their "clear" request.
            rfc.getResultSender().lastResult(false);
            return;
        }

        try {
            if (!PartitionRegionHelper.isPartitionedRegion(region)) {
                region.clear();
                rfc.getResultSender().lastResult(true);
            } else {
                clearLocalEntries(rfc, regionName);
            }
            log.warning("Region cleared: " + regionName);
        } catch (RuntimeException rex) {
            // Log AND throw is bad, but from my experience, a RuntimeException CAN get sent all
            // the way back to the client and never show up in gemfire.log. (If the exception
            // happens before last result.)
            log.error(rex.toString(), rex);
            throw rex;
        } finally {
            // Always release the marker, including on Errors, so the region can be cleared again.
            synchronized (clearingRegionList) {
                clearingRegionList.remove(regionName);
            }
        }
    }

    /**
     * Removes the entries of a partitioned region that are held by this member.
     *
     * @param rfc        the context of the running function; used to find the local data and
     *                   to send the result
     * @param regionName name of the region, used only in log messages
     */
    private void clearLocalEntries(RegionFunctionContext rfc, String regionName) {
        // We are going to clear the region in a partitioned manner, each node only clearing
        // the data in its own node. So we need to get the "local" region for the node.
        Region<Object, Object> localRegion = PartitionRegionHelper.getLocalDataForContext(rfc);

        // Beware, keySet() is a reference to the actual LIVE key set in memory. Copy the keys
        // first, otherwise we would loop through a live set and potentially delete items that
        // were added after the delete started.
        List<Object> keyList = new ArrayList<Object>(localRegion.keySet());

        // Once we have the keys, send the last result right away to unblock the caller, because
        // the removal could take a while. (The caller doesn't actually unblock until ALL nodes
        // have returned "true".)
        rfc.getResultSender().lastResult(true);

        int regionSize = keyList.size();
        log.info("Region " + regionName + " has " + regionSize + " entries to clear.");

        int count = 0;
        for (Object key : keyList) {
            // The "remove" method returns the object removed. This is bad because it (sometimes?)
            // causes GemFire to try and deserialize the object, and that fails because we don't
            // have the class on our server classpath. But if we invalidate first, it destroys the
            // entry object without deserializing it. Then "remove" cleans up the key.
            try {
                localRegion.invalidate(key);
                localRegion.remove(key);
            } catch (EntryNotFoundException enfe) {
                // The entry has disappeared (or expired) by the time we tried to remove it.
                // That is okay: it is gone either way.
                log.warning("Entry not found for key = " + key.toString(), enfe);
            }
            count++;
            // Every 10000 is frequent enough to give you a quick pulse, but
            // not so frequent as to spam your log.
            if (count % 10000 == 0) {
                log.info("Cleared " + count + "/" + regionSize + " entries for region " + regionName);
            }
        }
    }

    /** Returns the id under which this function is registered and invoked. */
    @Override
    public String getId() {
        return "clear-region-function";
    }

    /** No configuration is needed; the properties are ignored. */
    @Override
    public void init(Properties arg0) { }

    /** The function always sends a single Boolean result. */
    @Override
    public boolean hasResult() { return true; }

    @Override
    public boolean isHA() { return true; }

    @Override
    public boolean optimizeForWrite() { return true; }
}
您需要使用 GemFire 函数(Function)来清除区域,这是从 GemFire 区域中删除记录的一种非常快速且经过优化的方法。
详细代码请参见以下 GitHub 仓库:
POM:
<!-- Maven build descriptor for the GemFireRemoveAllDataFunction module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates: group id and artifact id are the same name. -->
<groupId>GemFireRemoveAllDataFunction</groupId>
<artifactId>GemFireRemoveAllDataFunction</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<!-- Sources live directly under src/ rather than Maven's default src/main/java. -->
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<!-- Compile with Java 1.8 source and target levels. -->
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- GemFire 9.6.0 core library. -->
<!-- NOTE(review): no <repositories> section is declared here; presumably this artifact is
     resolved from a repository configured elsewhere (e.g. settings.xml). Confirm. -->
<dependency>
<groupId>io.pivotal.gemfire</groupId>
<artifactId>geode-core</artifactId>
<version>9.6.0</version>
</dependency>
</dependencies>
</project>
函数:
package com.khan.viquar.gemfire;
import java.util.ArrayList;
import java.util.List;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.partition.PartitionRegionHelper;
/**
 * Function that removes all entries from the region it is executed on, using
 * {@code Region.removeAll} in batches of at most {@code batchSize} keys.
 *
 * <p>For a partitioned region only the data local to the executing member is removed;
 * for any other region the region itself is passed to the removal routine.
 *
 * <p>The result is the string {@code "Success"}, an error string when the function is not
 * executed on a region, or the thrown exception (sent through the result sender) on failure.
 */
@SuppressWarnings("rawtypes")
public class ClearRegionRemoveAllDataFunction implements Function, Declarable {
    private static final long serialVersionUID = 11L;

    // Maximum number of keys handed to a single removeAll call.
    private static final int batchSize = 30000;

    /**
     * Removes the region's entries and reports the outcome through the result sender.
     *
     * @param ctx the function context; must be a {@link RegionFunctionContext} for any data
     *            to be removed
     */
    @SuppressWarnings("unchecked")
    public void execute(final FunctionContext ctx) {
        if (!(ctx instanceof RegionFunctionContext)) {
            ctx.getResultSender().lastResult("ERROR: The function must be executed on region!");
            return;
        }
        final RegionFunctionContext rfc = (RegionFunctionContext) ctx;
        try {
            final Region<Object, Object> region = rfc.getDataSet();
            if (PartitionRegionHelper.isPartitionedRegion(region)) {
                clear(PartitionRegionHelper.getLocalDataForContext(rfc));
            } else {
                clear(region);
            }
            ctx.getResultSender().lastResult("Success");
        } catch (final Throwable t) {
            // Hand the failure to the caller as a result instead of letting it propagate.
            rfc.getResultSender().sendException(t);
        }
    }

    /**
     * Removes every key currently in {@code localRegion}.
     *
     * <p>Up to {@code batchSize} entries are removed with one call; larger regions are
     * removed in chunks of {@code batchSize} keys.
     *
     * @param localRegion the region (or local data view of a partitioned region) to empty
     */
    private void clear(final Region<Object, Object> localRegion) {
        if (localRegion.keySet().size() <= batchSize) {
            localRegion.removeAll(localRegion.keySet());
            return;
        }
        final List<Object> buffer = new ArrayList<Object>(batchSize);
        for (final Object k : localRegion.keySet()) {
            buffer.add(k);
            if (buffer.size() == batchSize) {
                localRegion.removeAll(buffer);
                buffer.clear();
            }
        }
        // Flush the final partial batch; skip the call when the last batch was exactly full.
        if (!buffer.isEmpty()) {
            localRegion.removeAll(buffer);
        }
    }

    /** The function always sends a result (or an exception) back to the caller. */
    public boolean hasResult() {
        return true;
    }

    /**
     * Returns the function id, which is this class's simple name.
     *
     * <p>The original referenced {@code ClearRegionRemoveAllFunction}, a class that is not
     * declared in this file, so the class did not compile as shown.
     */
    public String getId() {
        return ClearRegionRemoveAllDataFunction.class.getSimpleName();
    }

    public boolean optimizeForWrite() {
        return true;
    }

    public boolean isHA() {
        return true;
    }
}