更新 Pivotal GemFire 中的单个列

Updating individual column in Pivotal GemFire

据我所知,在 gemfire 中没有使用查询更新单个列的选项。要更新单个列,我目前正在获取整个旧对象并修改更改后的值并存储它。如果有人在更新各个列方面实施了任何措施,请分享。

@Region("tracking")
public class Tracking implements Serializable {
public String id;
public String status;
public String program;
}



@Region("tracking")
public interface TrackingQueryRepository extends CrudRepository<Tracking, String> {
}

我是 Delta 传播实施的新手。我已阅读用户指南并尝试实施并收到下面给出的异常。你能分享一下你的想法吗 这个。

Another.java – 域名class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.mapping.Region;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.gemstone.gemfire.Delta;
import com.gemstone.gemfire.InvalidDeltaException;


@Region("delta")
public class Another implements Delta, Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    private String anotherId;

    @JsonProperty("anotherProgramId")
    private String anotherProgramId;

    public Another() {
    }

    public Another(String anotherId, String anotherProgramId) {
        this.anotherId = anotherId;
        this.anotherProgramId = anotherProgramId;
    }

    public String getAnotherId() {
        return anotherId;
    }

    public void setAnotherId(String anotherId) {
        this.anotherIdChd = true;
        this.anotherId = anotherId;
    }

    public String getAnotherProgramId() {
        return anotherProgramId;
    }

    public void setAnotherProgramId(String anotherProgramId) {
        this.anotherProgramIdChd = true;
        this.anotherProgramId = anotherProgramId;
    }

    private transient boolean anotherIdChd = false;
    private transient boolean anotherProgramIdChd = false;

    @Override
    public String toString() {
        return "Another [anotherId=" + anotherId + ", anotherProgramId=" + anotherProgramId + "]";
    }

    @Override
    public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {

        if (in.readBoolean()) {
            // Read the change and apply it to the object
            this.anotherId = in.toString();
            System.out.println(" Applied delta to field 'anotherId' = " + this.anotherId);
        }
        if (in.readBoolean()) {
            this.anotherProgramId = in.toString();
            System.out.println(" Applied delta to field 'anotherProgramId' = " + this.anotherProgramId);
        }
    }

    @Override
    public boolean hasDelta() {
        return this.anotherIdChd || this.anotherProgramIdChd;

    }

    @Override
    public void toDelta(DataOutput out) throws IOException {
        System.out.println("Extracting delta from " + this.toString());
        out.writeBoolean(anotherIdChd);
        if (anotherIdChd) {
            // Write just the changes into the data stream

            out.writeUTF(this.anotherId);
            // Once the delta information is written, reset the delta status
            // field
            this.anotherIdChd = false;
            System.out.println(" Extracted delta from field 'anotherId' = " + this.anotherId);
        }
        out.writeBoolean(anotherProgramIdChd);
        if (anotherProgramIdChd) {
            out.writeUTF(this.anotherProgramId);
            this.anotherProgramIdChd = false;
            System.out.println(" Extracted delta from field 'anotherProgramId' = " + this.anotherProgramId);
        }

    }

}

客户端-cache.xml

<pdx>
        <pdx-serializer>
            <class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
            <parameter name="classes">
                <string>com\.rs\.main\..+</string>
            </parameter>
        </pdx-serializer>
    </pdx>

Spring XML 命名空间

<util:properties id="gemfire-props">
<prop key="delta-propagation">true</prop>
</util:properties>
<gfe:client-cache pool-name="serverPool" cache-xml-location="classpath:client-cache.xml" properties-ref="gemfire-props"/>
<gfe:client-region id="delta" pool-name="serverPool" shortcut="PROXY" cloning-enabled="true">

本地 gemfire 实例版本 - pivotal-gemfire-9.0.1

区域创建 创建区域 –name=delta –type=REPLICATE

异常:

2017-05-08 22:17:12.370 ERROR 14696 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataAccessResourceFailureException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another; nested exception is com.gemstone.gemfire.cache.client.ServerOperationException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another] with root cause

