删除 gemfire 缓存中的查询?

Delete query in gemfire cache?

当我在 gfsh 控制台中运行 SELECT 查询时,它按预期工作:

query --query="SELECT * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"

但类似的 DELETE 查询失败:

query --query="DELETE * FROM /channelProfiles_composite_asrd WHERE profile.channelCode='JJ'"
Result     : false
startCount : 0
endCount   : 20
Message    : Query is invalid due for error : <Syntax error in query:  unexpected token: FROM>

NEXT_STEP_NAME : END

gemfire 支持删除吗?

遗憾的是,Geode / GemFire 的 OQL 不支持 DELETE 语句。您必须遍历结果集,然后"手动"逐条删除。

在 gfsh 提示符下,您可以使用如下删除命令

删除指定键的数据
键为 String 类型的数据:
remove --region=RegionName --key=abc

键为其他对象类型的数据必须同时指定 --key-class,如下所示:

remove --region=RegionName --key-class=java.lang.Integer --key=1

对于大量要删除的记录,我的做法是先用 SELECT 查询找出要删除的键列表,然后用上述 remove 命令生成一个脚本,把这些命令放在一起批量运行。

删除区域中的全部数据:

remove --region=RegionName --all

否则,您需要编写一个 Java 程序,通过 GemFire API 删除区域数据。

我们使用 GemFire,最后我写了一个函数来清除整个区域。这比一个简单的客户端循环一次删除一个条目要快得多,因为函数是分布式的,每个节点只清除该节点本地的条目。

并且函数可以从 gfsh 执行,所以非常容易使用。

如果有用的话,我可以分享这个函数的源代码。

这是我编写的一个 GemFire 函数,可以清除一个区域。如果去掉那些我们从未用过的多余功能,这段代码会短很多。另外,事后看来,我其实不需要对"正在清除的区域列表"做同步(synchronized),因为两个人在同一微秒内调用该函数的几率几乎为零。

我们在 GemFire 7 和 GemFire 8 集群中都使用它。将其放入 jar 并安装后,您可以从 gfsh 调用此函数来清除区域。

import java.util.ArrayList;   
import java.util.Iterator;
import java.util.List;  
import java.util.Properties;    
import java.util.Set;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
import com.gemstone.gemfire.distributed.DistributedSystem;

/**
 * GemFire function that removes every entry from the region it is executed on.
 *
 * <p>For a partitioned region each member only removes the entries it hosts locally; for any
 * other region type {@code Region.clear()} is used.
 *
 * <p>The function accepts one optional {@code Boolean} argument. When it is {@code true} the
 * call is a dry run: nothing is removed and the result is {@code true} if no clear of the same
 * region is currently running on this member, {@code false} otherwise. When it is {@code null}
 * or {@code false} the region is cleared and the result is {@code true}, or {@code false} if a
 * clear of that region was already in progress on this member (in which case nothing is done).
 */
public class ClearRegionFunction implements Function, Declarable {
    private static final long serialVersionUID = 1L;
    private static final LogWriter log;

    /**
     * Names of the regions currently being cleared on this member.
     * Every access must hold this list's monitor.
     */
    private static final List<String> clearingRegionList = new ArrayList<String>();

    static {
        DistributedSystem ds = CacheFactory.getAnyInstance().getDistributedSystem();
        log = ds.getLogWriter();
    }

    /**
     * Clears the target region, or reports whether a clear is in progress when simulating.
     *
     * @param fc the function context; must be a {@link RegionFunctionContext}, i.e. the function
     *           has to be executed on a region
     * @throws RuntimeException whatever the underlying region operations throw; it is logged
     *                          here first and then rethrown
     */
    @Override
    public void execute(FunctionContext fc) {
        RegionFunctionContext rfc = (RegionFunctionContext) fc;
        Region<Object, Object> region = rfc.getDataSet();
        String regionName = region.getName();

        // If passed a flag of "true", that says to simulate the clear, but don't actually clear.
        // This is used to test if a clear is already in progress, in which case we'd return false.
        Boolean simulate = (Boolean) rfc.getArguments();
        log.fine("Argument passed = " + simulate);
        if (simulate == null) {
            simulate = false;
        }

        if (simulate) {
            boolean idle;
            synchronized (clearingRegionList) {
                idle = !clearingRegionList.contains(regionName);
            }
            rfc.getResultSender().lastResult(idle);
            return;
        }

        log.warning("Clearing region: " + regionName); // Used "warning" because clearing a region is serious.

        // Protect against the same region being cleared twice at the same time. The marker is
        // claimed BEFORE entering the try block so that the finally clause below can only ever
        // release a marker this invocation owns.
        boolean alreadyClearing;
        synchronized (clearingRegionList) {
            alreadyClearing = clearingRegionList.contains(regionName);
            if (!alreadyClearing) {
                clearingRegionList.add(regionName);
            }
        }
        if (alreadyClearing) {
            log.error("Clear of region " + regionName + " is already in progress.  Aborting.");
            // Let the client know we ignored their "clear" request.
            rfc.getResultSender().lastResult(false);
            return;
        }

        try {
            if (!PartitionRegionHelper.isPartitionedRegion(region)) {
                region.clear();
                rfc.getResultSender().lastResult(true);
            } else {
                clearLocalPartitionData(rfc, regionName);
            }
            log.warning("Region cleared: " + regionName);
        } catch (RuntimeException rex) {
            log.error(rex.toString(), rex); // Log AND throw is bad, but from my experience, a RunTimeException
                                            // CAN get sent all the way back to the client and never show
                                            // up in gemfire.log. (If the exception happens before last result)
            throw rex;
        } finally {
            // Release the marker on every exit path (including Errors), otherwise the region
            // would be reported as "clear in progress" until the member restarts.
            synchronized (clearingRegionList) {
                clearingRegionList.remove(regionName);
            }
        }
    }

