HystrixObservableCommand 执行未显示在 dashboard/hystrix.stream 中

HystrixObservableCommand executions don't show up in dashboard/hystrix.stream

问题

Hystrix 仪表板显示 HystrixCommands 的执行,但不显示 HystrixObservableCommand 的执行。我们需要 HystrixObservableCommand,因为我们正在包装一个异步 HTTP 调用。下面的代码显示了我们在仪表板中跟踪的示例。集成测试显示调用已正确执行,流提到 AsyncHttpCommand,但从未跟踪任何命中。

void aMethod(A requestHeaders, B asyncContext, C message) {
    // this is tracked
    new DummyCommand().execute();

    // this not
    Observable<Response> observable = new AsyncHttpCommand(builder.setHeaders(requestHeaders), message).construct();
    observable.subscribe(createObserver(asyncContext, message));
}

// we added and removed some properties without any change on the tracking
private final HystrixCommandProperties.Setter defaultProperties = HystrixCommandProperties.Setter()
                  .withExecutionIsolationSemaphoreMaxConcurrentRequests(400)
                  .withFallbackIsolationSemaphoreMaxConcurrentRequests(400)
                  .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                  .withExecutionTimeoutInMilliseconds(10000)
                  .withExecutionTimeoutEnabled(true)
                  .withFallbackEnabled(true)
                  .withCircuitBreakerEnabled(true)
                  .withCircuitBreakerErrorThresholdPercentage(50)
                  .withCircuitBreakerRequestVolumeThreshold(20)
                  .withCircuitBreakerSleepWindowInMilliseconds(5000)
                  .withCircuitBreakerForceOpen(false)
                  .withCircuitBreakerForceClosed(false);

private class DummyCommand extends HystrixCommand<String> {

    protected DummyCommand() {
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("default"))
                                                 .andCommandPropertiesDefaults(defaultProperties));
    }

    @Override
    protected String run() throws Exception {
        return "test";
    }
}

private class AsyncHttpCommand extends HystrixObservableCommand<Response> {

    private BoundRequestBuilder builder;

    protected AsyncHttpCommand(final BoundRequestBuilder builder) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("default2"))
             .andCommandPropertiesDefaults(defaultProperties));
        this.builder = builder;
    }

    @Override
    protected Observable<Response> construct() {
        return Observable.from(builder.execute()).subscribeOn(Schedulers.io());
    }
}

private Observer<? super Response> createObserver(final AsyncContext asyncContext, final SentRequestMessage message) {
    return new Observer<Response>() {
        @Override
        public void onCompleted() {
            // was never reached
        }

        @Override
        public void onError(final Throwable throwable) {
            // should not be reached, as fallback kicks in
        }

        @Override
        public void onNext(final Response response) {
            // omitted result handling ...
        }
    };
}

Hystrix.stream

web.xml

<servlet>
    <description></description>
    <display-name>HystrixMetricsStreamServlet</display-name>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>

<servlet-mapping>
    <servlet-name>HystrixMetricsStreamServlet</servlet-name>
    <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>

pom.xml

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>RELEASE</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.4.10</version>
</dependency>

问题

我对 AsyncHttpCommand 做错了吗?需要什么 changed/added,以便 hystrix 流也将显示该命令的命中。

命令是正确的,但我们必须使用 observe() 而不是 construct()

Observable<Response> observable = new AsyncHttpCommand(builder.setHeaders(requestHeaders), message)
                                         .observe();
observable.subscribe(createObserver(asyncContext, message));