Java gRPC server for a bidirectional stream that connects to ActiveMQ to push and get messages

I am trying to write a Java gRPC server for a bidirectional stream that connects to ActiveMQ in order to push messages to and get messages from ActiveMQ.

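I am able to instantiate a stream observer on the server. However, the problem occurs as soon as I receive data from the gRPC client, when onNext is called on the server-side StreamObserver. I am trying to push the message to ActiveMQ using JmsTemplate, but jmsTemplate is always null.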

I am configuring JMS as follows in order to create the JmsTemplate:

@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {

        DefaultJmsListenerContainerFactory factory
                = new DefaultJmsListenerContainerFactory();

        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("5-10");

        return factory;
    }
}

application.yml

spring:
  application:
    name: bidirectional-server
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    packages.trust-all: true

HelloService.java

@GrpcService
public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {

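    /**
     * Opens a bidirectional stream; each client message is handled by the returned observer.
     *
     * @param responseObserver observer used to send responses back to the client
     */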
    @Override
    public StreamObserver<HelloEnvelope> transfer(StreamObserver<HelloEnvelope> responseObserver) {
        return new TransferHelloRequestStreaming(responseObserver);
    }
}

TransferHelloRequestStreaming.java

@Slf4j
@Service
public class TransferHelloRequestStreaming implements StreamObserver<HelloEnvelope> {


    private  StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver;

    private HelloEnvelope transferHelloResponse = null;
    public TransferHelloRequestStreaming() {
    }
    public TransferHelloRequestStreaming(StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver) {
        this.transferHelloResponseStreamObserver = transferHelloResponseStreamObserver;
    }

    @Override
    public void onNext(HelloEnvelope transferHelloRequest) {
        new HelloProducer().sendTo("input", transferHelloRequest);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {
        this.transferHelloResponseStreamObserver.onCompleted();
    }

}

HelloProducer.java

@Service
@Slf4j
public class HelloProducer {

  @Autowired
  JmsTemplate jmsTemplate;

  public void sendTo(String destination, HelloEnvelope transferHelloRequest) {
 
    jmsTemplate.convertAndSend(destination, transferHelloRequest);

  }
}

HelloService.proto

syntax = "proto3";

import "google/protobuf/any.proto";
option java_multiple_files = true;
option java_outer_classname = "HelloProto";
package com.server.grpcserver;

service HelloService {
  
    rpc transfer(stream HelloEnvelope) returns
        (stream HelloEnvelope) {
    }
}

message HelloEnvelope {
    map<string, google.protobuf.Any> header = 1;
    bytes body = 2;
}

In HelloProducer.java, jmsTemplate is always null. As soon as I receive data from the gRPC client in onNext(), I want to push that data to ActiveMQ.

new HelloProducer()

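You need to @Autowired the HelloProducer as well. With new you are creating a fresh instance that is not managed by Spring, so it does not get any autowiring and its jmsTemplate field stays null.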
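
To make that concrete, here is a minimal plain-Java sketch of the pattern, not the actual Spring or gRPC code. StreamObserver, MessageSender (standing in for JmsTemplate), RecordingSender and NoopObserver are hypothetical stand-ins defined inside the block so that it runs without any dependencies, and String stands in for HelloEnvelope. The idea is that the service receives the managed HelloProducer once and hands it to each per-call observer, instead of the observer calling new HelloProducer().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: every type below is a hypothetical stand-in, not the Spring or gRPC API.
class InjectionSketch {

    /** Stand-in for io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Stand-in for JmsTemplate: only the one operation the producer uses. */
    interface MessageSender {
        void convertAndSend(String destination, Object message);
    }

    /** Records what would have been sent to the broker. */
    static class RecordingSender implements MessageSender {
        final List<String> sent = new ArrayList<>();
        public void convertAndSend(String destination, Object message) {
            sent.add(destination + ":" + message);
        }
    }

    /** Response observer that ignores everything, used only to drive the sketch. */
    static class NoopObserver implements StreamObserver<String> {
        public void onNext(String value) {}
        public void onError(Throwable t) {}
        public void onCompleted() {}
    }

    /** Mirrors HelloProducer: the field is set only if someone supplies it. */
    static class HelloProducer {
        MessageSender jmsTemplate; // in the real app Spring would populate this

        HelloProducer() {}
        HelloProducer(MessageSender jmsTemplate) { this.jmsTemplate = jmsTemplate; }

        void sendTo(String destination, Object message) {
            jmsTemplate.convertAndSend(destination, message);
        }
    }

    /** Per-call observer: receives the shared producer rather than creating one. */
    static class TransferHelloRequestStreaming implements StreamObserver<String> {
        private final StreamObserver<String> responseObserver;
        private final HelloProducer producer;

        TransferHelloRequestStreaming(StreamObserver<String> responseObserver, HelloProducer producer) {
            this.responseObserver = responseObserver;
            this.producer = producer;
        }

        public void onNext(String request) { producer.sendTo("input", request); }
        public void onError(Throwable t) { responseObserver.onError(t); }
        public void onCompleted() { responseObserver.onCompleted(); }
    }

    /** Mirrors HelloService: the producer is injected once and passed to each new observer. */
    static class HelloService {
        private final HelloProducer producer;

        HelloService(HelloProducer producer) { this.producer = producer; }

        StreamObserver<String> transfer(StreamObserver<String> responseObserver) {
            return new TransferHelloRequestStreaming(responseObserver, producer);
        }
    }

    public static void main(String[] args) {
        // An instance created with new has no sender, reproducing the null from the question.
        try {
            new HelloProducer().sendTo("input", "hello");
        } catch (NullPointerException e) {
            System.out.println("unmanaged instance: jmsTemplate is null");
        }

        // A producer that was given its dependency works when passed through the service.
        RecordingSender sender = new RecordingSender();
        HelloService service = new HelloService(new HelloProducer(sender));
        service.transfer(new NoopObserver()).onNext("hello");
        System.out.println(sender.sent); // prints [input:hello]
    }
}
```

In the Spring application, the equivalent change would presumably be to inject HelloProducer into HelloService (an @Autowired field or constructor injection) and pass it to the TransferHelloRequestStreaming constructor. Since that observer is created per call with new, the @Service annotation on it likely serves no purpose and could be dropped.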