在不启用 p2p class 加载的情况下让 apache ignite 连续查询工作

getting apache ignite continuous query to work without enabling p2p class loading

我一直在尝试让我的 ignite 连续查询代码工作 ,而无需 将对等 class 加载设置为启用。但是我发现代码没有 work.I 尝试调试并意识到对 cache.query(qry) 的调用出错并显示消息 "Failed to marshal custom event" 错误。当我启用 peer class loading 时,代码按预期工作。有人可以指导我如何在没有 peer class 加载的情况下完成这项工作吗? 以下是调用连续查询的代码片段。

   public  void subscribeEvent(IgniteCache<String,String> cache,String inKeyStr,ServerWebSocket websocket ){
    System.out.println("in thread "+Thread.currentThread().getId()+"-->"+"subscribe event");
     //ArrayList<String> inKeys = new ArrayList<String>(Arrays.asList(inKeyStr.split(",")));

     ContinuousQuery<String, String> qry = new ContinuousQuery<>();

    /****
     * Continuous Query Impl
     */

    inKeys = ","+inKeyStr+",";
    qry.setInitialQuery(new ScanQuery<String, String>((k, v) -> inKeys.contains(","+k+",")));
    qry.setTimeInterval(1000);
    qry.setPageSize(1);

    // Callback that is called locally when update notifications are received.
  //  Factory<CacheEntryEventFilter<String, String>> rmtFilterFactory = new com.ccx.ignite.cqfilter.FilterFactory().init(inKeyStr);      

    qry.setLocalListener(new CacheEntryUpdatedListener<String, String>() {

          @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> evts) {
            for (CacheEntryEvent<? extends String, ? extends String> e : evts)
            {

                          System.out.println("websocket locallsnr data in thread "+Thread.currentThread().getId()+"-->"+"key=" + e.getKey() + ", val=" + e.getValue());
                          try{
                          websocket.writeTextMessage("key=" + e.getKey() + ", val=" + e.getValue());
                          }
                          catch (Exception e1){
                              System.out.println("exception local listener "+e1.getMessage());
                              qry.setLocalListener(null) ;                }

            }

          }
        } );

   qry.setRemoteFilterFactory( new com.ccx.ignite.cqfilter.FilterFactory().init(inKeys));

    try{        

          cur = cache.query(qry);
          for (Cache.Entry<String, String> e : cur)
         { 
           System.out.println("websocket initialqry data in thread "+Thread.currentThread().getId()+"-->"+"key=" + e.getKey() + ", val=" + e.getValue());
           websocket.writeTextMessage("key=" + e.getKey() + ", val=" + e.getValue());
         }
    }
    catch (Exception e){
          System.out.println("exception cache.query "+e.getMessage());

          }        
    }

以下是远程过滤器class,我已经将其制作成一个独立的jar 并推送到ignite 的libs 文件夹中,以便服务器节点可以获取它

public class FilterFactory
{

public Factory<CacheEntryEventFilter<String, String>> init(String inKeyStr ){
System.out.println("factory init called jun22 ");

return new Factory <CacheEntryEventFilter<String, String>>() {
    private static final long serialVersionUID = 5906783589263492617L;

        @Override public CacheEntryEventFilter<String, String> create() {
            return new CacheEntryEventFilter<String, String>() {
                @Override  public boolean evaluate(CacheEntryEvent<? extends String, ? extends String> e) {
                    //List inKeys = new ArrayList<String>(Arrays.asList(inKeyStr.split(",")));
                    System.out.println("inside remote filter factory ");
                    String inKeys = ","+inKeyStr+",";

                    return inKeys.contains(","+e.getKey()+",");
                }
            };
        }
    };
}
}

我试图实现的总体逻辑是让 websocket 客户端通过指定缓存名称和感兴趣的键来订阅事件。 调用订阅事件代码,它创建连续查询并为感兴趣的键上的任何更新事件注册本地侦听器回调。 远程过滤器应该根据作为字符串传递给它的键来过滤更新事件,如果过滤器事件成功,则调用本地侦听器。本地侦听器将更新的键值写入传递给订阅事件代码的 Web 套接字引用。

我使用的ignite版本是1.8.0。然而,行为在 2.0 中也是相同的。 非常感谢任何帮助!

这是包含相关错误的日志片段

factory init called jun22 exception cache.query class org.apache.ignite.spi.IgniteSpiException: Failed to marshal custom event: StartRoutineDiscoveryMessage [startReqData=StartRequestData [prjPred=org.apache.ignite.configuration.CacheConfiguration$IgniteAllNodesPredicate@269707de, clsName=null, depInfo=null, hnd=CacheContinuousQueryHandlerV2 [rmtFilterFactory=com.ccx.ignite.cqfilter.FilterFactory@5dc301ed, rmtFilterFactoryDep=null, types=0], bufSize=1, interval=1000, autoUnsubscribe=true], keepBinary=false, routineId=b40ada9f-552d-41eb-90b5-3384526eb7b9]

您需要在拓扑中的所有节点上显式部署 CQ 类(特别是远程过滤器)。只需用它们创建一个 JAR 文件并在启动节点之前放入 libs 文件夹。

您从 FilterFactory 返回一个匿名 class 的实例,该实例又引用不可序列化的封闭 FilterFactory。 只需将返回的基于 class 的匿名 CacheEntryEventFilter 替换为相应的嵌套静态 class.