使用 Spring Integration 将 Google PubSub 与 Dataflow 结合
Google PubSub with Dataflow using Spring Integration
我正在使用 Spring framework
和 Google Cloud Platform
开发实时跟踪系统。
Spring Cloud GCP
让我们可以轻松地以 Spring Integration
方式编写 GCP PubSub
应用程序。参考他们 GitHub 页面上的示例(Github Samples),我编写了如下应用程序:
@Configuration
@Slf4j
public class GCPConfiguration {
/*
* Message sender code
* */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
PubSubMessageHandler adapter =
new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
adapter.setPublishCallback(new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {
log.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
log.info("Message was sent successfully.");
}
});
return adapter;
}
@MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
public interface PubSubOutboundGateway {
void sendToPubSub(String text);
}
/*
* Message receiver code
* */
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubOperations pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
log.info("Message arrived! Payload: " + payload);
ackReplyConsumer.ack();
}
}
跟踪设备将持续向此应用程序公开的 TCP
端口发送数据,需要对其进行转换,然后持久保存到 BigQuery
和 GC SQL
。从 TCP
端口获取数据并将其发布到 GC PubSub
的部分已经实现。
我不知道的是应该如何以及在何处添加从 GC PubSub
读取数据的 Google Cloud Dataflow
代码。
更新
目标是向 GC BigQuery
和 GC SQL
插入数据,因此任何能够将数据插入到这些服务中的方案都可以。
我正在使用 Spring framework
和 Google Cloud Platform
开发实时跟踪系统。
Spring Cloud GCP
让我们可以轻松地以 Spring Integration
方式编写 GCP PubSub
应用程序。参考他们 GitHub 页面上的示例(Github Samples),我编写了如下应用程序:
@Configuration
@Slf4j
public class GCPConfiguration {
/*
* Message sender code
* */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
PubSubMessageHandler adapter =
new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
adapter.setPublishCallback(new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {
log.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
log.info("Message was sent successfully.");
}
});
return adapter;
}
@MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
public interface PubSubOutboundGateway {
void sendToPubSub(String text);
}
/*
* Message receiver code
* */
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubOperations pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
log.info("Message arrived! Payload: " + payload);
ackReplyConsumer.ack();
}
}
跟踪设备将持续向此应用程序公开的 TCP
端口发送数据,需要对其进行转换,然后持久保存到 BigQuery
和 GC SQL
。从 TCP
端口获取数据并将其发布到 GC PubSub
的部分已经实现。
我不知道的是应该如何以及在何处添加从 GC PubSub
读取数据的 Google Cloud Dataflow
代码。
更新
目标是向 GC BigQuery
和 GC SQL
插入数据,因此任何能够将数据插入到这些服务中的方案都可以。