Apache Beam Pipeline to read from REST API runs locally but not on Dataflow
I have been trying to get my pipeline to run as a classic template on Dataflow.
The pipeline is supposed to read the runtime parameters from_date and to_date, pass them to a REST API, and then write the response returned by the API to a BigQuery table.
It runs on Dataflow without any errors, but my data never shows up in the BigQuery table that serves as the sink.
When I execute it locally, it works like a charm: no errors, and I can write to BigQuery using a service account and a local file.
I suspect I am misunderstanding what is available to the pipeline steps in the different environments, and that no data is actually being passed along the pipeline.
The requests package might not be available on the Dataflow runner, but then I would expect an error message...
When I try to run it on Dataflow but write to text instead (the commented-out line below), a folder is created on Cloud Storage, but there is no file inside it.
I also suspect this is why I cannot get any debug messages to show up in the monitoring UI.
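If requests really were missing on the workers, one way to ship it would be Beam's standard --requirements_file pipeline option. A minimal sketch, assuming a requirements.txt next to the pipeline file that lists requests (bucket and template paths below are placeholders, not my real values):

# Minimal sketch: making the `requests` dependency available on Dataflow workers.
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my_project',
    '--temp_location=gs://my_bucket/tmp',                      # placeholder bucket
    '--template_location=gs://my_bucket/templates/api_to_bq',  # placeholder template path
    '--requirements_file=requirements.txt',                    # installs `requests` on the workers
])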
Many thanks in advance - here is my pipeline code:
#!/usr/bin/env python
# coding: utf-8

import logging
import argparse

# Beam/Dataflow related imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import RuntimeValueProvider

# Handling of API calls
import requests
import json


class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dates):
        bearer_token = "api_secret"
        from_date = str(dates[0])
        to_date = str(dates[1])
        logging.debug("Now fetching from %s to %s", from_date, to_date)
        payload = {'stuff': 'stuff',
                   'from': from_date,
                   'to': to_date,
                   'other_stuff': 'other_stuff'
                   }
        payload = json.dumps(payload)
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + bearer_token,
            'Accept': 'application/json'
        }
        r = requests.post("api_url", data=payload, headers=headers)
        # Skip the header line of the response and emit one record per line
        return [line.decode("utf-8") for line in r.iter_lines()][1:]


class Split(beam.DoFn):
    def process(self, element):
        try:
            pid, date, another_kpi, yet_another_kpi = element.split(",")
            logging.debug(" | ".join(element.split(",")))
            return [{
                'pid': str(pid),
                'date': str(date),
                'another_kpi': int(another_kpi),
                'yet_another_kpi': float(yet_another_kpi)
            }]
        except ValueError:
            # Log and drop records that do not have exactly four fields
            logging.error(" | ".join(element.split(",")))


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--to_date', dest='to_date', type=str)
        parser.add_value_provider_argument('--from_date', dest='from_date', type=str)


def run(argv=None):
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    print("Google Cloud Options: ", pipeline_options.view_as(GoogleCloudOptions))

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date
    logging.debug("Data from %s to %s", from_date, to_date)

    table_spec = bigquery.TableReference(
        projectId='my_project',
        datasetId='my_dataset',
        tableId='my_table')
    table_schema = 'pid:STRING, date:STRING, another_kpi:INT64, yet_another_kpi:FLOAT64'

    p1 = beam.Pipeline(options=pipeline_options)
    ingest_data = (
        p1
        | 'pass dates' >> beam.Create([[from_date, to_date]])
        | 'fetch API data' >> beam.ParDo(get_api_data())
        | 'split records' >> beam.ParDo(Split())
        | 'write into gbq' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        #| 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )
    result = p1.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
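For reference, this is what the construction-time code sees for these options when no values are supplied, as is the case while a classic template is being built. A minimal sketch (not part of the pipeline), using the UserOptions class above:

# Minimal sketch: without --from_date/--to_date at construction time,
# the options are ValueProvider placeholders rather than date strings.
opts = PipelineOptions([]).view_as(UserOptions)
print(opts.from_date)     # a RuntimeValueProvider placeholder, not a date string
# opts.from_date.get()    # would raise an error outside a runtime context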
Apparently it is forbidden to combine a ValueProvider with Create, even though I did not get an error message.
I solved it by using:
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp1, vp2):
        self.vp1 = vp1
        self.vp2 = vp2

    def process(self, unused_elm):
        # .get() is only called here, at execution time, when the runtime values exist
        logging.info("Providing dates: %s %s", self.vp1.get(), self.vp2.get())
        yield [self.vp1.get(), self.vp2.get()]

...

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date

    pipel = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'Read from and to date' >> beam.ParDo(OutputValueProviderFn(from_date, to_date))
        | 'fetch API data' >> beam.ParDo(get_api_data())
        ...
    )
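To convince myself the new DoFn does what I expect outside Dataflow, it can be exercised directly with StaticValueProviders, which behave like the values the template supplies at run time. A minimal sketch with placeholder dates:

# Minimal sketch: feeding OutputValueProviderFn static providers with placeholder dates.
from apache_beam.options.value_provider import StaticValueProvider

fn = OutputValueProviderFn(StaticValueProvider(str, '2021-01-01'),
                           StaticValueProvider(str, '2021-01-31'))
print(list(fn.process(None)))   # [['2021-01-01', '2021-01-31']]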
Inspiration