调用 flow.register() 时,Prefect 未注册流程。它触发无法泡菜错误

Prefect is not registering flow when call flow.register() . It triggers can´t pickle error

使用 Prefect 向我的后端服务器注册工作流returns出现此错误:

Traceback (most recent call last): File "/home/usr/Documents/Test_Automation/Automation/qa-automation/date_validator_Prefect/date_validator.py", line 114, in flow.register(project_name="QA-Test") File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1726, in register registered_flow = client.register( File "/home/usr/.local/lib/python3.9/site-packages/prefect/client/client.py", line 848, in register serialized_flow = flow.serialize(build=build) # type: Any File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1507, in serialize self.storage.add_flow(self) File "/home/usr/.local/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow f.write(flow_to_bytes_pickle(flow)) File "/home/usr/.local/lib/python3.9/site-packages/prefect/utilities/storage.py", line 178, in flow_to_bytes_pickle cloudpickle.dumps(flow, protocol=4), newline=False File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump return Pickler.dump(self, obj) TypeError: cannot pickle 'weakref' object

只有当我尝试 flow.register(project="QA-test") 时才会发生。 如果我只添加 flow.run() 它工作正常。

这是代码的开头:

@task(name="T3")
def feed_validator():
    jsonpath = ''
    results = []
    urlsData = load_data('urls2.json')
    for feed in urlsData:
        for country in feed["countries"]:
            feedUrl = feed['url']
            if feed['type'] == 'Availability':
                feedUrl = feedUrl.replace('/us/', "/" + country.lower() + "/")
            feedData = getFeed(feedUrl)
            feedDateInfo = isFeedUpdated(feedData)
            if feed['network'] == 'network-samp(gate)':
                feed['network'] = 'network-samp'
            elif feed['network'] == 'network-samp(samp)':
                feed['network'] = 'network-samp2'
                country = ''
            elif feed['network'] == 'network-samp3(samp3)':
                feed['network'] = 'network-samp3'
                country = ''
            elif feed['type'] == 'Catalog':
                country = ''
            results.append((feed['platform'], feed['type'], feed['network'], country, feedDateInfo[1]))
    orderedResults = multisort(list(results), ((4, False), (1, False), (2, False), (3, False)))
    report = email_sender.createResultsEmail(orderedResults)
    email_sender.sendEmail(report.as_string())

with Flow("test", storage=Local()) as flow:
    feed_validator()

flow.register(project_name="QA-Test")
flow.run()

当使用本地存储时,Prefect 默认使用 pickled flow。您可以切换到使用脚本存储,如 this Documentation page

中所述