如何解雇一个又一个出版商?

How to fire one publisher after another?

我有两个发布者,一个根据状态收集消息,第二个是计时器。我希望这些按顺序触发 - 所以首先收集数据,然后启动计时器。我怎样才能做到这一点?这是我当前的代码:

let messagesPublisher = OnboardingStateLogic.publisher(
  forState: state,
  nextState: nextState
)

messagesPublisher
  .sink { completion in 
    print("completed")
  } receiveValue: { [weak self] messages in
    messages.forEach { message in
      self?.queue.enqueue(message)
    }
  }

timer = Timer
  .publish(every: 2, on: .main, in: .default)
  .autoconnect()
  .sink { _ in
    self.dequeueMessages()
  }

您可以使用 flatMap 运算符将发布者或值转换为 Combine 链中的新发布者。例如:

class ViewModel : ObservableObject {
    var cancellable : AnyCancellable?
    
    init() {
        cancellable = Just(1).flatMap { _ in
            Timer
              .publish(every: 2, on: .main, in: .default)
              .autoconnect()
        }.sink { value in
            print("Publisher fired: \(value)")
        }
    }
}

你会用你的 messagesPublisher

替换我琐碎的 Just(1) 发布者

我最终使用了 Publishers.Zip。这仅在两个上游发布者都输出值时发布:

messagesPublisher.sink(
  receiveCompletion: { _ in },
  receiveValue: { [weak self] message in
   self?.queue.enqueue(message)
  }
).store(in: &cancellables)

let delayPublisher = Timer
  .publish(every: 2, on: .main, in: .default)
  .autoconnect()
  .eraseToAnyPublisher()
  .setFailureType(to: Error.self)

let delayedValuesPublisher = Publishers.Zip(messagesPublisher, delayPublisher).eraseToAnyPublisher()

delayedValuesPublisher.sink(
  receiveCompletion: { completion in
    switch completion {
    case .failure(let error):
      print("something went wrong", error)
    case .finished:
      break
    }
  },
  receiveValue: { [weak self] _ in
    self?.dequeueMessages()
  }
).store(in: &cancellables)