java.lang.ClassNotFoundException: com.rs.main.Another
    at org.apache.geode.internal.ClassPathLoader.forName(ClassPathLoader.java:437) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.getCachedClass(InternalDataSerializer.java:4010) ~[na:na]
    at org.apache.geode.pdx.internal.PdxType.getPdxClass(PdxType.java:235) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.basicGetObject(PdxReaderImpl.java:687) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.getObject(PdxReaderImpl.java:682) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.readPdxSerializable(InternalDataSerializer.java:3218) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3005) ~[na:na]
    at org.apache.geode.DataSerializer.readObject(DataSerializer.java:2897) ~[na:na]
    at org.apache.geode.internal.util.BlobHelper.deserializeBlob(BlobHelper.java:90) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1891) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1884) ~[na:na]
    at org.apache.geode.internal.cache.VMCachedDeserializable.getDeserializedValue(VMCachedDeserializable.java:134) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.processDeltaBytes(EntryEventImpl.java:1687) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.setNewValueInRegion(EntryEventImpl.java:1558) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.putExistingEntry(EntryEventImpl.java:1504) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.updateEntry(AbstractRegionMap.java:2959) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.basicPut(AbstractRegionMap.java:2782) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5750) ~[na:na]
    at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:337) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:151) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5730) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicBridgePut(LocalRegion.java:5374) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.command.Put65.cmdExecute(Put65.java:381) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:141) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:776) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:904) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1160) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
    at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.run(AcceptorImpl.java:519) ~[na:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

您不能使用 Query 服务更新列。我建议您考虑使用 Function 服务来实现事务一致性。使区域分区并使用 .onRegion().withFilter(key).withArgs(columnsAndValuesMap) 调用函数。

您的函数将读取对象、应用更新并放置。

通过这种方式,您的读取和更新将在服务器上的单个线程中进行,确保事务的一致性,而不是在客户端读取对象、更改值、执行放置并希望没有其他人在下面溜走你。

嗨(再次)Vigneshwaran-

对,所以 GemFire 的 Query capabilities (via the QueryService) is strictly for running queries (i.e. SELECT statements). There is no equivalent in GemFire OQL for UPDATES and DELETES. GemFire is a Key/Value store with Map-like operations (e.g. get(key), put(key, value), etc), where you are generally working with the entire application domain object. However, there are a few features of GemFire that can help you regardless whether your application is a peer cache (i.e. member of the cluster) or cache client. Typically, applications are cache clients and have/use a ClientCache 集群是独立的,客户端连接到集群很像 RDBMS。

我还要说,虽然函数服务很有用,但它不是唯一的选择,实际上在代码方面可能会产生更多开销。

正如 Wes 上面提到的,使用分区区域是非常典型的,特别是对于 "transactional" 数据(注意:REPLICATE 区域更适用于不经常更改的参考数据)。

"Function" 可以帮助您的地方是,您可以使用一种有针对性的方法(例如 [onRegion("tracking")][7])对 Function to take the update to the application domain object. The "update" could be passed in the Function's "arguments". To invoke a Function, you use the GemFire's FunctionService to get an Execution 进行编码。

NOTE: the other targeting methods (namely, onMember(s) and onServer(s)) are specific to whether your application is a "peer" or "client", respectively. For instance, you cannot call onMember(s) if your application is a client as it assumes your application is a "peer". Likewise, you cannot call onServer(s) if your application is a peer as it assumes your application is a "client". onRegion(..) works whether the application is a peer or a client. While you might think why not use onRegion all the time, there are technical advantages to using the other forms of targeting depending on your UC (e.g. think server groups and routing). Anyway...

当Region是一个PARTITION时,你也可以设置Function的[optimizeForWrite()][8],这意味着Function将更新Region数据,因此,将被路由到PARTITION的key的主桶,当密钥指定为 using the filtering 选项,如上文 Wes 所述。

分区区域的一致性来自这样一个事实,即所有更新首先被路由并写入 "primary"(无论哪个服务器接收到客户端的更新,这可能是一个甚至不托管该区域的服务器或有问题的 data/key;即不同的分片)。更新主节点后,数据更改将传播(分发)到集群中托管 partition/sharded 数据集的辅助节点的其他节点。这就是 Wes 上面提到的 "transactional" 一致性。

NOTE: PARTITION is just another word for SHARDING the data, where the data is spread evenly across the cluster of available nodes. When nodes are added/removed, the data is rebalanced. A PARTITION can also have redundancy. These are referred to as secondaries. PARTITION Regions help with latency and throughput since data is divided (into 113 Buckets by default), where each bucket has primary and maybe 1 or more copies (secondaries, for redundancy; HA) thereby improving both read and write throughput.

另外,如果数据一定要粘,那你也可以设置Function的HA属性。这将允许在失败的情况下重试。

然而,尽管有所有这些优点,您仍然需要处理 "how-to" 在服务器上的 Function 中更新您的应用程序域对象。您还必须处理 "mapping",因为在像 GemFire 这样的 Key/Value 商店中确实没有等同于 ORM 的东西。当然,这并不难,但也许有更好的方法。

还有一个功能,叫做Delta Propagation。本质上,无论何时进行更新,您总是在 GemFire 中获取和更新完整值。

NOTE: it is possible to query select fields of an object, in sort of a projection like fashion, but it is not a proxy or related to the actual object in anyway.

当您利用 GemFire 的 Serialization capabilities, you can leverage Delta Propagation.

在实施 "Deltas" 时,只有应用程序域对象中的差异实际上被序列化,通过网络发送,无论是在客户端和服务器之间,还是在支持冗余策略的对等点之间。这对您来说是完全无缝的。你得到你的对象(客户端),更新它,然后放置它。 GemFire 为您处理发送 "delta" 的逻辑。

此外,当使用 client/server topology and PARTITION Regions on the servers in the cluster, you can enable Single-Hop access 时,它有效地将数据路由到包含 "primary" 存储桶的服务器,从而避免了额外的网络跃点,这将影响您对每个操作的感知延迟。

因此,在增量和单跳之间,您最终得到了一个非常高效的解决方案,并且仍然可以利用面向对象的方法,使用您期望的应用程序域对象 API。

但是请注意 pitfalls of using Deltas

总之,深思。您通常总是有不止一种方法来完成一项任务,但更好的方法并不总是显而易见的,直到您根据您的 UC/goal.

衡量和评估所需的效果。

干杯, 约翰

我们实现相同目标的另一种方法是使用自定义函数,该函数在获取分布式锁后更新值 (BeanUtils)。

这可能会增加性能开销,但可以保证数据完整性。这就是权衡。

见下面一段伪代码

 try{

 //this can be regionName 
 dls = DistributedLockService.getServiceNamed(arbitrary-lock-name) 
 //the key is normally the object @Id
 dls.lock(some-key, waitTimeOut, leaseTimeOut)

 row = region.get(id)
 //Here we copy the desired value (input to function) to the latest value
 BeanUtils.copyProperty(row, key, value);      
 //Insert the modified record to Gemfire - now this becomes equivalent of update <region> set value =  for a specific property.
 region.put(id, row) 

 } finally{

 dls.unlock(some-key);

 }