动态的 Clojure 秘诀 Channels/pub-sub

Clojure Recipe for Dynamic Channels/pub-sub

我正在寻找一种使用 core.async(或任何可行的方法)动态建立出版物订阅者的方法。

问题:我有一些消息需要根据消息的 :sender 进行处理。每条消息将由相同的函数操作,但重点是每个发件人的消息将按顺序处理,一次一条——多个主题基于 :sender 每个消费者都有一个钥匙。我还需要一种方法来限制所有订阅中活跃消费者的数量,以降低资源利用率。

我的想法是我会有一个 pub 频道:

(def in-chan (chan))
(def publication (pub in-chan :sender))

但我希望能够确保在新发件人联机时始终有订阅者。只要代码保持小而简单,我愿意接受更好的设施。

问题:有没有一种惯用的方法可以确保在发送消息之前有特定发布的订阅者?如何协调每个订阅的所有消费者使用共享线程池?

编辑:我已经弄清楚如何使用线程池和每个主题的单个消费者来协调工作。我认为为了检查子是否存在,我将使用映射的引用将主题名称存储到子。如果 ref 没有订阅者条目,我会创建一个并将其添加到地图中;接下来我将订阅者注册到发布并发布消息。这个问题的目的是看看是否有更好的方法来启动和跟踪动态创建主题的订阅者。

我想到的解决方案是使用 ref:

(def registration (ref {}))

在写入之前写入发布通道的线程使用此注册:

(defn register
  [registration topic-name]
  (dosync
    (let [r (ensure registration)
          something-to-track :tracked] ;; In my case, I'm keeping track of a channel
      (when-not (get r topic-name)
        (alter registration assoc topic-name something-to-track)
        something-to-track))))

每当我们需要发布消息时,我们都可以使用此功能 "register" 新订阅者。如果以前不存在,它将 return something-to-track。就我而言,这是我随后将调用 sub 的频道。如果是nil,我可以无视。为了真正不在并发环境中错过消息,我需要在事务中做一些事情(需要了解 ensure 是否会通过跨线程授予对 registration 的独占访问来保护我)但我的管道是这样的小我可以写到pub单线程。