Native Client - 执行连续查询时出现序列化异常

Native Client - Serialization Exception when executing Continuous Query

我正在尝试使用 Apache Geode 设置一个简单的 Java <-> #C/.NET 概念证明,特别是使用 .NET 本机客户端测试连续查询功能。在 .NET 中使用常规查询效果很好,只有连续查询有问题。当我在连续查询对象上调用 Execute() 方法时,我 运行 遇到了我的问题。我得到的具体错误是

处​​理响应时收到未处理的消息类型 26,可能序列化不匹配

我只在缓存区域中存储简单的字符串,所以我有点惊讶我遇到了序列化问题。我已经尝试在两侧启用 PDX 序列化(并且 运行ning 没有它),它似乎没有什么不同。有什么想法吗?

这是我的双方代码:

Java

启动服务器,放入一些数据,然后不断更新给定的缓存条目。

public class GeodePoc {

    public static void main(String[] args) throws Exception {

        ServerLauncher serverLauncher = new ServerLauncher.Builder().setMemberName("server1")
                .setServerBindAddress("localhost").setServerPort(10334).set("start-locator", "localhost[20341]")
                .set(ConfigurationProperties.LOG_LEVEL, "trace")
                .setPdxReadSerialized(true)
                .set(ConfigurationProperties.CACHE_XML_FILE, "cache.xml").build();

        serverLauncher.start();
        Cache c = CacheFactory.getAnyInstance();
        Region<String, String> r = c.getRegion("example_region");
        r.put("test1", "value1");
        r.put("test2", "value2");

        System.out.println("Cache server successfully started");

        int i = 0;
        while (true) {
            r.put("test1", "value" + i);
            System.out.println(r.get("test1"));
            Thread.sleep(3000);
            i++;
        }

    }

}

服务器cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<cache xmlns="http://geode.apache.org/schema/cache" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
    version="1.0">

    <cache-server bind-address="localhost" port="40404"
        max-connections="100" />
    <pdx>
        <pdx-serializer>
            <class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
            <parameter name="classes">
                <string>java.lang.String</string>
            </parameter>
        </pdx-serializer>
    </pdx>

    <region name="example_region">
        <region-attributes refid="REPLICATE"  />
    </region>


</cache>

.NET 客户端

public static void GeodeTest()
        {
            Properties<string, string> props = Properties<string, string>.Create();
            props.Insert("cache-xml-file", "<path-to-cache.xml>");

            CacheFactory cacheFactory = new CacheFactory(props)
                .SetPdxReadSerialized(true).SetPdxIgnoreUnreadFields(true)
                .Set("log-level", "info");
            Cache cache = cacheFactory.Create();
            cache.TypeRegistry.PdxSerializer = new ReflectionBasedAutoSerializer();

            IRegion<string, string> region = cache.GetRegion<string, string>("example_region");

            Console.WriteLine(region.Get("test2", null));
            PoolManager pManager = cache.GetPoolManager();
            Pool pool = pManager.Find("serverPool");

            QueryService qs = pool.GetQueryService();

            // Regular query example (works)
            Query<string> q = qs.NewQuery<string>("select * from /example_region");
            ISelectResults<string> results = q.Execute();
            Console.WriteLine("Finished query");
            foreach (string result in results)
            {
                Console.WriteLine(result);
            }

            // Continuous Query (does not work)
            CqAttributesFactory<string, object> cqAttribsFactory = new CqAttributesFactory<string, object>();
            ICqListener<string, object> listener = new CacheListener<string, object>();
            cqAttribsFactory.InitCqListeners(new ICqListener<string, object>[] { listener });
            cqAttribsFactory.AddCqListener(listener);
            CqAttributes<string, object> cqAttribs = cqAttribsFactory.Create();

            CqQuery<string, object> cquery = qs.NewCq<string, object>("select * from /example_region", cqAttribs, false);
            Console.WriteLine(cquery.GetState());
            Console.WriteLine(cquery.QueryString);

            Console.WriteLine(">>> Cache query example started.");
            cquery.Execute();

            Console.WriteLine();
            Console.WriteLine(">>> Example finished, press any key to exit ...");
            Console.ReadKey();
        }

.NET 缓存侦听器

public class CacheListener<TKey, TResult> : ICqListener<TKey, TResult>
    {
        public virtual void OnEvent(CqEvent<TKey, TResult> ev)
        {
            object val = ev.getNewValue() as object;
            TKey key = ev.getKey();
            CqOperation opType = ev.getQueryOperation();
            string opStr = "DESTROY";
            if (opType == CqOperation.OP_TYPE_CREATE)
                opStr = "CREATE";
            else if (opType == CqOperation.OP_TYPE_UPDATE)
                opStr = "UPDATE";
            Console.WriteLine("MyCqListener::OnEvent called with key {0}, op {1}.", key, opStr);
        }
        public virtual void OnError(CqEvent<TKey, TResult> ev)
        {
            Console.WriteLine("MyCqListener::OnError called");
        }
        public virtual void Close()
        {
            Console.WriteLine("MyCqListener::close called");
        }
    }

.NET 客户端 cache.xml

<client-cache
    xmlns="http://geode.apache.org/schema/cache"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
    version="1.0">

  <pool name="serverPool" subscription-enabled="true">
    <locator host="localhost" port="20341"/>
  </pool>

  <region name="example_region">
    <region-attributes refid="CACHING_PROXY" pool-name="serverPool" />
  </region>
</client-cache>

这最终只是我的一个简单疏忽。为了使连续查询正常运行,您必须在 Java 端包含 geode-cq 依赖项。我没有这样做,这导致了异常。