ZeroMQ with Spring (spring-integration-zeromq)
ZeroMQ with Spring (spring-integration-zeromq)
我正在使用 spring-integration-zeromq,我正在尝试设置身份验证设置。
/**
 * Declares the {@code ZeroMqChannel} bean used for pub/sub messaging.
 *
 * <p>The channel connects to {@code tcp://localhost:6001:6002}, maps messages with an
 * {@code EmbeddedJsonHeadersMessageMapper} and registers a send-socket configurer that
 * applies CURVE options (ZAP domain, server flag and key pair) to the socket.
 *
 * @param context the ZeroMQ context the channel creates its sockets from
 * @param objectMapper the JSON mapper handed to the message mapper
 * @return the channel, already initialized and with a printing subscriber attached
 */
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context, ObjectMapper objectMapper) {
    ZeroMqChannel pubSubChannel = new ZeroMqChannel(context, true);

    // Connection and conversion settings.
    pubSubChannel.setConnectUrl("tcp://localhost:6001:6002");
    pubSubChannel.setMessageMapper(new EmbeddedJsonHeadersMessageMapper(objectMapper));
    pubSubChannel.setMessageConverter(new GenericMessageConverter());
    pubSubChannel.setConsumeDelay(Duration.ofMillis(100));

    // CURVE options for the sending socket.
    pubSubChannel.setSendSocketConfigurer(sendSocket -> {
        sendSocket.setZAPDomain("global".getBytes());
        sendSocket.setCurveServer(true);
        sendSocket.setCurvePublicKey("my_public_key".getBytes());
        sendSocket.setCurveSecretKey("my_secret_key".getBytes());
    });

    // Initialization is triggered by hand here and a subscriber is attached right away.
    pubSubChannel.afterPropertiesSet();
    pubSubChannel.subscribe(message -> System.out.println(message));
    return pubSubChannel;
}
但是,setSendSocketConfigurer 的结果似乎被忽略了。
在 org.springframework.integration.zeromq.channel.ZeroMqChannel 中,sendSocketConnectionConfigurer
被初始化为一个空的 lambda 并传递给 prepareSendSocketMono
;所以我调用 setSendSocketConfigurer
因此似乎没有任何效果，因为它只替换了 ZeroMqChannel 实例中的该属性，但没有应用于当时已经创建的套接字 Mono。如何正确设置身份验证？我是否遗漏了什么？
UPD1
在 Artem Bilan 提供修复后，套接字配置器似乎已开始应用于通道，但通信停止工作。我已经采纳了建议并尝试设置 ZeroMqProxy，希望它能为我提供一个解决方法，但仍然没有成功：即使是同一配置中用于日志输出的订阅也无法通过身份验证（不过如果我删除套接字配置器，它就能正常工作）
/**
 * Spring configuration that declares a SUB_PUB {@code ZeroMqProxy}, the shared
 * {@code ZContext} with a CURVE-enabled {@code ZAuth}, a {@code ZeroMqChannel} routed
 * through the proxy, and a service activator that prints every message from that channel.
 */
@Configuration
public class ZeroMQConfig {

