将 self 类型的实例作为参数传递给方法

Passing instance of type self to method as argument

我想将 PipelineTask 的实例传递给 PipelineTask.add,但是当我尝试时,我得到一个 NameError,其中提到 PipelineTask 未定义。

我相信这是因为 PipelineTask 仅在 PipelineTask.__init__() 被调用后绑定。

class Task(BaseModel, abc.ABC):
    """Abstract unit of work identified by ``id``."""

    id: str

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""


class PipelineTask(Task):
    """Task that can be linked to a following task through a ``next`` attribute.

    NOTE(review): no ``next`` field is declared in this class or in ``Task`` as
    shown, although ``add`` reads and assigns ``self.next``.
    """

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""
        pass

    # NOTE: the annotation `PipelineTask` below is evaluated when this `def`
    # statement runs, i.e. while the class body is still executing and before
    # the name `PipelineTask` is bound. Unless annotation evaluation is
    # deferred, this is what raises the NameError described in the question.
    def add(self, task: PipelineTask):
        """Intended to append ``task`` to the end of the chain; returns ``self``."""
        # TODO: how do I pass an instance of this class here?
        next = self  # local name shadows the builtin `next`
        if self.next == None:
            # No successor yet: link `task` directly after this task.
            self.next = task
            next = self.next
        else:
            current = self.next
            # NOTE(review): this loop only stops once `current` compares equal
            # to None, so the assignment after it targets that value rather
            # than the last task, and `task` is never linked in this branch.
            while current != None:
                current = current.next
            next = current
            current.next = next
        return self

class Pipeline(BaseModel):
    """Run a linked chain of tasks in order.

    Example (inside a coroutine; the task classes stand for concrete
    ``PipelineTask`` subclasses)::

        pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
        await pipeline.add(ExecutePipelineTask(id='execute'))
        await pipeline.add(CollectResultsPipelineTask(id='collect'))
        await pipeline.run()
    """

    # The pipeline's initial task
    init: PipelineTask

    async def run(self):
        """Await ``run()`` on the initial task, then on every task reached via ``.next``."""
        task = self.init
        await task.run()
        # Follow the `.next` links until a task has no successor.
        while task.next is not None:
            task = task.next
            await task.run()

    async def add(self, next: PipelineTask):
        """Add a task to the end of the pipeline by delegating to ``self.init.add``.

        The parameter keeps its original name ``next`` so keyword callers
        continue to work, even though it shadows the builtin inside this method.
        """
        self.init.add(next)


class StdoutTask(PipelineTask):
    """Pipeline task that reports its id on standard output."""

    async def run(self):
        """Print a one-line message tagged with this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)


async def test_create_pipeline():
    """Chain three ``StdoutTask`` instances with ``add`` and run them in a ``Pipeline``."""
    head = StdoutTask(id=1, next=None)
    for task_id in (2, 3):
        # `add` returns the task it was called on, so `head` stays the first task.
        head = head.add(StdoutTask(id=task_id, next=None))
    await Pipeline(init=head).run()

用法示例:

class StdoutTask(PipelineTask):
    """Pipeline task that reports its id on standard output."""

    async def run(self):
        """Print a one-line message tagged with this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)

@pytest.mark.asyncio
async def test_create_pipeline():
    """Chain three ``StdoutTask`` instances with ``add`` and run them in a ``Pipeline``."""
    head = StdoutTask(id=1)
    for task_id in (2, 3):
        # `add` returns the task it was called on, so `head` stays the first task.
        head = head.add(StdoutTask(id=task_id))
    await Pipeline(init=head).run()

我尝试去掉 task 的类型注解,但这样 task 又缺少 next 属性(例如 AttributeError: 'XXXX' object has no attribute 'next'):

# Variant tried in the question: the same method with the parameter
# annotation removed; the body is elided here.
def add(self, task):
    ...

我也试过修改 task.__class__ = PipelineTask,它添加了额外的方法而不是属性。

下面是一个可重现的单个文件示例

from pydantic import BaseModel
import abc
import asyncio


class Task(BaseModel, abc.ABC):
    """Abstract unit of work identified by ``id``."""

    id: str

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""