    /**
     * Removes the entries of a partitioned region that are hosted on this member.
     *
     * @param rfc        context of the current function execution
     * @param regionName region name, used for log messages only
     */
    private void clearLocalPartitionData(RegionFunctionContext rfc, String regionName) {
        // We are going to clear the region in a partitioned manner, each node only clearing
        // the data in its own node. So we need to get the "local" region for the node.
        Region<Object, Object> localRegion = PartitionRegionHelper.getLocalDataForContext(rfc);

        // Beware, this keySet() is a reference to the actual LIVE key set in memory. So
        // we need to clone the set of keys we want to delete, otherwise we'll be looping
        // through a live list and potentially deleting items that were added after the
        // delete started.
        List<Object> keyList = new ArrayList<Object>(localRegion.keySet());

        // Once we have the keys, go ahead and set the lastResult to "true" to
        // unblock the caller, because this could take a while. (The caller doesn't actually
        // unblock until ALL nodes have returned "true".)
        rfc.getResultSender().lastResult(true);
        int regionSize = keyList.size();
        log.info("Region " + regionName + " has " + regionSize + " entries to clear.");

        int count = 0;
        for (Object key : keyList) {
            // The "remove" method returns the object removed.  This is bad because it (sometimes?) causes
            // GemFire to try and deserialize the object, and that fails because we don't have the class on
            // our server classpath.  But if we invalidate first, it destroys the entry object without
            // deserializing it.  Then "remove" cleans up the key.
            try {
                localRegion.invalidate(key);
                localRegion.remove(key);
            } catch (EntryNotFoundException enfe) {
                // If the entry has disappeared (or expired) by the time we try to remove it,
                // then the GemFire API will throw an exception.  But this is okay.
                log.warning("Entry not found for key = " + key.toString(), enfe);
            }
            count++;
            // Every 10000 is frequent enough to give you a quick pulse, but
            // not so frequent as to spam your log.
            if (count % 10000 == 0) {
                log.info("Cleared " + count + "/" + regionSize + " entries for region " + regionName);
            }
        }
    }

    /** Returns the id under which this function is registered and invoked. */
    @Override
    public String getId() {
        return "clear-region-function";
    }

    /** No configuration properties are used. */
    @Override
    public void init(Properties arg0) { }

    /** The function always sends a Boolean result. */
    @Override
    public boolean hasResult() { return true; }

    @Override
    public boolean isHA() { return true; }

    @Override
    public boolean optimizeForWrite() { return true; }

}

您需要使用 GemFire 函数(Function)来清除区域,这是从 GemFire 区域中删除记录的一种非常快速且经过优化的方法。详细代码可在以下 GitHub 仓库中找到,其 POM 与函数源码如下:

POM:

       <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <!-- Maven build descriptor for the GemFireRemoveAllDataFunction artifact. -->
        <modelVersion>4.0.0</modelVersion>
        <groupId>GemFireRemoveAllDataFunction</groupId>
        <artifactId>GemFireRemoveAllDataFunction</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <build>
            <!-- Sources are read from src/ instead of Maven's default src/main/java. -->
            <sourceDirectory>src</sourceDirectory>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <!-- Compile at Java 8 source and bytecode level. -->
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    <dependencies>
        <!-- The only declared dependency: geode-core from the io.pivotal.gemfire group. -->
        <!-- NOTE(review): no <repositories> section is declared here; presumably this artifact
             is resolved from a repository configured outside this POM. Confirm. -->
        <dependency>
            <groupId>io.pivotal.gemfire</groupId>
            <artifactId>geode-core</artifactId>
            <version>9.6.0</version>
        </dependency>
    </dependencies>

</project>

函数:

package com.khan.viquar.gemfire;

import java.util.ArrayList;
import java.util.List;

import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.partition.PartitionRegionHelper;


@SuppressWarnings("rawtypes")
public class ClearRegionRemoveAllDataFunction implements Function, Declarable {

    private static final long serialVersionUID = 11L;

    private static final int batchSize = 30000;

    @SuppressWarnings("unchecked")
    public void execute(final FunctionContext ctx) {
        if (ctx instanceof RegionFunctionContext) {
            final RegionFunctionContext rfc = (RegionFunctionContext) ctx;
            try {
                final Region<Object, Object> region = rfc.getDataSet();
                if (PartitionRegionHelper.isPartitionedRegion(region)) {
                    clear(PartitionRegionHelper.getLocalDataForContext(rfc));
                } else {
                    clear(region);
                }
                ctx.getResultSender().lastResult("Success");
            } catch (final Throwable t) {
                rfc.getResultSender().sendException(t);
            }
        } else {
            ctx.getResultSender().lastResult("ERROR: The function must be executed on region!");
        }
    }

    private void clear(final Region<Object, Object> localRegion) {
        int numLocalEntries = localRegion.keySet().size();
        if (numLocalEntries <= batchSize) {
            localRegion.removeAll(localRegion.keySet());
        } else {
            final List<Object> buffer = new ArrayList<Object>(batchSize);
            int count = 0;
            for (final Object k : localRegion.keySet()) {
                buffer.add(k);
                count++;
                if (count == batchSize) {
                    localRegion.removeAll(buffer);
                    buffer.clear();
                    count = 0;
                } else {
                    continue;
                }
            }
            localRegion.removeAll(buffer);
        }
    }

    public boolean hasResult() {
        return true;
    }

    public String getId() {
        return ClearRegionRemoveAllFunction.class.getSimpleName();
    }

    public boolean optimizeForWrite() {
        return true;
    }

    public boolean isHA() {
        return true;
    }
}