akka http, singleRequest (java), request is not sent

I'm more or less new to akka and akka-http, and I'm having trouble sending an http singleRequest from an actor.

What I want to accomplish:

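If I send a single calculation order, everything works. With 10 calculation orders, at some point, depending on how much the debug messages slow the system down, requests are no longer sent. No exception, no timeout, nothing.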

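My actor setup is designed along the lines of the distributed master worker example from the akka samples. While trying to track down the problem, I ran with only one worker (CalculationActorReplyActor).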

OK, I've put together some more details.

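First, if the requested endpoint is written with akka http, everything works. Unfortunately, it is written with sparkjava, which relies on Jetty. But as far as I can tell, the endpoint is not at fault: the request is never sent.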

The command stream graph from akka.http.impl.engine.client.PoolConductor#apply:

Request-   +-----------+     +-----------+    Switch-    +-------------+     +-----------+    Command
Context    |   retry   |     |   slot-   |    Command    |   doubler   |     |   route   +-------------->
+--------->|   Merge   +---->| Selector  +-------------->| (MapConcat) +---->|  (Flexi   +-------------->
           |           |     |           |               |             |     |   Route)  +-------------->
           +----+------+     +-----+-----+               +-------------+     +-----------+       to slots
                ^                  ^
                |                  | SlotEvent
                |             +----+----+
                |             | flatten | mapAsync
                |             +----+----+
                |                  | RawSlotEvent
                | Request-         |
                | Context     +---------+
                +-------------+  retry  |<-------- RawSlotEvent (from slotEventMerge)
                              |  Split  |
                              +---------+

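For the requests that are not sent, the command from the slot selector is missing. I don't know why. I've already spent some time debugging. Maybe the following is of some help: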

Command chain of a request that was sent successfully:

akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 51
akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 51
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 49
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@1c81ab77),0)
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolConductor$SwitchSlotCommand
    threadId: 49
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolConductor$DispatchCommand
    threadId: 49
akka.stream.actor.ActorPublisher#onNext
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"acbacca1-11e1-4f14-a4a6-4b14b53598a4"),HttpProtocol(HTTP/1.1))
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.rendering.RequestRenderingContext
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (isAvailable(out)) => push
    out (class): akka.stream.Outlet
    element (class): scala.collection.immutable.$colon$colon
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    List(ResponseDelivery(ResponseContext(RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567), ...
    threadId: 78

Command chain of a request that was not sent (same run):

akka.stream.actor.ActorPublisher#onNext
    RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 116
akka.stream.actor.ActorPublisher#onNext
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 116
akka.stream.stage.GraphStageLogic#grab (fast path)
    element: RequestContext(HttpRequest(HttpMethod(POST),/example/reply/uri,List(Host: localhost:4567),HttpEntity.Strict(application/json,"47bc9378-5166-4a14-920a-8eab53717263"),HttpProtocol(HTTP/1.1)),List(scala.concurrent.impl.CallbackRunnable@29ee365f),0)
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 78
akka.stream.stage.GraphStageLogic#emit (NOT isAvailable(out)) => setOrAddEmitting
    out (class): akka.stream.Outlet
    element (class): akka.http.impl.engine.client.PoolFlow$RequestContext
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out,
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case e
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out,
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.stage.GraphStageLogic#setOrAddEmitting case _
    handler: akka.stream.stage.GraphStageLogic$EagerTerminateOutput$@546413bd
    out: MergePreferred.out
    next: akka.stream.stage.GraphStageLogic$EmittingSingle@69621de0
    threadId: 78
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(0,0))
    threadId: 117
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(0,0))
    threadId: 117
akka.stream.actor.ActorPublisher#onNext
    element: List(Disconnected(1,0))
    threadId: 49
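
The visible difference between the two traces is the branch taken in GraphStageLogic#emit: in the successful run the outlet is available and the element is pushed right away, while in the failed run it is not, and the element is parked via setOrAddEmitting until downstream asks for it. A minimal plain-Java sketch of that demand-driven behavior follows; ToyOutlet and its methods are hypothetical names for illustration and a strong simplification, not Akka's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy outlet with demand-driven delivery; a simplification, not Akka's GraphStageLogic. */
final class ToyOutlet<T> {
    private final Queue<T> parked = new ArrayDeque<>();
    private final List<T> delivered = new ArrayList<>();
    private boolean demand = false;

    /** Downstream signals that it can accept one element. */
    void pull() {
        if (!parked.isEmpty()) {
            delivered.add(parked.poll()); // a parked element goes out first
        } else {
            demand = true;                // remember the demand for the next emit
        }
    }

    /** Pushes immediately if there is demand, otherwise parks the element. */
    void emit(T element) {
        if (demand) {
            demand = false;
            delivered.add(element);       // corresponds to "isAvailable(out) => push"
        } else {
            parked.add(element);          // corresponds to "NOT isAvailable(out)"
        }
    }

    List<T> delivered() { return delivered; }

    int parkedCount() { return parked.size(); }

    public static void main(String[] args) {
        ToyOutlet<String> out = new ToyOutlet<>();
        out.pull();
        out.emit("first");   // demand present: delivered at once
        out.emit("second");  // no demand: parked until the next pull
        System.out.println(out.delivered() + ", parked: " + out.parkedCount());
    }
}
```

If downstream never pulls again, the parked element simply stays where it is, which would match a request that is neither sent nor failed.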

Any help is appreciated. Thanks!

The versions are 2.4.8 (2.11) (akka-actor, akka-http-core, akka-http-experimental, akka-stream).

Got it! It looks like I was executing my requests the "wrong" way.

Fails after ~4 requests:

Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);

http.singleRequest(request, materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));
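
The post does not say why this variant stalls. One cause that fits the symptom, offered here as an assumption: singleRequest goes through the per-host connection pool, whose akka.http.host-connection-pool.max-connections setting defaults to 4, and a pool slot is only released once the response entity has been consumed. The snippet above logs the response but never reads its entity. The following plain-Java toy model (SlotPoolModel is a hypothetical name, not Akka code) sketches how such a pool would stop dispatching after 4 requests:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a fixed-size connection pool; plain Java, not Akka code. */
final class SlotPoolModel {

    /**
     * Offers `requests` requests to a pool with `slots` connection slots and
     * returns how many were actually dispatched. In this model a slot becomes
     * free again only when the response of its request is consumed.
     */
    static int dispatched(int requests, int slots, boolean consumeResponses) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int id = 0; id < requests; id++) {
            pending.add(id);
        }
        int freeSlots = slots;
        int sent = 0;
        while (!pending.isEmpty() && freeSlots > 0) {
            pending.poll();  // the request leaves the queue
            freeSlots--;     // and occupies a slot
            sent++;
            if (consumeResponses) {
                freeSlots++; // reading the response releases the slot
            }
        }
        return sent;         // whatever is still pending is never dispatched
    }

    public static void main(String[] args) {
        System.out.println(dispatched(10, 4, false)); // prints 4
        System.out.println(dispatched(10, 4, true));  // prints 10
    }
}
```

If this assumption holds, the corresponding step with the real API would be to drain or discard every response entity, even when the body is not needed.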

Works:

Http http = Http.get(context().system());
ActorMaterializer materializer = ActorMaterializer.create(context().system());
Flow<HttpRequest, HttpResponse, CompletionStage<OutgoingConnection>> flow = http.outgoingConnection(ConnectHttp.toHost("localhost", 8091));
HttpRequest request = HttpRequest.POST("http://localhost:8091").withEntity(ContentTypes.APPLICATION_JSON, message);

Source.single(request).via(flow).runWith(Sink.head(), materializer).whenComplete((r, t) -> log.info("httpResponse: {}, throwable: {}", r, t));

Many thanks, that pointed me in the right direction.