明智地导入一个 JSON 项目,因此它只加载一次
Import a JSON project wise, so it loads just once
我有一个 Python 项目,它针对特定模式执行 JSON 验证。
它将 运行 作为 GCP 数据流中的转换步骤,因此在 运行 避免一次又一次下载相同文件之前收集所有依赖项非常重要。
架构放置在一个单独的 Git 存储库中。
Transformer 的本质是您在 class 中收到一条记录,然后使用它。典型的流程是加载 JSON 模式,根据它验证记录,然后处理无效和有效的内容。以这种方式加载模式意味着我从存储库中为每条记录下载模式,它可能是数十万条。
代码被“克隆”到工人中,然后独立工作。
受到 Python 在开始时(一次)加载需求并将它们用作导入的方式的启发,我想我可以添加存储库(JSON 模式所在的位置)作为Python 要求,然后只需在我的 Python 代码中使用它。但当然,它是一个 JSON,而不是要导入的 Python 模块。它是如何工作的?
例如:
- requirements.txt
git+git://github.com/path/to/json/schema@41b95ec
- dataflow_transformer.py
import apache_beam as beam
import the_downloaded_schema
from jsonschema import validate
class Verifier(beam.DoFn):
def process(self, record: dict):
validate(instance=record, schema=the_downloaded_schema)
# ... more stuff
yield record
class Transformer(beam.PTransform):
def expand(self, record):
return (
record
| "Verify Schema" >> beam.ParDo(Verifier())
)
为什么不直接使用 class 来加载使用缓存的资源以防止重复加载?大致如下:
class JsonLoader:
def __init__(self):
self.cache = set()
def import(self, filename):
filename = os.path.absname(filename)
if filename not in self.cache:
self._load_json(filename)
self.cache.add(filename)
def _load_json(self, filename):
...
您可以加载一次 json 架构并将其用作辅助输入。
一个例子:
import json
import requests
json_current='https://covidtracking.com/api/v1/states/current.json'
def get_json_schema(url):
with requests.Session() as session:
schema = json.loads(session.get(url).text)
return schema
schema_json = get_json_schema(json_current)
def feed_schema(data, schema):
yield {'record': data, 'schema': schema[0]}
schema = p | beam.Create([schema_json])
data = p | beam.Create(range(10))
data_with_schema = data | beam.FlatMap(feed_schema, schema=beam.pvalue.AsSingleton(schema))
# Now do your schema validation
只是 data_with_schema
pcollection 的演示
我有一个 Python 项目,它针对特定模式执行 JSON 验证。 它将 运行 作为 GCP 数据流中的转换步骤,因此在 运行 避免一次又一次下载相同文件之前收集所有依赖项非常重要。
架构放置在一个单独的 Git 存储库中。 Transformer 的本质是您在 class 中收到一条记录,然后使用它。典型的流程是加载 JSON 模式,根据它验证记录,然后处理无效和有效的内容。以这种方式加载模式意味着我从存储库中为每条记录下载模式,它可能是数十万条。 代码被“克隆”到工人中,然后独立工作。
受到 Python 在开始时(一次)加载需求并将它们用作导入的方式的启发,我想我可以添加存储库(JSON 模式所在的位置)作为Python 要求,然后只需在我的 Python 代码中使用它。但当然,它是一个 JSON,而不是要导入的 Python 模块。它是如何工作的?
例如:
- requirements.txt
git+git://github.com/path/to/json/schema@41b95ec
- dataflow_transformer.py
import apache_beam as beam
import the_downloaded_schema
from jsonschema import validate
class Verifier(beam.DoFn):
def process(self, record: dict):
validate(instance=record, schema=the_downloaded_schema)
# ... more stuff
yield record
class Transformer(beam.PTransform):
def expand(self, record):
return (
record
| "Verify Schema" >> beam.ParDo(Verifier())
)
为什么不直接使用 class 来加载使用缓存的资源以防止重复加载?大致如下:
class JsonLoader:
def __init__(self):
self.cache = set()
def import(self, filename):
filename = os.path.absname(filename)
if filename not in self.cache:
self._load_json(filename)
self.cache.add(filename)
def _load_json(self, filename):
...
您可以加载一次 json 架构并将其用作辅助输入。 一个例子:
import json
import requests
json_current='https://covidtracking.com/api/v1/states/current.json'
def get_json_schema(url):
with requests.Session() as session:
schema = json.loads(session.get(url).text)
return schema
schema_json = get_json_schema(json_current)
def feed_schema(data, schema):
yield {'record': data, 'schema': schema[0]}
schema = p | beam.Create([schema_json])
data = p | beam.Create(range(10))
data_with_schema = data | beam.FlatMap(feed_schema, schema=beam.pvalue.AsSingleton(schema))
# Now do your schema validation
只是 data_with_schema
pcollection 的演示