重构遗留 SOA 系统以使用非阻塞异步微服务
Refactor Legacy SOA system to use non-blocking async microservices
我刚刚开始尝试了解 RxJava,以便我可以使用项目反应堆重构遗留 SOA 系统来使用非阻塞异步微服务。
目前我正在进行可行性研究,并考虑使用像勺子这样的东西来转换遗留服务代码(但这与这个问题无关)
我想知道如何使用 reactor-bus Request/Reply 语法来替换这个同步服务代码。或者即使我应该使用完全不同的反应器构造。
这里是一个遗留 soa 服务的例子,它是人为设计的,所以它可能不完全有意义,但基本上每个服务都依赖于最后一个服务的结果。
public static Map<String, Object> createAccount(DispatchContext dctx, Map<String, Object> context) {
LocalDispatcher dispatcher = dctx.getDispatcher();
String accountPartyId = (String) context.get("partyId");
Map<String, Object> input = UtilMisc.toMap("groupName", context.get("accountName"), "groupNameLocal", context.get("groupNameLocal"), "officeSiteName", context.get("officeSiteName"), "description", context.get("description"), "partyId", accountPartyId);
Map<String, Object> serviceResults1 = dispatcher.runSync("createPartyGroup", input);
Map<String, Object> serviceResults2 = dispatcher.runSync("createPartyRole", UtilMisc.toMap("partyId", (String) serviceResults1.get("partyId"), "roleTypeId", "ACCOUNT"));
String dataSourceId = (String) context.get("dataSourceId");
Map<String, Object> serviceResults3 = null;
if (dataSourceId != null) {
serviceResults3 = dispatcher.runSync("crmsfa.addAccountDataSource", UtilMisc.toMap("partyId", (String) serviceResults2.get("partyId"), "dataSourceId", dataSourceId));
}
String marketingCampaignId = (String) context.get("marketingCampaignId");
Map<String, Object> serviceResults4 = null;
if (marketingCampaignId != null) {
serviceResults4 = dispatcher.runSync("crmsfa.addAccountMarketingCampaign", UtilMisc.toMap("partyId", (String) serviceResults3.get("partyId"), "marketingCampaignId", marketingCampaignId));
}
String initialTeamPartyId = (String) context.get("initialTeamPartyId");
Map<String, Object> serviceResults5 = null;
if (initialTeamPartyId != null) {
serviceResults5 = dispatcher.runSync("crmsfa.assignTeamToAccount", UtilMisc.toMap("accountPartyId", (String) serviceResults4.get("partyId"), "teamPartyId", initialTeamPartyId, "userLogin", userLogin));
}
Map<String, Object> results = ServiceUtil.returnSuccess();
results.put("groupId", (String) serviceResults1.get("groupId"));
results.put("roleId", (String) serviceResults2.get("roleId"));
results.put("dataSourceId", (String) serviceResults3.get("dataSourceId"));
results.put("marketingCampaignId", (String) serviceResults4.get("marketingCampaignId"));
results.put("teamPartyId", (String) serviceResults5.get("teamPartyId"));
return results;
}
基本上这是一个使用 dispatcher.runSync 调用其他服务的服务...我只是在寻找一个起点来研究如何使用反应堆甚至另一个库来转换这种类型的语法转换为异步非阻塞代码。
在这一点上,我在考虑 callbacks/some 类 Promise 类型结构的非常模糊的术语。
比如第一次调用另一个服务是
Map<String, Object> serviceResults = dispatcher.runSync("createPartyGroup", input);
如果这改为返回包含 serviceResults 映射的 Promise 对象,则可以将方法的其余部分移到 Promise onComplete 块中,结果将是构成此服务方法的一组深度嵌套的 onComplete 代码块。
Promise p = task {
// createPartyGroup service call
}
p.onComplete { result ->
Promise p2 = task {
// createPartyRole sevice call
}
p2.onComplete { result ->
//next service call
}
}
}
或者查看像下面这样的 reactor-bus 文档,它在很多层面上都没有意义,我只是对 reactor 了解不够,不知道为什么它没有意义或接下来要学习什么我明白为什么它没有意义
bus.send("service.createPartyGroup", Event.wrap(input, "reply.service.createPartyGroup"));
bus.receive($("reply.service.createPartyGroup"), ev -> {
Map<?> input2 = UtilMisc.toMap("partyId", (String) ev.get("partyId"), "roleTypeId", "ACCOUNT")
bus.send("service.createPartyRole", Event.wrap(input2, "reply.service.createPartyRole"));
});
我意识到开始研究反应式编程范式是一个相当奇怪的地方。但是替换此同步服务代码是我的终极目标 objective,如果我至少理解了语法,我可以从中逆向工作。
您只需要使用 Observable,在您的流中,一个发出的项目通过流传递。
查看文档 https://github.com/ReactiveX/RxJava
这将是一个顺序流程
Observable.just(methodThatCallFirstServiceAndReturnObservable(params))
.flatMap(resul1 -> methodThatCallSecondAndReturnObservable(resul1))
.flatMap(resul2 -> methodThatCallThirdAndReturnObservable(resul2))
.subscribe(result3->"Last value emmited here:");
您可以 运行 并行调用三个服务,并使用 Observable.zip 或合并将所有值放在一起。但我相信这不是你需要的。
我刚刚开始尝试了解 RxJava,以便我可以使用项目反应堆重构遗留 SOA 系统来使用非阻塞异步微服务。
目前我正在进行可行性研究,并考虑使用像勺子这样的东西来转换遗留服务代码(但这与这个问题无关)
我想知道如何使用 reactor-bus Request/Reply 语法来替换这个同步服务代码。或者即使我应该使用完全不同的反应器构造。
这里是一个遗留 soa 服务的例子,它是人为设计的,所以它可能不完全有意义,但基本上每个服务都依赖于最后一个服务的结果。
public static Map<String, Object> createAccount(DispatchContext dctx, Map<String, Object> context) {
LocalDispatcher dispatcher = dctx.getDispatcher();
String accountPartyId = (String) context.get("partyId");
Map<String, Object> input = UtilMisc.toMap("groupName", context.get("accountName"), "groupNameLocal", context.get("groupNameLocal"), "officeSiteName", context.get("officeSiteName"), "description", context.get("description"), "partyId", accountPartyId);
Map<String, Object> serviceResults1 = dispatcher.runSync("createPartyGroup", input);
Map<String, Object> serviceResults2 = dispatcher.runSync("createPartyRole", UtilMisc.toMap("partyId", (String) serviceResults1.get("partyId"), "roleTypeId", "ACCOUNT"));
String dataSourceId = (String) context.get("dataSourceId");
Map<String, Object> serviceResults3 = null;
if (dataSourceId != null) {
serviceResults3 = dispatcher.runSync("crmsfa.addAccountDataSource", UtilMisc.toMap("partyId", (String) serviceResults2.get("partyId"), "dataSourceId", dataSourceId));
}
String marketingCampaignId = (String) context.get("marketingCampaignId");
Map<String, Object> serviceResults4 = null;
if (marketingCampaignId != null) {
serviceResults4 = dispatcher.runSync("crmsfa.addAccountMarketingCampaign", UtilMisc.toMap("partyId", (String) serviceResults3.get("partyId"), "marketingCampaignId", marketingCampaignId));
}
String initialTeamPartyId = (String) context.get("initialTeamPartyId");
Map<String, Object> serviceResults5 = null;
if (initialTeamPartyId != null) {
serviceResults5 = dispatcher.runSync("crmsfa.assignTeamToAccount", UtilMisc.toMap("accountPartyId", (String) serviceResults4.get("partyId"), "teamPartyId", initialTeamPartyId, "userLogin", userLogin));
}
Map<String, Object> results = ServiceUtil.returnSuccess();
results.put("groupId", (String) serviceResults1.get("groupId"));
results.put("roleId", (String) serviceResults2.get("roleId"));
results.put("dataSourceId", (String) serviceResults3.get("dataSourceId"));
results.put("marketingCampaignId", (String) serviceResults4.get("marketingCampaignId"));
results.put("teamPartyId", (String) serviceResults5.get("teamPartyId"));
return results;
}
基本上这是一个使用 dispatcher.runSync 调用其他服务的服务...我只是在寻找一个起点来研究如何使用反应堆甚至另一个库来转换这种类型的语法转换为异步非阻塞代码。
在这一点上,我在考虑 callbacks/some 类 Promise 类型结构的非常模糊的术语。
比如第一次调用另一个服务是
Map<String, Object> serviceResults = dispatcher.runSync("createPartyGroup", input);
如果这改为返回包含 serviceResults 映射的 Promise 对象,则可以将方法的其余部分移到 Promise onComplete 块中,结果将是构成此服务方法的一组深度嵌套的 onComplete 代码块。
Promise p = task {
// createPartyGroup service call
}
p.onComplete { result ->
Promise p2 = task {
// createPartyRole sevice call
}
p2.onComplete { result ->
//next service call
}
}
}
或者查看像下面这样的 reactor-bus 文档,它在很多层面上都没有意义,我只是对 reactor 了解不够,不知道为什么它没有意义或接下来要学习什么我明白为什么它没有意义
bus.send("service.createPartyGroup", Event.wrap(input, "reply.service.createPartyGroup"));
bus.receive($("reply.service.createPartyGroup"), ev -> {
Map<?> input2 = UtilMisc.toMap("partyId", (String) ev.get("partyId"), "roleTypeId", "ACCOUNT")
bus.send("service.createPartyRole", Event.wrap(input2, "reply.service.createPartyRole"));
});
我意识到开始研究反应式编程范式是一个相当奇怪的地方。但是替换此同步服务代码是我的终极目标 objective,如果我至少理解了语法,我可以从中逆向工作。
您只需要使用 Observable,在您的流中,一个发出的项目通过流传递。 查看文档 https://github.com/ReactiveX/RxJava
这将是一个顺序流程
Observable.just(methodThatCallFirstServiceAndReturnObservable(params))
.flatMap(resul1 -> methodThatCallSecondAndReturnObservable(resul1))
.flatMap(resul2 -> methodThatCallThirdAndReturnObservable(resul2))
.subscribe(result3->"Last value emmited here:");
您可以 运行 并行调用三个服务,并使用 Observable.zip 或合并将所有值放在一起。但我相信这不是你需要的。