Problems with binding RSocket and gRPC (protobuf) in Spring Boot

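Hi all! I came up with the idea of using the classes generated by gRPC's protobuf code generation as the data-layer API, instead of POJOs, on top of the RSocket protocol.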

The implementation is as follows:

syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_package = "me.some.protoapi";
option java_multiple_files = true;

message ValidationTaskRequest {
    int64 id = 1;
    string name = 2;
}

message ValidationTaskResponse {
    int64 id = 1;
    ValidationStatus status = 2;
    ValidationError error = 3;
}

message ValidationError {
    string reason = 1;
}

enum ValidationStatus {
    PASSED = 0;
    DECLINED = 1;
}

The RSocket configuration:

@Configuration
public class RSocketConfiguration {

    @Bean
    public RSocket rSocket(@Value("${rsocket.client.port}") int port) {
        return RSocketFactory
            .connect()
            .mimeType(MimeTypeUtils.ALL_VALUE, MimeTypeUtils.ALL_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(port))
            .start()
            .retry()
            .block();
    }

    @Bean
    public RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
            rSocket,
            MimeTypeUtils.ALL,
            MimeTypeUtils.ALL,
            rSocketStrategies
        );
    }

}

And the service itself:

@Service
public class ValidationServiceImpl implements ValidationService {

    private final Logger logger = LoggerFactory.getLogger(ValidationServiceImpl.class);

    private final TaskService taskService;
    private final ReactiveRedisTemplate<String, Task> redis;
    private final RSocketRequester rSocketRequester;
    private final RedisTopicHelper redisHelper;

    @Value("${rsocket.routes.validation}")
    private String rSocketValidationRoute;

    @Value("${validation.interval}")
    private Optional<Integer> validationInterval;

    public ValidationServiceImpl(TaskService taskService, ReactiveRedisTemplate<String, Task> redis, RSocketRequester rSocketRequester, RedisTopicHelper redisHelper) {
        this.taskService = taskService;
        this.redis = redis;
        this.rSocketRequester = rSocketRequester;
        this.redisHelper = redisHelper;
    }

    @Override
    public void startValidationProcess() {
        logger.info("validation listener started");

        Flux.interval(Duration.ofMillis(validationInterval.orElse(1000)))
            .flatMap(i -> redis.keys(redisHelper.topicAllKeys()))
            .flatMap(redis.opsForValue()::get)
            .filter(it -> !it.isVerified())
            .flatMap(this::requestValidation)
            .log()
            .metrics()
            .subscribe(result -> {
                if ( result.getError() != null ) {
                    logger.error(result.getError().getReason());
                } else {
                    taskService.markTaskAsVerified(result.getId(), result.getStatus());
                    redis.opsForValue().delete(redisHelper.specifiedTopicWithId(result.getId()));
                }
            });
    }

    private Mono<ValidationTaskResponse> requestValidation(Task task) {
        return rSocketRequester
            .route(rSocketValidationRoute)
            .data(
                ValidationTaskRequest
                    .newBuilder()
                    .setId(task.getId())
                    .setName(task.getName())
                    .build() // send the built message, not the Builder
            )
            .retrieveMono(ValidationTaskResponse.class);
    }

}

However, when the Spring Boot service starts, I get the following exception:

java.lang.IllegalArgumentException: No decoder for me.some.protoapi.ValidationTaskResponse
    at org.springframework.messaging.rsocket.RSocketStrategies.decoder(RSocketStrategies.java:92) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:274) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:258) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at me.some.test.api.services.validation.ValidationServiceImpl.requestValidation(ValidationServiceImpl.java:73) ~[main/:na]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:530) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:972) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
    at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:888) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:281) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:756) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

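As you can see, the problem is decoding the protobuf model in the RSocket transport layer. I know that gRPC uses binary serialization and that there is some magic in the underlying binary description. Has anyone tried to bind these two technologies together? Any ideas would be very helpful. Thanks.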
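For readers following the stack trace: the message "No decoder for ..." is raised when the strategies object is asked for a decoder and none of the registered ones accepts the target type. The plain-Java sketch below is a simplified model of that kind of lookup, not Spring's actual code; `SimpleDecoder`, `SimpleStrategies`, and `ProtoMessage` are hypothetical names introduced only for illustration, and the real lookup also takes the MIME type into account.

```java
import java.util.List;

public class DecoderLookupDemo {

    /** Hypothetical stand-in for a codec: reports which target types it can decode. */
    interface SimpleDecoder {
        boolean canDecode(Class<?> targetType);
    }

    /** Hypothetical, simplified model of a strategies object holding registered decoders. */
    static final class SimpleStrategies {
        private final List<SimpleDecoder> decoders;

        SimpleStrategies(List<SimpleDecoder> decoders) {
            this.decoders = List.copyOf(decoders);
        }

        /** Returns the first decoder that accepts the type, or fails if none does. */
        SimpleDecoder decoder(Class<?> targetType) {
            for (SimpleDecoder candidate : decoders) {
                if (candidate.canDecode(targetType)) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("No decoder for " + targetType.getName());
        }
    }

    /** Marker interface standing in for the protobuf message base type. */
    interface ProtoMessage {}

    /** Stand-in for the generated response class from the question. */
    static final class ValidationTaskResponse implements ProtoMessage {}

    public static void main(String[] args) {
        // A decoder that only understands text, similar in spirit to a default setup.
        SimpleDecoder textOnly = type -> CharSequence.class.isAssignableFrom(type);
        // A decoder that accepts any protobuf-like message type.
        SimpleDecoder protobufLike = type -> ProtoMessage.class.isAssignableFrom(type);

        SimpleStrategies defaults = new SimpleStrategies(List.of(textOnly));
        try {
            defaults.decoder(ValidationTaskResponse.class);
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        SimpleStrategies withProtobuf = new SimpleStrategies(List.of(textOnly, protobufLike));
        System.out.println("found protobuf decoder: "
            + (withProtobuf.decoder(ValidationTaskResponse.class) == protobufLike));
    }
}
```

Under this model, the failure has nothing to do with the wire format itself; it only means that no registered codec claims the generated class, which suggests registering a protobuf-aware decoder.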

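I have never run into this problem myself, but I have configured my receiver and requester for JSON payloads, as shown below. Could you try setting a different data MIME type, for example octet-stream?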

    @Bean
    RSocket rSocket() {
        return RSocketFactory
                .connect()
                .metadataMimeType("message/x.rsocket.composite-metadata.v0")
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
                .transport(TcpClientTransport.create(ztlServerHostname, port))
                .start()
                .block();
    }



    @Bean
    RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
                MimeTypeUtils.parseMimeType("message/x.rsocket.composite-metadata.v0"),
                rSocketStrategies);
    }

The full code is here.

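Registering a ProtobufDecoder (and a ProtobufEncoder for the request) in the RSocketStrategies may help:

    RSocketRequester requester = rsocketRequesterBuilder
        .rsocketStrategies(b -> b
            .encoder(new ProtobufEncoder())   // encodes the outgoing request message
            .decoder(new ProtobufDecoder()))  // decodes the incoming response message
        .connectTcp("localhost", 7000)
        .block();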
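Since the question injects the auto-configured `RSocketStrategies` bean rather than using the requester builder, the same idea could also be applied at the bean level. The following is a minimal sketch, assuming Spring Boot 2.2's `RSocketStrategiesCustomizer` is available and that `protobuf-java` and `spring-web` (which contains the protobuf codecs) are on the classpath. The class name `ProtobufRSocketConfiguration` and the bean name `protobufCodecs` are hypothetical.

```java
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;

// Hypothetical configuration class; only relevant when the application relies on
// the RSocketStrategies bean that Spring Boot auto-configures.
@Configuration
public class ProtobufRSocketConfiguration {

    // Adds protobuf codecs to the auto-configured strategies, so that types
    // generated from the .proto file can be encoded and decoded.
    @Bean
    public RSocketStrategiesCustomizer protobufCodecs() {
        return strategies -> strategies
            .encoder(new ProtobufEncoder())
            .decoder(new ProtobufDecoder());
    }
}
```

With the codecs registered, the `RSocketRequester.wrap(...)` call from the question would receive strategies that know about protobuf messages. The responding side would presumably need matching codecs as well, and it may be worth setting an explicit data MIME type such as `application/x-protobuf` instead of `*/*`, although that part is untested here.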