调用 flow.register() 时,Prefect 未注册流程。它触发无法泡菜错误
Prefect is not registering flow when call flow.register() . It triggers can´t pickle error
使用 Prefect 向我的后端服务器注册工作流returns出现此错误:
Traceback (most recent call last):
File "/home/usr/Documents/Test_Automation/Automation/qa-automation/date_validator_Prefect/date_validator.py", line 114, in
flow.register(project_name="QA-Test")
File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1726, in register
registered_flow = client.register(
File "/home/usr/.local/lib/python3.9/site-packages/prefect/client/client.py", line 848, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1507, in serialize
self.storage.add_flow(self)
File "/home/usr/.local/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow
f.write(flow_to_bytes_pickle(flow))
File "/home/usr/.local/lib/python3.9/site-packages/prefect/utilities/storage.py", line 178, in flow_to_bytes_pickle
cloudpickle.dumps(flow, protocol=4), newline=False
File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'weakref' object
只有当我尝试 flow.register(project="QA-test")
时才会发生。
如果我只添加 flow.run()
它工作正常。
这是代码的开头:
@task(name="T3")
def feed_validator():
jsonpath = ''
results = []
urlsData = load_data('urls2.json')
for feed in urlsData:
for country in feed["countries"]:
feedUrl = feed['url']
if feed['type'] == 'Availability':
feedUrl = feedUrl.replace('/us/', "/" + country.lower() + "/")
feedData = getFeed(feedUrl)
feedDateInfo = isFeedUpdated(feedData)
if feed['network'] == 'network-samp(gate)':
feed['network'] = 'network-samp'
elif feed['network'] == 'network-samp(samp)':
feed['network'] = 'network-samp2'
country = ''
elif feed['network'] == 'network-samp3(samp3)':
feed['network'] = 'network-samp3'
country = ''
elif feed['type'] == 'Catalog':
country = ''
results.append((feed['platform'], feed['type'], feed['network'], country, feedDateInfo[1]))
orderedResults = multisort(list(results), ((4, False), (1, False), (2, False), (3, False)))
report = email_sender.createResultsEmail(orderedResults)
email_sender.sendEmail(report.as_string())
with Flow("test", storage=Local()) as flow:
feed_validator()
flow.register(project_name="QA-Test")
flow.run()
当使用本地存储时,Prefect 默认使用 pickled flow。您可以切换到使用脚本存储,如 this Documentation page
中所述
使用 Prefect 向我的后端服务器注册工作流returns出现此错误:
Traceback (most recent call last): File "/home/usr/Documents/Test_Automation/Automation/qa-automation/date_validator_Prefect/date_validator.py", line 114, in flow.register(project_name="QA-Test") File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1726, in register registered_flow = client.register( File "/home/usr/.local/lib/python3.9/site-packages/prefect/client/client.py", line 848, in register serialized_flow = flow.serialize(build=build) # type: Any File "/home/usr/.local/lib/python3.9/site-packages/prefect/core/flow.py", line 1507, in serialize self.storage.add_flow(self) File "/home/usr/.local/lib/python3.9/site-packages/prefect/storage/local.py", line 143, in add_flow f.write(flow_to_bytes_pickle(flow)) File "/home/usr/.local/lib/python3.9/site-packages/prefect/utilities/storage.py", line 178, in flow_to_bytes_pickle cloudpickle.dumps(flow, protocol=4), newline=False File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps cp.dump(obj) File "/home/usr/.local/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump return Pickler.dump(self, obj) TypeError: cannot pickle 'weakref' object
只有当我尝试 flow.register(project="QA-test")
时才会发生。
如果我只添加 flow.run()
它工作正常。
这是代码的开头:
@task(name="T3")
def feed_validator():
jsonpath = ''
results = []
urlsData = load_data('urls2.json')
for feed in urlsData:
for country in feed["countries"]:
feedUrl = feed['url']
if feed['type'] == 'Availability':
feedUrl = feedUrl.replace('/us/', "/" + country.lower() + "/")
feedData = getFeed(feedUrl)
feedDateInfo = isFeedUpdated(feedData)
if feed['network'] == 'network-samp(gate)':
feed['network'] = 'network-samp'
elif feed['network'] == 'network-samp(samp)':
feed['network'] = 'network-samp2'
country = ''
elif feed['network'] == 'network-samp3(samp3)':
feed['network'] = 'network-samp3'
country = ''
elif feed['type'] == 'Catalog':
country = ''
results.append((feed['platform'], feed['type'], feed['network'], country, feedDateInfo[1]))
orderedResults = multisort(list(results), ((4, False), (1, False), (2, False), (3, False)))
report = email_sender.createResultsEmail(orderedResults)
email_sender.sendEmail(report.as_string())
with Flow("test", storage=Local()) as flow:
feed_validator()
flow.register(project_name="QA-Test")
flow.run()
当使用本地存储时,Prefect 默认使用 pickled flow。您可以切换到使用脚本存储,如 this Documentation page
中所述