如何在 ignite-messaging lambda 函数的 remotelisten 中调用非静态函数?
How to call non-static function in remotelisten of ignite-messaging lambda function?
我有一个 class (TopicListenerImp) 用于侦听主题,我想获取来自 remoteListen 函数的消息并将其作为参数提供给其他服务 class 中的另一个非静态函数。
@Component
public class TopicListenerImp implements TopicListener {
private NotificationService notificationService;
private SubscriptionRepository subscriptionRepository;
private SubscriptionRules subscriptionRules;
private NFInstancesService nfInstancesService;
private Ignite ignite;
public TopicListenerImp(
SubscriptionRules subscriptionRules,
NotificationService notificationService,
SubscriptionRepository subscriptionRepository,
Ignite ignite,
NFInstancesService nfInstancesService) {
this.subscriptionRules = subscriptionRules;
this.notificationService = notificationService;
this.subscriptionRepository = subscriptionRepository;
this.nfInstancesService = nfInstancesService;
this.ignite = ignite;
}
@Bean
public void startTopicListening() {
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forLocal());
rmtMsg.remoteListen(
"SUSPEND",
(nodeId, msg) -> {
notifyIfSubscriptionExist((String) msg); //here where I used the message that comes from topic
return true;
});
}
public void notifyIfSubscriptionExist(String msg) {
List<String> nfInstanceIdSubscriptionId = parseNfInstanceIdSubscriptionId(msg);
Optional<NFProfile> nfProfile =
nfInstancesService.getNFInstance(nfInstanceIdSubscriptionId.get(0));
Optional<SubscriptionData> subscriptionDataOptional =
subscriptionRepository.getSubscriptionData(nfInstanceIdSubscriptionId.get(1));
subscriptionDataOptional.ifPresent(
subscriptionData -> {
if (subscriptionRules.checkRules(subscriptionData) && nfProfile.isPresent())
notificationService.sendEventNotification(
subscriptionData, nfProfile.get(), NF_PROFILE_CHANGED);
});
}
private List<String> parseNfInstanceIdSubscriptionId(String msg) {
List<String> values = asList(msg.split(", "));
return asList(getIdFromMessage(values.get(0)), getIdFromMessage(values.get(1)));
}
private String getIdFromMessage(String msg) {
return msg.substring(msg.indexOf('[') + 1, msg.length() - 1);
}
}
但是我得到以下错误;
Caused by: org.apache.ignite.binary.BinaryObjectException: Failed to serialize object etc.
当我静态注入字段时,它起作用了。但是当我这样做时,IDE 发出警告 "don't update the static variable from the constructor method".
remoteListen 采用谓词,该谓词将被序列化并通过网络发送到远程节点。当您在其实现中调用非静态方法时,它会使整个 this
对象被序列化。它可能会导致意外行为和通过网络发送大量数据。
一般来说,不建议通过网络发送 lambda 函数,因为它们的序列化依赖于 VM,并且您无法控制实际发送的内容。如果你想通过网络发送谓词,那么创建一个 class 实现谓词接口并使用这个 class.
的实例
如果不需要集群所有节点都订阅该主题,那么localListen就够了。
我有一个 class (TopicListenerImp) 用于侦听主题,我想获取来自 remoteListen 函数的消息并将其作为参数提供给其他服务 class 中的另一个非静态函数。
@Component
public class TopicListenerImp implements TopicListener {
private NotificationService notificationService;
private SubscriptionRepository subscriptionRepository;
private SubscriptionRules subscriptionRules;
private NFInstancesService nfInstancesService;
private Ignite ignite;
public TopicListenerImp(
SubscriptionRules subscriptionRules,
NotificationService notificationService,
SubscriptionRepository subscriptionRepository,
Ignite ignite,
NFInstancesService nfInstancesService) {
this.subscriptionRules = subscriptionRules;
this.notificationService = notificationService;
this.subscriptionRepository = subscriptionRepository;
this.nfInstancesService = nfInstancesService;
this.ignite = ignite;
}
@Bean
public void startTopicListening() {
IgniteMessaging rmtMsg = ignite.message(ignite.cluster().forLocal());
rmtMsg.remoteListen(
"SUSPEND",
(nodeId, msg) -> {
notifyIfSubscriptionExist((String) msg); //here where I used the message that comes from topic
return true;
});
}
public void notifyIfSubscriptionExist(String msg) {
List<String> nfInstanceIdSubscriptionId = parseNfInstanceIdSubscriptionId(msg);
Optional<NFProfile> nfProfile =
nfInstancesService.getNFInstance(nfInstanceIdSubscriptionId.get(0));
Optional<SubscriptionData> subscriptionDataOptional =
subscriptionRepository.getSubscriptionData(nfInstanceIdSubscriptionId.get(1));
subscriptionDataOptional.ifPresent(
subscriptionData -> {
if (subscriptionRules.checkRules(subscriptionData) && nfProfile.isPresent())
notificationService.sendEventNotification(
subscriptionData, nfProfile.get(), NF_PROFILE_CHANGED);
});
}
private List<String> parseNfInstanceIdSubscriptionId(String msg) {
List<String> values = asList(msg.split(", "));
return asList(getIdFromMessage(values.get(0)), getIdFromMessage(values.get(1)));
}
private String getIdFromMessage(String msg) {
return msg.substring(msg.indexOf('[') + 1, msg.length() - 1);
}
}
但是我得到以下错误;
Caused by: org.apache.ignite.binary.BinaryObjectException: Failed to serialize object etc.
当我静态注入字段时,它起作用了。但是当我这样做时,IDE 发出警告 "don't update the static variable from the constructor method".
remoteListen 采用谓词,该谓词将被序列化并通过网络发送到远程节点。当您在其实现中调用非静态方法时,它会使整个 this
对象被序列化。它可能会导致意外行为和通过网络发送大量数据。
一般来说,不建议通过网络发送 lambda 函数,因为它们的序列化依赖于 VM,并且您无法控制实际发送的内容。如果你想通过网络发送谓词,那么创建一个 class 实现谓词接口并使用这个 class.
的实例如果不需要集群所有节点都订阅该主题,那么localListen就够了。