将 self 类型的实例作为参数传递给方法
Passing instance of type self to method as argument
我想将 PipelineTask
的实例传递给 PipelineTask.add
,但是当我尝试时,我得到一个 NameError
,其中提到 PipelineTask 未定义。
我相信这是因为 PipelineTask
仅在 PipelineTask.__init__()
被调用后绑定。
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
用法示例:
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
@pytest.mark.asyncio
async def test_create_pipeline():
tasks = StdoutTask(id=1).add(StdoutTask(id=2)).add(StdoutTask(id=3))
pipeline = Pipeline(init=tasks)
await pipeline.run()
pass
我尝试“取消”指定 task
的类型,但是 task
缺少 next
属性(例如 AttributeError: 'XXXX' object has no attributed next
)
def add(self, task):
...
我也试过修改 task.__class__ = PipelineTask
,它添加了额外的方法而不是属性。
下面是一个可重现的单个文件示例
from pydantic import BaseModel
import abc
import asyncio
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
解决方案
使用getattr
def add(self, task: "PipelineTask"):
next = getattr(self, "next", None)
if self.next == None:
self.next = task
next = self.next
else:
current = getattr(self, "next", None)
while current != None:
current = getattr(current, "next", None)
next = getattr(current, "next", None)
current.next = next
return self
问题可能出在您 if self.next == None:
;只需在 __init__
中设置 self.next
或将其称为 getattr(self, "next", None)
>> class Test():
... def add(self):
... print(getattr(self, "missing", None))
...
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing
但是,最好在 __init__
期间设置 next
属性
class PipelineTask(Task):
def __init__(self, *args, **kwargs):
self.next = None # create .next attribute
super().__init__(*args, **kwargs) # call base class init
...
我想将 PipelineTask
的实例传递给 PipelineTask.add
,但是当我尝试时,我得到一个 NameError
,其中提到 PipelineTask 未定义。
我相信这是因为 PipelineTask
仅在 PipelineTask.__init__()
被调用后绑定。
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
用法示例:
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
@pytest.mark.asyncio
async def test_create_pipeline():
tasks = StdoutTask(id=1).add(StdoutTask(id=2)).add(StdoutTask(id=3))
pipeline = Pipeline(init=tasks)
await pipeline.run()
pass
我尝试“取消”指定 task
的类型,但是 task
缺少 next
属性(例如 AttributeError: 'XXXX' object has no attributed next
)
def add(self, task):
...
我也试过修改 task.__class__ = PipelineTask
,它添加了额外的方法而不是属性。
下面是一个可重现的单个文件示例
from pydantic import BaseModel
import abc
import asyncio
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
解决方案
使用getattr
def add(self, task: "PipelineTask"):
next = getattr(self, "next", None)
if self.next == None:
self.next = task
next = self.next
else:
current = getattr(self, "next", None)
while current != None:
current = getattr(current, "next", None)
next = getattr(current, "next", None)
current.next = next
return self
问题可能出在您 if self.next == None:
;只需在 __init__
中设置 self.next
或将其称为 getattr(self, "next", None)
>> class Test():
... def add(self):
... print(getattr(self, "missing", None))
...
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing
但是,最好在 __init__
next
属性
class PipelineTask(Task):
def __init__(self, *args, **kwargs):
self.next = None # create .next attribute
super().__init__(*args, **kwargs) # call base class init
...