如何在 ignite-messaging lambda 函数的 remotelisten 中调用非静态函数?

How to call non-static function in remotelisten of ignite-messaging lambda function?

我有一个 class (TopicListenerImp) 用于侦听主题,我想获取来自 remoteListen 函数的消息并将其作为参数提供给其他服务 class 中的另一个非静态函数。

     @Component
    public class TopicListenerImp implements TopicListener {


      private NotificationService notificationService;
      private SubscriptionRepository subscriptionRepository;
      private SubscriptionRules subscriptionRules;
      private NFInstancesService nfInstancesService;
      private Ignite ignite;

      public TopicListenerImp(
          SubscriptionRules subscriptionRules,
          NotificationService notificationService,
          SubscriptionRepository subscriptionRepository,
          Ignite ignite,
          NFInstancesService nfInstancesService) {
        this.subscriptionRules = subscriptionRules;
        this.notificationService = notificationService;
        this.subscriptionRepository = subscriptionRepository;
        this.nfInstancesService = nfInstancesService;
        this.ignite = ignite;
      }

      @Bean
      public void startTopicListening() {

        IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forLocal());
        rmtMsg.remoteListen(
            "SUSPEND",
            (nodeId, msg) -> {

              notifyIfSubscriptionExist((String) msg); //here where I used the message that comes from topic
              return true; 
            });
      }

      public void notifyIfSubscriptionExist(String msg) {

        List<String> nfInstanceIdSubscriptionId = parseNfInstanceIdSubscriptionId(msg);
        Optional<NFProfile> nfProfile =
            nfInstancesService.getNFInstance(nfInstanceIdSubscriptionId.get(0));
        Optional<SubscriptionData> subscriptionDataOptional =
            subscriptionRepository.getSubscriptionData(nfInstanceIdSubscriptionId.get(1));

        subscriptionDataOptional.ifPresent(
            subscriptionData -> {
              if (subscriptionRules.checkRules(subscriptionData) && nfProfile.isPresent())
                notificationService.sendEventNotification(
                    subscriptionData, nfProfile.get(), NF_PROFILE_CHANGED);
            });
      }

      private List<String> parseNfInstanceIdSubscriptionId(String msg) {
        List<String> values = asList(msg.split(", "));
        return asList(getIdFromMessage(values.get(0)), getIdFromMessage(values.get(1)));
      }

      private String getIdFromMessage(String msg) {
        return msg.substring(msg.indexOf('[') + 1, msg.length() - 1);
      }
    }

但是我得到以下错误;

    Caused by: org.apache.ignite.binary.BinaryObjectException: Failed to serialize object etc.

当我静态注入字段时,它起作用了。但是当我这样做时,IDE 发出警告 "don't update the static variable from the constructor method".

remoteListen 采用谓词,该谓词将被序列化并通过网络发送到远程节点。当您在其实现中调用非静态方法时,它会使整个 this 对象被序列化。它可能会导致意外行为和通过网络发送大量数据。

一般来说,不建议通过网络发送 lambda 函数,因为它们的序列化依赖于 VM,并且您无法控制实际发送的内容。如果你想通过网络发送谓词,那么创建一个 class 实现谓词接口并使用这个 class.

的实例

如果不需要集群所有节点都订阅该主题,那么localListen就够了。