    /**
     * Builds the proxy on the given frontend/backend ports. Both proxy sockets receive the
     * same freshly generated certificate plus a server key decoded from a Z85 string.
     *
     * @param context the shared ZeroMQ context
     * @param frontendPort port taken from {@code zmq.channel.port.frontend}
     * @param backendPort port taken from {@code zmq.channel.port.backend}
     * @return the configured proxy
     */
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context, @Value("${zmq.channel.port.frontend}") int frontendPort,
            @Value("${zmq.channel.port.backend}") int backendPort) {
        ZeroMqProxy subPubProxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        subPubProxy.setFrontendPort(frontendPort);
        subPubProxy.setBackendPort(backendPort);
        subPubProxy.setExposeCaptureSocket(true);

        // One certificate is shared by the frontend and the backend socket.
        ZCert proxyCert = new ZCert();
        subPubProxy.setFrontendSocketConfigurer(frontend -> {
            frontend.setCurvePublicKey(proxyCert.getPublicKey());
            frontend.setCurveSecretKey(proxyCert.getSecretKey());
            frontend.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        subPubProxy.setBackendSocketConfigurer(backend -> {
            backend.setCurvePublicKey(proxyCert.getPublicKey());
            backend.setCurveSecretKey(proxyCert.getSecretKey());
            backend.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        return subPubProxy;
    }

    /**
     * Creates the shared context and attaches a verbose {@code ZAuth} configured with
     * {@code ZAuth.CURVE_ALLOW_ANY}.
     *
     * @return the ZeroMQ context
     */
    @Bean
    public ZContext zContext() {
        ZContext sharedContext = new ZContext();
        new ZAuth(sharedContext)
                .configureCurve(ZAuth.CURVE_ALLOW_ANY)
                .setVerbose(true);
        return sharedContext;
    }

    /**
     * Declares the channel that talks through the proxy bean instead of a connect URL.
     *
     * @param context the shared ZeroMQ context
     * @param objectMapper the JSON mapper handed to the message mapper
     * @param proxy the proxy this channel is bound to
     * @return the configured channel; initialization is left to the application context
     */
    @Bean(name = "zeroMqPubChannel")
    ZeroMqChannel zeroMqPubChannel(ZContext context, ObjectMapper objectMapper, ZeroMqProxy proxy) {
        ZeroMqChannel pubChannel = new ZeroMqChannel(context, true);
        pubChannel.setZeroMqProxy(proxy);
        pubChannel.setMessageMapper(new EmbeddedJsonHeadersMessageMapper(objectMapper));
        pubChannel.setMessageConverter(new GenericMessageConverter());
        pubChannel.setConsumeDelay(Duration.ofMillis(100));
        return pubChannel;
    }

    /**
     * Handler subscribed to {@code zeroMqPubChannel} that prints each message to stdout.
     *
     * @return the printing message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "zeroMqPubChannel")
    public MessageHandler subscribe() {
        return incoming -> System.out.println(incoming);
    }
}
是的...我明白你的意思了。这是一个错误：我们必须推迟 this.sendSocketConfigurer
的使用,直到真正与 socket
发生交互。我会尽快解决的。
现在对您的配置做几点说明:
您不应自行调用 afterPropertiesSet()
。让 Spring 应用程序上下文为您管理其回调!
您不得在其 bean 定义中订阅 MessageChannel
。相反，请考虑使用 @ServiceActivator(inputChannel = "zeroMqPubSubChannel")
。在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator
遗憾的是,无法将该自定义传递到内部 ZMQ.Socket
实例中...
更新
在 ZeroMQ 中使用 Curve auth 的工作测试:
/**
 * Verifies pub/sub delivery through a SUB_PUB {@code ZeroMqProxy} when CURVE is enabled.
 *
 * <p>The proxy's frontend and backend sockets act as CURVE servers, each with its own key
 * pair. The channel's send and subscribe sockets act as CURVE clients: each generates a
 * throw-away certificate and is given the public key of the proxy socket it talks to.
 * Two subscribers are registered, so the single sent message is expected twice.
 *
 * <p>The context, proxy and channel are released in {@code finally} / try-with-resources
 * so that a failed assertion or an interrupt does not leak sockets or threads.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping or polling
 */
@Test
void testPubSubWithCurve() throws InterruptedException {
    try (ZContext context = new ZContext()) {
        new ZAuth(context).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);

        ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
        ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();

        ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        proxy.setBeanName("subPubCurveProxy");
        // Proxy sockets are the CURVE servers.
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
        });
        proxy.afterPropertiesSet();
        proxy.start();
        try {
            ZeroMqChannel channel = new ZeroMqChannel(context, true);
            try {
                channel.setZeroMqProxy(proxy);
                channel.setBeanName("testChannelWithCurve");
                // Channel sockets are CURVE clients: own certificate + server's public key.
                channel.setSendSocketConfigurer(socket -> {
                    ZCert clientCert = new ZCert();
                    socket.setCurvePublicKey(clientCert.getPublicKey());
                    socket.setCurveSecretKey(clientCert.getSecretKey());
                    socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
                });
                channel.setSubscribeSocketConfigurer(socket -> {
                    ZCert clientCert = new ZCert();
                    socket.setCurvePublicKey(clientCert.getPublicKey());
                    socket.setCurveSecretKey(clientCert.getSecretKey());
                    socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
                });
                channel.setConsumeDelay(Duration.ofMillis(10));
                channel.afterPropertiesSet();

                BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
                // Two subscribers: each one must receive the published message.
                channel.subscribe(received::offer);
                channel.subscribe(received::offer);

                await().until(() -> proxy.getBackendPort() > 0);
                // Give it some time to connect and subscribe
                Thread.sleep(1000);

                GenericMessage<String> testMessage = new GenericMessage<>("test1");
                assertThat(channel.send(testMessage)).isTrue();

                Message<?> message = received.poll(10, TimeUnit.SECONDS);
                assertThat(message).isNotNull().isEqualTo(testMessage);
                message = received.poll(10, TimeUnit.SECONDS);
                assertThat(message).isNotNull().isEqualTo(testMessage);
            }
            finally {
                channel.destroy();
            }
        }
        finally {
            proxy.stop();
        }
    }
}
我正在使用 spring-integration-zeromq,我正在尝试设置身份验证设置。
/**
 * Declares the {@code ZeroMqChannel} bean used for pub/sub messaging.
 *
 * <p>The channel connects to {@code tcp://localhost:6001:6002}, maps messages with an
 * {@code EmbeddedJsonHeadersMessageMapper} and registers a send-socket configurer that
 * applies CURVE options (ZAP domain, server flag and key pair) to the socket.
 *
 * @param context the ZeroMQ context the channel creates its sockets from
 * @param objectMapper the JSON mapper handed to the message mapper
 * @return the channel, already initialized and with a printing subscriber attached
 */
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context, ObjectMapper objectMapper) {
    ZeroMqChannel pubSubChannel = new ZeroMqChannel(context, true);

    // Connection and conversion settings.
    pubSubChannel.setConnectUrl("tcp://localhost:6001:6002");
    pubSubChannel.setMessageMapper(new EmbeddedJsonHeadersMessageMapper(objectMapper));
    pubSubChannel.setMessageConverter(new GenericMessageConverter());
    pubSubChannel.setConsumeDelay(Duration.ofMillis(100));

    // CURVE options for the sending socket.
    pubSubChannel.setSendSocketConfigurer(sendSocket -> {
        sendSocket.setZAPDomain("global".getBytes());
        sendSocket.setCurveServer(true);
        sendSocket.setCurvePublicKey("my_public_key".getBytes());
        sendSocket.setCurveSecretKey("my_secret_key".getBytes());
    });

    // Initialization is triggered by hand here and a subscriber is attached right away.
    pubSubChannel.afterPropertiesSet();
    pubSubChannel.subscribe(message -> System.out.println(message));
    return pubSubChannel;
}
但是,setSendSocketConfigurer 的结果似乎被忽略了。
在 org.springframework.integration.zeromq.channel.ZeroMqChannel 中,sendSocketConnectionConfigurer
被初始化为一个空的 lambda 并传递给 prepareSendSocketMono
;所以我调用 setSendSocketConfigurer
因此似乎没有任何效果，因为它只替换了 ZeroMqChannel 实例中的该属性，但没有应用于当时已经创建的套接字 Mono。如何正确设置身份验证？我是否遗漏了什么？
UPD1
在 Artem Bilan 提供修复后，套接字配置器似乎已开始应用于通道，但通信停止工作。我已经采纳了建议并尝试设置 ZeroMqProxy，希望它能为我提供一个解决方法，但仍然没有成功：即使是同一配置中用于日志输出的订阅也无法通过身份验证（不过如果我删除套接字配置器，它就能正常工作）
/**
 * Spring configuration that declares a SUB_PUB {@code ZeroMqProxy}, the shared
 * {@code ZContext} with a CURVE-enabled {@code ZAuth}, a {@code ZeroMqChannel} routed
 * through the proxy, and a service activator that prints every message from that channel.
 */
