Java gRPC server for a bidirectional stream that connects to ActiveMQ to push and get messages
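I am trying to write a Java gRPC server for a bidirectional stream that connects to ActiveMQ, so that it can push messages to ActiveMQ and get messages from it.
I am able to instantiate a StreamObserver on the server. The problem appears once data arrives from the gRPC client and onNext is called on the server-side StreamObserver: I try to push the message to ActiveMQ using JmsTemplate, but jmsTemplate is always null.
This is my JMS configuration: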
@Configuration
@EnableJms
public class JmsConfig {
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("5-10");
        return factory;
    }
}
application.yml
spring:
  application-name: bidirectional-server
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    packages.trust-all: true
HelloService.java
@GrpcService
public class HelloService extends HelloServiceGrpc.HelloServiceImplBase {
    /**
     * @param responseObserver
     */
    @Override
    public StreamObserver<HelloEnvelope> transfer(StreamObserver<HelloEnvelope> responseObserver) {
        return new TransferHelloRequestStreaming(responseObserver);
    }
}
TransferHelloRequestStreaming.java
@Slf4j
@Service
public class TransferHelloRequestStreaming implements StreamObserver<HelloEnvelope> {
    private StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver;
    private HelloEnvelope transferHelloResponse = null;

    public TransferHelloRequestStreaming() {
    }

    public TransferHelloRequestStreaming(StreamObserver<HelloEnvelope> transferHelloResponseStreamObserver) {
        this.transferHelloResponseStreamObserver = transferHelloResponseStreamObserver;
    }

    @Override
    public void onNext(HelloEnvelope transferHelloRequest) {
        new HelloProducer().sendTo("input", transferHelloRequest);
    }

    @Override
    public void onError(Throwable throwable) {
    }

    @Override
    public void onCompleted() {
        this.transferHelloResponseStreamObserver.onCompleted();
    }
}
HelloProducer.java
@Service
@Slf4j
public class HelloProducer {
    @Autowired
    JmsTemplate jmsTemplate;

    public void sendTo(String destination, HelloEnvelope transferHelloRequest) {
        jmsTemplate.convertAndSend(destination, transferHelloRequest);
    }
}
HelloService.proto
syntax = "proto3";
import "google/protobuf/any.proto";
option java_multiple_files = true;
option java_outer_classname = "HelloProto";
package com.server.grpcserver;

service HelloService {
    rpc transfer(stream HelloEnvelope) returns (stream HelloEnvelope) {
    }
}

message HelloEnvelope {
    map<string, google.protobuf.Any> header = 1;
    bytes body = 2;
}
In HelloProducer.java, jmsTemplate is always null. As soon as I receive data from the gRPC client in onNext(), I want to push that data to ActiveMQ.
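You also need to inject HelloProducer with @Autowired. With new HelloProducer() you are creating a new instance that is not managed by Spring, so it does not get any autowiring and its jmsTemplate field stays null.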
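To illustrate the point, here is a minimal, dependency-free sketch of the wiring, not the actual Spring code. It assumes constructor injection: the one long-lived service holds the producer and hands it to every observer it creates. MessageSender is a hypothetical stand-in for JmsTemplate, TransferObserver stands in for TransferHelloRequestStreaming, and plain String payloads replace HelloEnvelope so the example runs without Spring or gRPC on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for JmsTemplate: sends a payload to a destination.
interface MessageSender {
    void convertAndSend(String destination, Object payload);
}

// Mirrors HelloProducer. The sender arrives through the constructor,
// so a producer can never exist with a null sender.
class HelloProducer {
    private final MessageSender sender;

    HelloProducer(MessageSender sender) {
        this.sender = Objects.requireNonNull(sender, "sender");
    }

    void sendTo(String destination, Object payload) {
        sender.convertAndSend(destination, payload);
    }
}

// Mirrors TransferHelloRequestStreaming. It is created per call with `new`,
// so it receives the already-wired producer from its creator.
class TransferObserver {
    private final HelloProducer producer;

    TransferObserver(HelloProducer producer) {
        this.producer = producer;
    }

    void onNext(String request) {
        producer.sendTo("input", request);
    }
}

// Mirrors HelloService: the long-lived object that owns the producer.
class HelloService {
    private final HelloProducer producer;

    HelloService(HelloProducer producer) {
        this.producer = producer;
    }

    TransferObserver transfer() {
        // Pass the shared producer along instead of calling new HelloProducer().
        return new TransferObserver(producer);
    }
}

class WiringSketch {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        MessageSender sender = (destination, payload) -> sent.add(destination + ":" + payload);

        HelloService service = new HelloService(new HelloProducer(sender));
        service.transfer().onNext("hello");

        System.out.println(sent); // prints [input:hello]
    }
}
```

In the Spring application, the same shape would presumably mean that HelloService (the @GrpcService bean) takes HelloProducer as a constructor parameter or @Autowired field and passes it to new TransferHelloRequestStreaming(...). Because the observer is created per call with new, the @Service annotation on it has no effect on those instances.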