如何使用 Apache Beam (Dataflow) 从 API 获取数据?

How to get data from an API using Apache Beam (Dataflow)?

我做过一些 Python 编程,但无论如何我都不是经验丰富的开发人员。我们有一个 Python etl 程序,它被设置为 Cloud Functions,但由于要加载的数据太多,它正在超时,我们正在寻求重新编写它以在 Dataflow 中工作。

目前的代码只是连接到一个 API,其中 returns 一个换行符 JSON,然后将数据加载到一个新的 table在 BigQuery 中。

这是我们第一次使用 Dataflow,我们只是想了解它的工作原理。将数据导入 BigQuery 似乎很容易,我们遇到的绊脚石是如何从 API 中获取数据。我们不清楚如何进行这项工作,我们是否需要按照 [Develop IO Connector] 开发新的 I/O 连接器?或者开发新的连接器似乎很复杂,还有其他选择吗?

我们进行了大量的谷歌搜索,但没有找到任何明显的帮助。

这是我们的代码示例,但我们不能 100% 确定它是否在正确的轨道上。该代码不起作用,我们认为它最初需要是 .io.read 而不是 .ParDo,但我们不太确定该去哪里。非常感谢一些指导!

class callAPI(beam.DoFn):
   def __init__(self, input_header, input_uri):
       self.headers = input_header
       self.remote_url = input_uri

   def process(self):
       try:
           res = requests.get(self.remote_url, headers=self.headers) 
           res.raise_for_status()
       except HTTPError as message:
           logging.error(message)
           return
       return res.text

with beam.Pipeline() as p:

    data = ( p 
                | 'Call  API ' >> beam.ParDo(callAPI(HEADER, REMOTE_URI))
                | beam.Map(print))

提前致谢。

你能分享一下你的云函数的代码吗? 这是计划任务还是事件触发?如果它是计划任务,Apache Airflow 可能是更好的选择,您可以使用 Dataflow Python Operators 和 BigQueryOperators 来完成您正在寻找的事情

您走在正确的轨道上,但还有一些问题需要解决。

正如您所指出的,管道的根需要是某种读取。 ParDo 操作处理一组元素(理想情况下并行处理),但需要一些输入来处理。你可以做

p | beam.Create(['a', 'b', 'c']) | beam.ParDo(SomeDoFn())

其中 SomeDoFn 将被传递 abc 到它的 process 方法中。如果没有合理的输入并且您希望确保只调用一次 DoFn,则有一个特殊的 p | beam.Impulse() 操作将生成单个 None 元素。您还可以从文件(或类似文件)中读取元素。请注意,您的 process 方法需要 self 和要处理的元素,并且 returns 是一个可迭代的(以允许零个或多个输出。还有 beam.Mapbeam.FlatMap 封装了更简单的模式)。所以你可以做类似

的事情
class CallAPI(beam.DoFn):
   def __init__(self, input_header):
       self.headers = input_header

   def process(self, input_uri):
       try:
           res = requests.get(input_uri, headers=self.headers) 
           res.raise_for_status()
       except HTTPError as message:
           logging.error(message)
       yield res.text

with beam.Pipeline() as p:

    data = (
        p
        | beam.Create([REMOTE_URI]) 
        | 'Call API ' >> beam.ParDo(CallAPI(HEADER))
        | beam.Map(print))

这将允许您在同一管道中读取多个 URI(并行)。

如果你的源是这样的,你可以写一个完整的 IO 连接器,它可以被分割(最好是动态地)而不是只在一个巨大的请求中读取。