使用 Apache Camel AHC-WS Websocket 组件的 JWT Bearer Token 身份验证

JWT Bearer Token Authentication with Apache Camel AHC-WS Websocket Component

上下文

我们目前正在建立一个使用 Apache Camel 作为集成框架的 Springboot 项目。

在路由的最后,我们需要使用 WSS websocket 协议发送消息,我们的应用程序在其中充当客户端,需要将消息发送到公开 websocket 端点的远程服务器。

但是远程服务器首先需要在 WSS 握手期间进行身份验证。事实上,它需要一些特定的 HTTP headers,包括 Authorization: Bearer {jwt-bearer-token} header。仅在握手期间需要令牌。打开 websocket 后,不再需要进一步的身份验证。

方法

因为它似乎是 Apache Camel 的唯一 websocket 客户端组件,我们打算使用 AHC-WS component,如下面的 POM 所示:

...

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>3.0.0-M2</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ahc-ws</artifactId>
    <version>3.0.0-M2</version>
</dependency>

...

在我们的路由定义中,我们将所需的 header 附加到消息中(例如 Authorization header 和 JWT 不记名令牌)。此外,我们记录令牌以查看它是否真的被添加为消息的 header(确实如此)。

<?xml version="1.0" encoding="UTF-8"?>
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="distributorRoute">
        <from uri="seda:distributorEntryPoint"/>
        <setHeader headerName="Authorization">
            <simple>Bearer {jwt-bearer-token}</simple>
        </setHeader>
        <log message="${header.Authorization}"/>
        <to uri="ahc-wss://{remote-server-wss-endpoint}>
    </route>
</routes>

问题

简而言之,无法通过 WSS 发送消息,似乎从未建立连接。

应用程序产生以下 NPE 异常:

2019-06-07 15:52:00.304  INFO 17164 --- [butorEntryPoint] o.a.camel.component.ahc.ws.WsEndpoint    : Reconnecting websocket: wss://{remote-server-wss-endpoint}

...

java.lang.NullPointerException: null
    at org.apache.camel.component.ahc.ws.WsProducer.sendMessage(WsProducer.java:76) ~[camel-ahc-ws-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.component.ahc.ws.WsProducer.process(WsProducer.java:51) ~[camel-ahc-ws-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:130) ~[camel-core-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.processor.RedeliveryErrorHandler$RedeliveryState.run(RedeliveryErrorHandler.java:482) ~[camel-core-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.support.ReactiveHelper$Worker.schedule(ReactiveHelper.java:130) [camel-support-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.support.ReactiveHelper.scheduleMain(ReactiveHelper.java:43) [camel-support-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:88) [camel-core-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:221) [camel-core-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:289) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
    at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:148) [camel-seda-3.0.0-M2.jar:3.0.0-M2]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) [na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [na:na]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]

调试时我们发现 org.apache.camel.component.ahc.ws.WsProducer 尝试发送带有 org.apache.camel.component.ahc.ws.WebSocket object 的消息,在我们的例子中是 null:

但是该组件似乎 plugged-in 正确,因为发送到 non-secured WS 端点以进行测试按预期工作。然后它也只生成一次 "Reconnecting websocket" 日志。

问题

有没有人在 WS 握手期间进行身份验证时遇到过类似的问题?

我们已经查看了 AHC-WS 组件的配置可能性,但其中 none 似乎适合这种情况。

此外,我们需要动态生成和添加令牌,因此hard-coding它也无济于事。

@paizo 的评论让我们产生了在自定义 DefaultAsyncHttpClient 实现中覆盖 prepareGet() 方法的想法:

@Override
public BoundRequestBuilder prepareGet(String url) {
    // OAuth token
    String token = "Bearer my-jwt-token";

    // Headers for the handshake request
    HttpHeaders httpHeaders = new DefaultHttpHeaders();
    httpHeaders.set("Authorization", token);

    // Prepare the websocket upgrade handshake
    BoundRequestBuilder boundRequestBuilder = super.prepareGet(url);
    boundRequestBuilder.setHeaders(httpHeaders);    

    return boundRequestBuilder;
}

此客户端现在必须由 AHC-WS 组件使用。事实上,它在 connect() 方法期间被 WsEndpoint class 使用,其中 getClient() 必须 return 来自上面的自定义客户端:

public void connect() throws Exception {
    String uri = getHttpUri().toASCIIString();

    log.debug("Connecting to {}", uri);
    websocket = getClient().prepareGet(uri).execute(
        new WebSocketUpgradeHandler.Builder()
            .addWebSocketListener(listener).build()).get();
}

不幸的是,AHC-WS 组件不能作为 Spring bean 使用,但可以从 application.yml:

中指向自定义客户端
camel:
  component:
    ahc-ws:
      client: myCustomAsyncHttpClientImpl

像这样,可以自定义websocket升级握手