'Last message only' ZMQ 订阅套接字中的选项

'Last message only' option in ZMQ Subscribe socket

当 CONFLATE 选项设置为 true 时,ZMQ 订阅者套接字仅保留队列中的最后一条消息。 (zmq_docs) 但是,它似乎对我不起作用。通常在 python 我会做类似下面的事情并且它会起作用:

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.CONFLATE, 1)
subscriber.connect("tcp://localhost:5555")

下面 pub/sub 模式中的订阅者简单地忽略了设置为 1 的 CONFLATE 选项。您可以通过订阅者上显示的分钟和秒来观察程序在处理旧消息时被占用 (纤维 42)。如果 fib 设置为一个微不足道的值,你可以看到订阅者确实在接收来自发布者的消息。

这是发布者:

open System
open fszmq
open fszmq.Context
open fszmq.Socket


let funcPublish () =
  use context   = new Context()
  use publisher = pub context
  "tcp://*:5563" |> bind publisher

  while true do
    let tm = System.DateTime.Now
    let t = String.Concat([tm.Minute.ToString(); " "; tm.Second.ToString()])
    t |> s_send publisher
    sleep 1

  EXIT_SUCCESS


[<EntryPoint>]
let main argv = 
    funcPublish ()
    0

这是订阅者:

open fszmq
open fszmq.Context
open fszmq.Socket

let rec fib n =
    match n with
    | 1 | 2 -> 1
    | n -> fib(n-1) + fib(n-2)

let funcSubscribe () = 
  use context    = new Context()
  use subscriber = sub context
  "tcp://localhost:5563" |> connect subscriber
  Socket.setOption subscriber (ZMQ.CONFLATE, 1)
  [ ""B ] |> subscribe subscriber


  while true do
    let contents = s_recv subscriber
    fib 42
    contents |> printfn "%A"

  EXIT_SUCCESS


[<EntryPoint>]
let main argv = 
    funcSubscribe ()
    0

谢谢。

我注意到您的 Python 代码和 F# 代码之间有一处不同。在 Python 中,您在连接到套接字 之前 设置了 CONFLATE 选项,但是在 F# 中,您在 之后 设置了 CONFLATE 选项连接。

如果您在 F# 代码中将 Socket.setOption 行移动到 connect subscriber 行之前,我想这应该可以解决您的问题。

换句话说,不是这个:

"tcp://localhost:5563" |> connect subscriber
Socket.setOption subscriber (ZMQ.CONFLATE, 1)

你可能应该有这个:

Socket.setOption subscriber (ZMQ.CONFLATE, 1)
"tcp://localhost:5563" |> connect subscriber