akka http, singleRequest (java), request is not sent
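I am more or less new to akka and akka-http, and I am having a problem sending an http singleRequest from an actor.
What I want to accomplish:
- actors calculate values after receiving a calculation command
- once a calculation is finished, another actor (let's call it ReplyActor) is called to send a "done" http request to another system
- messages are queued; in my setup there is always exactly one ReplyActor alive
If I send a single calculation command, everything works. With 10 calculation commands, at some point (depending on how much the debug messages slow the system down) requests are no longer sent. No exception, no timeout, nothing.
My actor setup is modeled on the distributed master worker example from the akka samples. While trying to track down the problem, I am running with only one worker (CalculationActor and ReplyActor).
OK, I have some more details for you.
First, if the requested endpoint is written with akka http, everything works. Unfortunately it is written with sparkjava, which relies on jetty. But as far as I can tell, this is not the endpoint's fault. The request is never sent.
In akka.http.impl.engine.client.PoolConductor#apply there is this diagram of the command flow: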
Request-   +-----------+     +-----------+    Switch-    +-------------+     +-----------+    Command
Context    |   retry   |     |   slot-   |    Command    |   doubler   |     |   route   +-------------->
+--------->|   Merge   +---->| Selector  +-------------->| (MapConcat) +---->|  (Flexi   +-------------->
           |           |     |           |               |             |     |   Route)  +-------------->
           +----+------+     +-----+-----+               +-------------+     +-----------+       to slots
                ^                  ^
                |                  | SlotEvent
                |             +----+----+
                |             | flatten | mapAsync
                |             +----+----+
                |                  | RawSlotEvent
                | Request-         |
                | Context     +---------+
                +-------------+  retry  |<-------- RawSlotEvent (from slotEventMerge)
                              |  Split  |
                              +---------+
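For the requests that are not sent, the command from the slot selector is missing. I don't know why. I have already spent some time debugging. Maybe the following is of some help:
Command chain of a request that was sent successfully: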
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 51
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 51
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 49
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand
threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand
threadId: 49
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.actor.ActorPublisher#onNext
HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.rendering.RequestRenderingContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
out (class): akka.stream.Outlet
element (class): scala.collection.immutable.$colon$colon
threadId: 78
akka.stream.actor.ActorPublisher#onNext
List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ...
threadId: 78
Command chain of a request that was not sent (same run):
akka.stream.actor.ActorPublisher#onNext
RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 116
akka.stream.actor.ActorPublisher#onNext
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 116
akka.stream.stage.GraphStageLogic#grab (fast path)
element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
out (class): akka.stream.Outlet
element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out,
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out,
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _
handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
out: MergePreferred.out
next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
threadId: 78
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(0,0))
threadId: 117
akka.stream.actor.ActorPublisher#onNext
element: List(Disconnected(1,0))
threadId: 49
Any help is appreciated. Thanks!
Versions are 2.4.8 (2.11) (akka-actor, akka-http-core, akka-http-experimental, akka-stream).
Got it! It looks like I was executing my requests the "wrong" way.
Fails after ~4 requests:
Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);
// Request-level API: singleRequest is served by the shared host connection pool.
http.singleRequest(request, materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));
This works:
Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
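// Connection-level API: each materialization of this flow opens its own connection, bypassing the pool.
Flow<HttpRequest, HttpResponse, CompletionStage<OutgoingConnection>> flow = http.outgoingConnection(ConnectHttp.toHost("localhost", 8091));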
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);
Source.single(request).via(flow).runWith(Sink.head(), materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));
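A possible explanation for the first variant stalling after about 4 requests (this is an assumption; the logs above do not prove it): singleRequest goes through a per-host connection pool whose akka.http.host-connection-pool.max-connections setting defaults to 4, and a pool slot stays occupied until the response entity has been consumed or discarded. The whenComplete callback above only logs the response and never reads its entity. The toy model below is plain Java, not Akka code; PoolModel and all of its methods are hypothetical names made up for illustration. It only shows the counting argument: with 4 slots and no entity consumption, exactly 4 requests are dispatched and the rest wait forever.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of a host connection pool with a fixed number of slots (hypothetical, not Akka code). */
final class PoolModel {
    private final int maxConnections;              // stands in for max-connections (Akka's default is 4)
    private final Queue<String> waiting = new ArrayDeque<>();
    private int busySlots = 0;
    private int dispatched = 0;

    PoolModel(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Queue a request and dispatch it immediately if a slot is free. */
    void submit(String request) {
        waiting.add(request);
        dispatchWaiting();
    }

    /** Models "the response entity was fully read": frees one slot. */
    void entityConsumed() {
        if (busySlots > 0) {
            busySlots--;
        }
        dispatchWaiting();
    }

    private void dispatchWaiting() {
        while (busySlots < maxConnections && !waiting.isEmpty()) {
            waiting.remove();
            busySlots++;
            dispatched++;
        }
    }

    /** Submits the given number of requests and returns how many were actually dispatched. */
    static int run(int requests, boolean consumeEntities) {
        PoolModel pool = new PoolModel(4);
        for (int i = 0; i < requests; i++) {
            pool.submit("request-" + i);
            if (consumeEntities) {
                pool.entityConsumed();
            }
        }
        return pool.dispatched;
    }

    public static void main(String[] args) {
        System.out.println("entities ignored:  " + run(10, false) + " of 10 dispatched"); // 4 of 10
        System.out.println("entities consumed: " + run(10, true) + " of 10 dispatched");  // 10 of 10
    }
}
```

If this is indeed the cause, singleRequest should keep working once the response entity is drained inside the callback, for example with something like r.entity().getDataBytes().runWith(Sink.ignore(), materializer). The outgoingConnection variant would avoid the stall simply because it does not use the pool.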
Many thanks, that pointed me in the right direction.