Google PubSub with Dataflow 使用 Spring 集成

Google PubSub with Dataflow using Spring Integration

使用 Spring frameworkGoogle Cloud Platform 开发实时跟踪系统。 Spring Cloud GCP 可以轻松编写 GCP PubSub 应用程序 Spring Integration 方式。从他们的 github 页面,我能够编写如下应用程序:Github Samples

@Configuration
@Slf4j
public class GCPConfiguration {
    /*
    *   Message sender code
    * */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler adapter =
                new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        adapter.setPublishCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                log.info("Message was sent successfully.");
            }
        });

        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
    public interface PubSubOutboundGateway {
        void sendToPubSub(String text);
    }

    /*
    *   Message receiver code
    * */
    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubOperations pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
        log.info("Message arrived! Payload: " + payload);

        ackReplyConsumer.ack();
    }
}

跟踪设备将持续向此应用程序公开的 TCP 端口发送数据,需要对其进行转换,然后持久保存到 BigQueryGC SQL。从 TCP 端口获取数据并将其发布到 GC PubSub 已经到位。 我不知道的是如何以及在何处添加来自 GC PubSub

Google Cloud Dataflow 代码

更新

目标是向 GC BigQueryGC SQL 插入数据,因此可以将数据插入到这些服务中。

您的问题似乎是如何使用 Google Cloud Dataflow 从 Pub/Sub 流式传输到 Bigquery。

Dataflow 站点链接到此 example from its examples page. Note that this is using version 1.x of the SDK and not the 2.x Apache Beam version. You can find a similar example here

还有一个Google提供的template可以使用,不需要编码。

编辑:该模板最近是开源的,可在 github 上找到。