@Configuration
public class ZeroMQConfig {

    /**
     * Builds the proxy on the given frontend/backend ports. Both proxy sockets receive the
     * same freshly generated certificate plus a server key decoded from a Z85 string.
     *
     * @param context the shared ZeroMQ context
     * @param frontendPort port taken from {@code zmq.channel.port.frontend}
     * @param backendPort port taken from {@code zmq.channel.port.backend}
     * @return the configured proxy
     */
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context, @Value("${zmq.channel.port.frontend}") int frontendPort,
            @Value("${zmq.channel.port.backend}") int backendPort) {
        ZeroMqProxy subPubProxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        subPubProxy.setFrontendPort(frontendPort);
        subPubProxy.setBackendPort(backendPort);
        subPubProxy.setExposeCaptureSocket(true);

        // One certificate is shared by the frontend and the backend socket.
        ZCert proxyCert = new ZCert();
        subPubProxy.setFrontendSocketConfigurer(frontend -> {
            frontend.setCurvePublicKey(proxyCert.getPublicKey());
            frontend.setCurveSecretKey(proxyCert.getSecretKey());
            frontend.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        subPubProxy.setBackendSocketConfigurer(backend -> {
            backend.setCurvePublicKey(proxyCert.getPublicKey());
            backend.setCurveSecretKey(proxyCert.getSecretKey());
            backend.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        return subPubProxy;
    }

    /**
     * Creates the shared context and attaches a verbose {@code ZAuth} configured with
     * {@code ZAuth.CURVE_ALLOW_ANY}.
     *
     * @return the ZeroMQ context
     */
    @Bean
    public ZContext zContext() {
        ZContext sharedContext = new ZContext();
        new ZAuth(sharedContext)
                .configureCurve(ZAuth.CURVE_ALLOW_ANY)
                .setVerbose(true);
        return sharedContext;
    }

    /**
     * Declares the channel that talks through the proxy bean instead of a connect URL.
     *
     * @param context the shared ZeroMQ context
     * @param objectMapper the JSON mapper handed to the message mapper
     * @param proxy the proxy this channel is bound to
     * @return the configured channel; initialization is left to the application context
     */
    @Bean(name = "zeroMqPubChannel")
    ZeroMqChannel zeroMqPubChannel(ZContext context, ObjectMapper objectMapper, ZeroMqProxy proxy) {
        ZeroMqChannel pubChannel = new ZeroMqChannel(context, true);
        pubChannel.setZeroMqProxy(proxy);
        pubChannel.setMessageMapper(new EmbeddedJsonHeadersMessageMapper(objectMapper));
        pubChannel.setMessageConverter(new GenericMessageConverter());
        pubChannel.setConsumeDelay(Duration.ofMillis(100));
        return pubChannel;
    }

    /**
     * Handler subscribed to {@code zeroMqPubChannel} that prints each message to stdout.
     *
     * @return the printing message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "zeroMqPubChannel")
    public MessageHandler subscribe() {
        return incoming -> System.out.println(incoming);
    }
}
是的...我明白你的意思了。这是一个错误：我们必须推迟 this.sendSocketConfigurer
的使用,直到真正与 socket
发生交互。我会尽快解决的。
现在对您的配置做几点说明:
您不应自行调用 afterPropertiesSet()
。让 Spring 应用程序上下文为您管理其回调!
您不得在其 bean 定义中订阅 MessageChannel
。相反，请考虑使用 @ServiceActivator(inputChannel = "zeroMqPubSubChannel")
。在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator
遗憾的是,无法将该自定义传递到内部 ZMQ.Socket
实例中...
更新
在 ZeroMQ 中使用 Curve auth 的工作测试:
/**
 * Verifies pub/sub delivery through a SUB_PUB {@code ZeroMqProxy} when CURVE is enabled.
 *
 * <p>The proxy's frontend and backend sockets act as CURVE servers, each with its own key
 * pair. The channel's send and subscribe sockets act as CURVE clients: each generates a
 * throw-away certificate and is given the public key of the proxy socket it talks to.
 * Two subscribers are registered, so the single sent message is expected twice.
 *
 * <p>The context, proxy and channel are released in {@code finally} / try-with-resources
 * so that a failed assertion or an interrupt does not leak sockets or threads.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping or polling
 */
@Test
void testPubSubWithCurve() throws InterruptedException {
    try (ZContext context = new ZContext()) {
        new ZAuth(context).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);

        ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
        ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();

        ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        proxy.setBeanName("subPubCurveProxy");
        // Proxy sockets are the CURVE servers.
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
        });
        proxy.afterPropertiesSet();
        proxy.start();
        try {
            ZeroMqChannel channel = new ZeroMqChannel(context, true);
            try {
                channel.setZeroMqProxy(proxy);
                channel.setBeanName("testChannelWithCurve");
                // Channel sockets are CURVE clients: own certificate + server's public key.
                channel.setSendSocketConfigurer(socket -> {
                    ZCert clientCert = new ZCert();
                    socket.setCurvePublicKey(clientCert.getPublicKey());
                    socket.setCurveSecretKey(clientCert.getSecretKey());
                    socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
                });
                channel.setSubscribeSocketConfigurer(socket -> {
                    ZCert clientCert = new ZCert();
                    socket.setCurvePublicKey(clientCert.getPublicKey());
                    socket.setCurveSecretKey(clientCert.getSecretKey());
                    socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
                });
                channel.setConsumeDelay(Duration.ofMillis(10));
                channel.afterPropertiesSet();

                BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
                // Two subscribers: each one must receive the published message.
                channel.subscribe(received::offer);
                channel.subscribe(received::offer);

                await().until(() -> proxy.getBackendPort() > 0);
                // Give it some time to connect and subscribe
                Thread.sleep(1000);

                GenericMessage<String> testMessage = new GenericMessage<>("test1");
                assertThat(channel.send(testMessage)).isTrue();

                Message<?> message = received.poll(10, TimeUnit.SECONDS);
                assertThat(message).isNotNull().isEqualTo(testMessage);
                message = received.poll(10, TimeUnit.SECONDS);
                assertThat(message).isNotNull().isEqualTo(testMessage);
            }
            finally {
                channel.destroy();
            }
        }
        finally {
            proxy.stop();
        }
    }
}