Dagster cannot connect to mongodb locally
I'm working through the Dagster tutorial and thought connecting to my local mongodb would be a good exercise.
```python
from dagster import get_dagster_logger, job, op
from pymongo import MongoClient

@op
def connection():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]

@job
def execute():
    client = connection()
    get_dagster_logger().info(f"Connection: {client} ")
```
Dagster error:
```
dagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "connection":
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 348, in core_dagster_event_sequence_for_step
    for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 405, in _type_check_and_store_output
    for evt in _store_output(step_context, step_output_handle, output, input_lineage):
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 534, in _store_output
    for elt in iterate_with_context(
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 400, in iterate_with_context
    return
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
TypeError: cannot pickle '_thread.lock' object
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 398, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 524, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/storage/fs_io_manager.py", line 124, in handle_output
    pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
```
I've tested this locally in ipython and it works, so the problem seems to be specific to Dagster.
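The default IOManager requires that inputs and outputs to ops be pickleable, and the object your op returns (a pymongo `Database`, which wraps your `MongoClient`) is not: the last frame of the traceback is `pickle.dump` in `fs_io_manager.py`, and it fails with `TypeError: cannot pickle '_thread.lock' object`. You might want to try refactoring this to use Dagster's @resource approach instead. This lets you define the resource outside of the @op and makes it very easy to mock it later in tests. Your code would look something like this: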
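```python
from dagster import get_dagster_logger, job, op, resource
from pymongo import MongoClient

# Dagster builds the resource when the job runs and injects it into ops that ask
# for it, so the client is never returned from an op and never has to be pickled.
@resource
def mongo_client():
    client = MongoClient("mongodb://localhost:27017/")
    return client["development"]

@op(
    required_resource_keys={'mongo_client'}
)
def test_client(context):
    # The resource is available on the op's context under its key.
    client = context.resources.mongo_client
    get_dagster_logger().info(f"Connection: {client} ")

@job(
    resource_defs={'mongo_client': mongo_client}
)
def execute():
    test_client()
```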
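The pickling requirement described above can be reproduced without Dagster or MongoDB. This sketch mimics what the filesystem IO manager does with an op output (`pickle.dump`, per the traceback) by calling `pickle.dumps`; `FakeClient` and `try_pickle` are hypothetical names, and `FakeClient` only assumes that, like the object in the traceback, the client keeps a thread lock as an attribute. It also suggests why the original code worked in ipython: nothing there tries to pickle the client.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for MongoClient: it keeps a thread lock, as the traceback suggests the real object does."""

    def __init__(self, uri: str) -> None:
        self.uri = uri
        self._lock = threading.Lock()  # lock objects cannot be pickled


def try_pickle(obj: object) -> str:
    """Pickle obj the way a pickle-based IO manager stores an op output."""
    try:
        pickle.dumps(obj)
        return "ok"
    except TypeError as exc:
        return f"TypeError: {exc}"


# Plain data, such as a connection URI, pickles fine.
print(try_pickle({"uri": "mongodb://localhost:27017/"}))
# An object holding a lock fails in the same way as the op output in the traceback.
print(try_pickle(FakeClient("mongodb://localhost:27017/")))
```

Resources sidestep this because Dagster constructs them in the process that runs the op and hands them over through `context.resources`; they are never stored as op outputs.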
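Also note that I moved the test code into a separate @op and only call that op from the `execute` @job. The body of a @job function runs once, when the job definition is loaded, and only serves to describe the graph of ops to execute: calling an op there returns a placeholder for its output, not the actual value. Any code that should run when the job executes has to live inside an @op.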
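This load-time versus run-time split is why the question's `execute` job could never log a real connection: `client = connection()` in a job body yields a placeholder, not the `Database`. Below is a toy model of that idea in plain Python. `GraphBuilder`, `OutputHandle` and the `.run` attribute are hypothetical names; this is not how Dagster is implemented internally, only an illustration of recording calls instead of executing them.

```python
from typing import Callable, List


class OutputHandle:
    """Hypothetical placeholder returned when an op is called while a graph is being built."""

    def __init__(self, op_name: str) -> None:
        self.op_name = op_name

    def __repr__(self) -> str:
        return f"<OutputHandle of {self.op_name!r}>"


class GraphBuilder:
    """Toy recorder: calling a decorated function adds a node instead of running it."""

    def __init__(self) -> None:
        self.nodes: List[str] = []

    def op(self, fn: Callable[[], object]) -> Callable[[], OutputHandle]:
        def record() -> OutputHandle:
            self.nodes.append(fn.__name__)  # describe the graph
            return OutputHandle(fn.__name__)  # placeholder; fn itself is NOT called

        record.run = fn  # keep the real work for "execution time"
        return record


builder = GraphBuilder()
executed: List[str] = []


@builder.op
def connection() -> str:
    executed.append("connection")
    return "a real client object"


def execute() -> str:
    """Plays the role of the @job body: it only wires up the graph."""
    client = connection()
    return f"Connection: {client}"


message = execute()      # "load time"
print(message)           # Connection: <OutputHandle of 'connection'>
print(builder.nodes)     # ['connection']
print(executed)          # [] because the op body has not run yet
print(connection.run())  # a real client object ("execution time")
```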
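The really neat thing about the @resource pattern is that it makes testing with mocked resources, or swapping resources in general, extremely easy. Say you want a mocked client so you can run your job code without actually touching the database. You could do something like this: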
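```python
from unittest.mock import MagicMock

from dagster import graph, resource

# mongo_client and test_client are the definitions from the previous snippet.

@resource
def mocked_mongo_client():
    # MagicMock accepts any attribute access or call, so ops can use it without a database.
    return MagicMock()

@graph
def execute_graph():
    test_client()

# Same graph, two jobs: one bound to the real MongoDB resource, one to the mock.
execute_live = execute_graph.to_job(name='execute_live',
                                    resource_defs={'mongo_client': mongo_client})
execute_mocked = execute_graph.to_job(name='execute_mocked',
                                      resource_defs={'mongo_client': mocked_mongo_client})
```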
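This uses Dagster's @graph pattern to describe the DAG of ops, and then the .to_job() method on the resulting GraphDefinition to configure that graph in different ways. This way you keep exactly the same underlying op structure but can pass in different resources, tags, executors, etc.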
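Why a MagicMock works as a drop-in resource: an op only touches whatever object is bound to `context.resources.mongo_client`, and a MagicMock accepts any attribute access, indexing or call while recording them. A stdlib-only sketch, where `count_users` is a hypothetical piece of op logic and `count_documents` merely mirrors the name of the pymongo collection method (here it is just a mock attribute):

```python
from unittest.mock import MagicMock


def count_users(db) -> int:
    """Hypothetical op logic: it only uses whatever object it is handed as `db`."""
    return db["users"].count_documents({})


# Stand-in for what the mocked_mongo_client resource returns.
mock_db = MagicMock()
# Configure what db["users"].count_documents(...) should return.
mock_db.__getitem__.return_value.count_documents.return_value = 3

result = count_users(mock_db)
print(result)  # 3

# The mock also records how the code used it, so a test can assert on that.
mock_db.__getitem__.assert_called_once_with("users")
mock_db.__getitem__.return_value.count_documents.assert_called_once_with({})
```

Inside an op, `db` would be `context.resources.mongo_client`, so the same op code runs against MongoDB in `execute_live` and against the mock in `execute_mocked`.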