Dataflow template can't be created because the Scrapinghub client library doesn't accept ValueProvider
I am trying to create a Dataflow template that can be invoked from a Cloud Function, which is itself triggered by a Pub/Sub message. The Pub/Sub message carries a job ID from Scrapinghub (a platform for Scrapy spiders) to the Cloud Function, which launches the Dataflow template; the template's input is the job ID and its output is the corresponding data written to BigQuery. Every other step of this design works, but I cannot create the template, presumably because of an incompatibility between Scrapinghub's client library and Apache Beam.
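For context, the Cloud Function side looks roughly like the sketch below. This is my own illustration, not verified code: the function name, template path, job name, and BigQuery table are placeholders, and it assumes the google-api-python-client library for calling the Dataflow templates.launch API.

import base64

from googleapiclient.discovery import build  # google-api-python-client

def launch_template(event, context):
    """Pub/Sub-triggered function; the message payload is a Scrapinghub job ID."""
    job_id = base64.b64decode(event['data']).decode('utf-8')
    dataflow = build('dataflow', 'v1b3')
    dataflow.projects().locations().templates().launch(
        projectId='project-name',                    # placeholder
        location='us-central1',
        gcsPath='gs://templates/location/template',  # placeholder template path
        body={
            'jobName': 'scrapinghub-to-bq',          # placeholder
            'parameters': {
                'input': job_id,                     # the template's runtime arguments
                'output': 'project:dataset.table',   # placeholder table
            },
        },
    ).execute()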
Code:
from __future__ import absolute_import

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
from scrapinghub import ScrapinghubClient


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input')
        parser.add_value_provider_argument('--output', type=str)


class IngestionBQ:

    @staticmethod
    def parse_method(item):
        # Drop Scrapinghub metadata keys and decode bytes keys/values to str.
        dic = {k: item[k] for k in item if k not in [b'_type', b'_key']}
        new_d = {}
        for key in dic:
            try:
                new_d.update({key.decode('utf-8'): dic[key].decode('utf-8')})
            except AttributeError:  # value is not bytes
                new_d.update({key.decode('utf-8'): dic[key]})
        yield new_d


class ShubConnect:

    def __init__(self, api_key, job_id):
        self.job_id = job_id
        self.client = ScrapinghubClient(api_key)

    def get_data(self):
        data = []
        item = self.client.get_job(self.job_id)
        for i in item.items.iter():
            data.append(i)
        return data


def run(argv=None, save_main_session=True):
    """The main function which creates the pipeline and runs it."""
    data_ingestion = IngestionBQ()
    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    api_key = os.environ.get('api_key')
    user_options = pipeline_options.view_as(UserOptions)
    (p
     | 'Read Data from Scrapinghub' >> beam.Create(
         ShubConnect(api_key, user_options.input).get_data())
     | 'Trim b string' >> beam.FlatMap(data_ingestion.parse_method)
     | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
         user_options.output,
         schema=schema,  # NOTE: `schema` is defined elsewhere; omitted here.
         # Creates the table in BigQuery if it does not yet exist.
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)
     )
    p.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
I then deploy the template from Cloud Shell with this command:
python main.py \
    --project=project-name \
    --region=us-central1 \
    --runner=DataflowRunner \
    --temp_location gs://temp/location/ \
    --template_location gs://templates/location/
Then this error occurs:
Traceback (most recent call last):
  File "main.py", line 69, in <module>
    run()
  File "main.py", line 57, in run
    | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
  File "main.py", line 41, in get_data
    item = self.client.get_job(self.job_id)
  File "/home/user/data-flow/venv/lib/python3.7/site-packages/scrapinghub/client/__init__.py", line 99, in get_job
    project_id = parse_job_key(job_key).project_id
  File "/home/user/data-flow/venv/lib/python3.7/site-packages/scrapinghub/client/utils.py", line 60, in parse_job_key
    .format(type(job_key), repr(job_key)))
ValueError: Job key should be a string or a tuple, got <class 'apache_beam.options.value_provider.RuntimeValueProvider'>: <apache_beam.options.value_provider.RuntimeValueProvider object at 0x7f14760a3630>
Before this, I had successfully created a template by using parser.add_argument instead of parser.add_value_provider_argument. That template could be created, but it could not be run with runtime parameters, because parser.add_argument does not support them. Still, with parser.add_argument I could not only create the template but also run the pipeline from Cloud Shell. Why does Scrapinghub's client API throw this error with parser.add_value_provider_argument but not with parser.add_argument? What is the fundamental programming difference between the two? And, of course, how can I still create this template with ValueProvider arguments? My current understanding of the difference is sketched below.
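Here is that understanding as a minimal sketch (my own notes, using the UserOptions class from above): with parser.add_argument the parsed value is a plain str that must be known when the pipeline graph is built, whereas parser.add_value_provider_argument hands back a ValueProvider whose .get() is only valid once the job is actually running.

options = PipelineOptions().view_as(UserOptions)

# With parser.add_argument, options.input would already be a str here, so
# ShubConnect(api_key, options.input).get_data() would receive a real job key.

# With parser.add_value_provider_argument and no --input on the command line
# (the template-staging case), options.input is a RuntimeValueProvider:
print(type(options.input))  # apache_beam.options.value_provider.RuntimeValueProvider
# options.input.get()       # would raise here: no runtime value exists yet.

# beam.Create(ShubConnect(api_key, options.input).get_data()) runs get_data()
# at graph-construction time, so ScrapinghubClient.get_job() receives the
# provider object itself instead of a string, hence the ValueError above.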
Many thanks.
EDIT
After reading the documentation, I understand that the error occurs because non-I/O modules do not support ValueProvider objects. Reference: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#python_5
So to achieve what I need, I could switch to the Java SDK or come up with another idea, such as the one sketched below. Until non-I/O modules support ValueProvider, though, this path looks like a dead end.
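One alternative idea: start from a dummy one-element PCollection and move the Scrapinghub call into a DoFn, so that ValueProvider.get() is only called at run time rather than at graph-construction time. A minimal sketch, which I haven't verified end to end; it assumes the job ID arrives as a plain string and keeps reading the API key from the environment when the template is built:

class ReadFromScrapinghub(beam.DoFn):
    """Fetches the job's items at run time, when ValueProvider.get() is legal."""

    def __init__(self, api_key, job_id_provider):
        self.api_key = api_key                  # plain str, resolved at build time
        self.job_id_provider = job_id_provider  # ValueProvider, resolved at run time

    def process(self, unused_element):
        from scrapinghub import ScrapinghubClient  # import on the worker
        client = ScrapinghubClient(self.api_key)
        job = client.get_job(self.job_id_provider.get())  # .get() works here
        for item in job.items.iter():
            yield item

items = (p
         | 'Start' >> beam.Create([None])
         | 'Read Data from Scrapinghub' >> beam.ParDo(
             ReadFromScrapinghub(api_key, user_options.input)))

The rest of the pipeline (the FlatMap and WriteToBigQuery steps) would hang off items unchanged; WriteToBigQuery already accepts a ValueProvider for the table, so only my own read step needs this deferral.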