明智地导入一个 JSON 项目,因此它只加载一次

Import a JSON project wise, so it loads just once

我有一个 Python 项目,它针对特定模式执行 JSON 验证。 它将 运行 作为 GCP 数据流中的转换步骤,因此在 运行 避免一次又一次下载相同文件之前收集所有依赖项非常重要。

架构放置在一个单独的 Git 存储库中。 Transformer 的本质是您在 class 中收到一条记录,然后使用它。典型的流程是加载 JSON 模式,根据它验证记录,然后处理无效和有效的内容。以这种方式加载模式意味着我从存储库中为每条记录下载模式,它可能是数十万条。 代码被“克隆”到工人中,然后独立工作。

受到 Python 在开始时(一次)加载需求并将它们用作导入的方式的启发,我想我可以添加存储库(JSON 模式所在的位置)作为Python 要求,然后只需在我的 Python 代码中使用它。但当然,它是一个 JSON,而不是要导入的 Python 模块。它是如何工作的?

例如:

git+git://github.com/path/to/json/schema@41b95ec
import apache_beam as beam
import the_downloaded_schema
from jsonschema import validate

class Verifier(beam.DoFn):

    def process(self, record: dict):
        validate(instance=record, schema=the_downloaded_schema)

        # ... more stuff

        yield record

class Transformer(beam.PTransform):
    def expand(self, record):
        return (
            record
            | "Verify Schema" >> beam.ParDo(Verifier())
        )

为什么不直接使用 class 来加载使用缓存的资源以防止重复加载?大致如下:

class JsonLoader:
    def __init__(self):
        self.cache = set()

    def import(self, filename):
        filename = os.path.absname(filename)
        if filename not in self.cache:
            self._load_json(filename)
            self.cache.add(filename)

    def _load_json(self, filename):
        ...

您可以加​​载一次 json 架构并将其用作辅助输入。 一个例子:

import json
import requests

json_current='https://covidtracking.com/api/v1/states/current.json'

def get_json_schema(url):
  with requests.Session() as session:
    schema = json.loads(session.get(url).text)
  return schema

schema_json = get_json_schema(json_current)

def feed_schema(data, schema):
  yield {'record': data, 'schema': schema[0]}

schema = p | beam.Create([schema_json])
data = p | beam.Create(range(10))
data_with_schema = data | beam.FlatMap(feed_schema, schema=beam.pvalue.AsSingleton(schema))
# Now do your schema validation

只是 data_with_schema pcollection 的演示