使用 Combine with Swift 异步执行任务

Executing a task asynchronously using Combine with Swift

最近开始研究Combine和运行 into某题。 首先,我将描述我在做什么。 我尝试使用干净的架构 在这里你可以看到我的存储库

protocol Repository {
    func test()
}

class MockRepository: Repository {
    func test() {
        sleep(3)
    }
}

然后我创建了用例

class UseCaseBase<TInput, TOutput> {
    var task: TOutput? { return nil }

    var repository: Repository

    init(_ repository: Repository) {
        self.repository = repository
    }

    func execute(with payload: TInput) -> AnyPublisher<TOutput, Never> {
        return AnyPublisher(Future<TOutput, Never> { promise in
            promise(.success(self.task!))
        })
            .eraseToAnyPublisher()
    }
}

class MockUseCase: UseCaseBase<String, Int> {
    override var task: Int? {
        repository.test()
        return 1
    }
}

然后在 ContentView 的初始化块中我做了类似的事情

init() {
        let useCase = MockUseCase(MockRepository())
        var cancellables = Set<AnyCancellable>()
        
        useCase.execute(with: "String")
            .sink(receiveValue: { value in
                print(value)
            })
            .store(in: &cancellables)
        
        print("Started")
        
    }

首先,我想得到 “开始” 然后在 sleep(3) 之后 值“1”

现在我明白了 “1”然后“开始”

我要提到的第一件事是,您需要持有对 cancellables 的引用,否则当您向链中添加异步处理时,您的发布者将自动取消。将它移出您的 init 方法并移至 属性.

您也可以摆脱 sleep 并简单地链接到您选择的队列中的 Delay 发布者。我选择了main.

struct SomeThing {
    var cancellables = Set<AnyCancellable>()

    init() {
        MockUseCase(MockRepository())
            .execute(with: "String")
            .delay(for: 3.0, scheduler: DispatchQueue.main)
            .sink(receiveValue: { print([=10=]) } )
            .store(in: &cancellables)
        print("Started")
    }
}

class MockRepository: Repository {
    func test() {
        // sleep(3)
    }
}

另一种选择是摆脱 delay,摆脱 sleep 并异步履行您的承诺:

struct SomeThing {
    var cancellables = Set<AnyCancellable>()

    init() {
        MockUseCase(MockRepository())
            .execute(with: "String")
            .sink(receiveValue: { print([=11=]) } )
            .store(in: &cancellables)
        print("Started")
    }
}

class MockRepository: Repository {
    func test() {
        // sleep(3)
    }
}

func execute(with payload: TInput) -> AnyPublisher<TOutput, Never> {
    return Future<TOutput, Never> { promise in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            promise(.success(self.task!))
        }
    }
    .eraseToAnyPublisher()
}

您的 sleep(3) 调用在主线程上运行,这意味着它会阻止任何其他操作,包括打印“Started”文本的代码。

我不会胡说八道阻塞主线程有多糟糕,这是众所周知的信息,但这就是您看到所询问的行为的原因。

我在你的问题中没有看到任何线程切换代码,所以如果你想实现某种异步性,那么你可以使用 Rob 使用 dispatch(after:) 的解决方案,或者 do the locomotion(睡眠)在另一个线程上:

func execute(with payload: TInput) -> AnyPublisher<TOutput, Never> {
    return AnyPublisher(Future<TOutput, Never> { promise in
        DispatchQueue.global().async {
            promise(.success(self.task!))
        }
    })
        .eraseToAnyPublisher()
}