将消息从 SQS 读入 Dataflow
Read messages from SQS into Dataflow
我在 AWS S3 中生成了一堆数据,只要有新文件到达 S3,就会收到 PUT 通知 being sent to SQS。我想将这些文件的内容加载到 BigQuery 中,因此我正在努力在 Google Dataflow 中设置一个简单的 ETL。但是,我不知道如何将 Dataflow 与它尚未支持的任何服务(Pubsub、Google Cloud Storage 等)集成。
In the initial release of Cloud Dataflow, extensibility for Read and Write transforms has not been implemented.
我想我可以确认这一点,因为我试图编写一个读取转换,但无法弄清楚如何使其工作(我试图将 SqsIO class 基于提供的 PubsubIO class).
所以我一直在考虑为 Dataflow 编写一个自定义的 source,但我无法思考如何使 Source 适应轮询 SQS 以进行更改。无论如何,它看起来都不是正确的抽象,但我真的不在乎我是否能让它工作。
此外,看起来我必须做一些工作才能下载 S3 文件(我尝试为此创建一个 Reader,但没有运气 b/c 上述原因).
基本上,我被卡住了。对于将 SQS 和 S3 与 Dataflow 集成的任何建议,我们将不胜感激。
Dataflow Java SDK 现在包含一个 API 用于定义自定义无限源:
这可用于实现自定义 SQS 源。
我在 AWS S3 中生成了一堆数据,只要有新文件到达 S3,就会收到 PUT 通知 being sent to SQS。我想将这些文件的内容加载到 BigQuery 中,因此我正在努力在 Google Dataflow 中设置一个简单的 ETL。但是,我不知道如何将 Dataflow 与它尚未支持的任何服务(Pubsub、Google Cloud Storage 等)集成。
In the initial release of Cloud Dataflow, extensibility for Read and Write transforms has not been implemented.
我想我可以确认这一点,因为我试图编写一个读取转换,但无法弄清楚如何使其工作(我试图将 SqsIO class 基于提供的 PubsubIO class).
所以我一直在考虑为 Dataflow 编写一个自定义的 source,但我无法思考如何使 Source 适应轮询 SQS 以进行更改。无论如何,它看起来都不是正确的抽象,但我真的不在乎我是否能让它工作。
此外,看起来我必须做一些工作才能下载 S3 文件(我尝试为此创建一个 Reader,但没有运气 b/c 上述原因).
基本上,我被卡住了。对于将 SQS 和 S3 与 Dataflow 集成的任何建议,我们将不胜感激。
Dataflow Java SDK 现在包含一个 API 用于定义自定义无限源:
这可用于实现自定义 SQS 源。