class PipelineTask(Task):
    """Task that can be linked to a following task through a ``next`` attribute.

    NOTE(review): no ``next`` field is declared in this class or in ``Task`` as
    shown, although ``add`` reads and assigns ``self.next``.
    """

    @abc.abstractmethod
    async def run(self):
        """Execute the task; concrete subclasses must override this."""
        pass

    # NOTE: the annotation `PipelineTask` below is evaluated when this `def`
    # statement runs, i.e. while the class body is still executing and before
    # the name `PipelineTask` is bound. Unless annotation evaluation is
    # deferred, this is what raises the NameError described in the question.
    def add(self, task: PipelineTask):
        """Intended to append ``task`` to the end of the chain; returns ``self``."""
        # TODO: how do I pass an instance of this class here?
        next = self  # local name shadows the builtin `next`
        if self.next == None:
            # No successor yet: link `task` directly after this task.
            self.next = task
            next = self.next
        else:
            current = self.next
            # NOTE(review): this loop only stops once `current` compares equal
            # to None, so the assignment after it targets that value rather
            # than the last task, and `task` is never linked in this branch.
            while current != None:
                current = current.next
            next = current
            current.next = next
        return self


class Pipeline(BaseModel):
    """Run a linked chain of tasks in order.

    Example (inside a coroutine; the task classes stand for concrete
    ``PipelineTask`` subclasses)::

        pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
        await pipeline.add(ExecutePipelineTask(id='execute'))
        await pipeline.add(CollectResultsPipelineTask(id='collect'))
        await pipeline.run()
    """

    # The pipeline's initial task
    init: PipelineTask

    async def run(self):
        """Await ``run()`` on the initial task, then on every task reached via ``.next``."""
        task = self.init
        await task.run()
        # Follow the `.next` links until a task has no successor.
        while task.next is not None:
            task = task.next
            await task.run()

    async def add(self, next: PipelineTask):
        """Add a task to the end of the pipeline by delegating to ``self.init.add``.

        The parameter keeps its original name ``next`` so keyword callers
        continue to work, even though it shadows the builtin inside this method.
        """
        self.init.add(next)


class StdoutTask(PipelineTask):
    """Pipeline task that reports its id on standard output."""

    async def run(self):
        """Print a one-line message tagged with this task's ``id``."""
        message = f"[Worker {self.id}] testing..."
        print(message)


async def test_create_pipeline():
    """Chain three ``StdoutTask`` instances with ``add`` and run them in a ``Pipeline``."""
    head = StdoutTask(id=1, next=None)
    for task_id in (2, 3):
        # `add` returns the task it was called on, so `head` stays the first task.
        head = head.add(StdoutTask(id=task_id, next=None))
    await Pipeline(init=head).run()

解决方案

使用getattr

      def add(self, task: "PipelineTask"):
          """Append ``task`` to the end of the chain that starts at ``self``.

          The annotation is written as a string (a forward reference) so it is
          not evaluated while the ``PipelineTask`` class body is still executing.

          Args:
              task: The task to link after the current last task.

          Returns:
              ``self``, so calls can be chained.
          """
          tail = self
          # getattr with a default treats a task whose `next` was never set
          # the same as one whose `next` is None.
          while getattr(tail, "next", None) is not None:
              tail = tail.next
          # `tail` is now the last task in the chain; link the new one after it.
          tail.next = task
          return self

问题可能出在 if self.next == None: 这一行;只需在 __init__ 中设置 self.next,或者改用 getattr(self, "next", None) 来读取它。

>>> class Test():
...     def add(self):
...         print(getattr(self, "missing", None))
... 
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing

但是,最好在 __init__ 期间设置 next 属性:

class PipelineTask(Task):
    # Gives every instance a `next` attribute up front so methods can read
    # `self.next` without first checking whether it exists.
    # NOTE(review): in the question, Task derives from pydantic's BaseModel;
    # pydantic models may reject assignment to names that are not declared
    # fields, so declaring `next` as an optional field could be needed
    # instead -- TODO confirm against the pydantic version in use.
    def __init__(self, *args, **kwargs):
        self.next = None                   # start with no successor task
        super().__init__(*args, **kwargs)  # forward all arguments to the base class initializer

    ...  # remaining methods elided