如何将 python 脚本重写为 dagster 友好代码
How to rewrite python script to dagster friendly code
我有这个简单的 python 脚本。我怎样才能以在 dagster 中工作的方式重写它?
import logging
from mypackage import function1, function2, function3, function4, function5
def main():
try:
function1()
function2()
except Exception as e:
logging.exception(e)
function4()
else:
function5()
if __name__ == '__main__:
main()
这是我迄今为止一直在尝试的方法,但还有很长的路要走:
import logging
from dagster import success_hook, failure_hook
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
myvar1 = True
function1()
return myvar1
@solid
def dag_function2() -> bool:
myvar2 = True
function2()
return myvar2
@solid
def dag_function3() -> bool:
myvar3 = True
function3()
return myvar3
@failure_hook
def dag_function5():
logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
function5()
@success_hook
def dag_function4():
function4()
def main():
dag_function3(dag_function1(), dag_function2())
我试过类似的方法,但 dagster 抛出错误 dagster.core.errors.DagsterInvariantViolationError:找不到作业、管道、图形或存储库
为了将实体的输出传递给其他实体的输入,您需要创建一个管道来定义输入和输出之间的依赖关系。
从那里,您将能够执行管道:
import logging
from dagster import success_hook, failure_hook, solid, pipeline, execute_pipeline
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
myvar1 = True
function1()
return myvar1
@solid
def dag_function2() -> bool:
myvar2 = True
function2()
return myvar2
@solid
def dag_function3(input_1, input_2) -> bool:
myvar3 = True
function3()
return myvar3
@failure_hook
def dag_function5(context):
logging.exception(context.solid_exception)
function5()
@success_hook
def dag_function4(context):
pass
@pipeline(hook_defs={dag_function5, dag_function4})
def my_pipeline():
dag_function3(dag_function1(), dag_function2())
if __name__ == '__main__':
execute_pipeline(my_pipeline)
也就是说,从 0.13.0 开始,Dagster 已经迁移到一组新的核心 API(包括作业、操作和图表)。现有迁移指南 here 详细说明实体和管道如何映射到操作和作业。
我有这个简单的 python 脚本。我怎样才能以在 dagster 中工作的方式重写它?
import logging
from mypackage import function1, function2, function3, function4, function5
def main():
try:
function1()
function2()
except Exception as e:
logging.exception(e)
function4()
else:
function5()
if __name__ == '__main__:
main()
这是我迄今为止一直在尝试的方法,但还有很长的路要走:
import logging
from dagster import success_hook, failure_hook
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
myvar1 = True
function1()
return myvar1
@solid
def dag_function2() -> bool:
myvar2 = True
function2()
return myvar2
@solid
def dag_function3() -> bool:
myvar3 = True
function3()
return myvar3
@failure_hook
def dag_function5():
logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
function5()
@success_hook
def dag_function4():
function4()
def main():
dag_function3(dag_function1(), dag_function2())
我试过类似的方法,但 dagster 抛出错误 dagster.core.errors.DagsterInvariantViolationError:找不到作业、管道、图形或存储库
为了将实体的输出传递给其他实体的输入,您需要创建一个管道来定义输入和输出之间的依赖关系。
从那里,您将能够执行管道:
import logging
from dagster import success_hook, failure_hook, solid, pipeline, execute_pipeline
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
myvar1 = True
function1()
return myvar1
@solid
def dag_function2() -> bool:
myvar2 = True
function2()
return myvar2
@solid
def dag_function3(input_1, input_2) -> bool:
myvar3 = True
function3()
return myvar3
@failure_hook
def dag_function5(context):
logging.exception(context.solid_exception)
function5()
@success_hook
def dag_function4(context):
pass
@pipeline(hook_defs={dag_function5, dag_function4})
def my_pipeline():
dag_function3(dag_function1(), dag_function2())
if __name__ == '__main__':
execute_pipeline(my_pipeline)
也就是说,从 0.13.0 开始,Dagster 已经迁移到一组新的核心 API(包括作业、操作和图表)。现有迁移指南 here 详细说明实体和管道如何映射到操作和作业。