pyzmq REQ/REP with asyncio await for variable

pyzmq REQ/REP with asyncio await for variable

我第一次玩 asyncio python 并尝试将它与 ZMQ 结合起来。

基本上我的问题是我有一个 REP/REQ 系统,在 async def 中有一个我需要等待的功能。如何不更新值。 下面是一段代码来说明这一点:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

我将这个对象发送到 class 并在这个函数中取回它

async def readsonar(self, trigger_pin, REP_server_django):
        i= 0
        while True:

            ping_from_view = await REP_server_django.recv()  # line.1
            value = await self.board.sonar_read(trigger_pin) # line.2
            print(value)                                     # line.3
            json_data = json.dumps(value)                    # line.4
            #json_data = json.dumps(i)                       # line.4bis
            REP_server_django.send(json_data.encode())       # line.5
            i+=1                                             # line.6
            await asyncio.sleep(1/1000)                      # line.7

sonar_read,正在使用 pymata_express 读取超声波传感器。如果我评论 line.2line.4 我得到 i 的正确值。如果我评论 line.1line.5print(value)sonar_read 打印出正确的值.但是,当我 运行 如此处所示时,value 未更新。

我是不是漏掉了什么?


EDIT :
编辑了关于行注释的类型。我的意思是,如果我只读取声纳并打印值。它工作正常。如果我只有 .recv().send(json.dumps(i).encode()),它就可以工作。但是如果我尝试从声纳发送值。它锁定给定的 value 未更新


EDIT2 :(Alan Yorinks 的回答):这是 MWE,它考虑了您发送的有关 class 中 zmq 声明的内容.它取自 pymata_express 示例 concurrent_tasks.py

要重现错误,运行 在两个不同的终端中运行这两个脚本。您将需要一个安装了 Frimata_express 的 arduino 开发板。如果 运行 一切顺利, PART A. 应该只在 mve_req.py 端吐出相同的值。您可以编辑不同的块(部分 A、B 或 C)以查看行为。

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()

mve_req.py

import zmq
import time
import json

def start_zmq():
    context = zmq.Context()
    REQ_django  = context.socket(zmq.REQ)
    REQ_django.connect("tcp://localhost:5558")

    return REQ_django, context

def get_sonar(REQ_django):
    REQ_django.send(b"server_django")
    ping_from_server_django = REQ_django.recv()
    return ping_from_server_django.decode()

if __name__ == '__main__':

    data = {"sensors":{}}

    REQ_django, context = start_zmq()
    while REQ_django:

            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)

    REQ_django.close()
    context.term()

( O/P MCVE-problem 定义更进一步 - 然而协调问题,无论优先级与否,{sensors|actors}-control-systems ,越是使用分布式自治代理设计的系统在专业上非常复杂,容易产生缺陷"shortcuts"或陷入

最好先阅读至少 about ZeroMQ Hierarchy in Less Than Five Seconds and 关于 相互 dead-lock 阻塞

阅读神话般的 Pieter HINTJENS 的书 "Code Connected: Volume 1"对于任何系统设计者都具有巨大的价值 )

"...seams quite interesting as it implements the async already, so I could just add the async zmq as I did. am I wrong?"

是的,没有“just add async”快捷方式,control-systems是非常有趣的学科,但相当复杂。总是。很抱歉不得不直截了当地听到这些。一些复杂性可能会在教科书示例或琐碎的 makers-projects 中对用户隐藏。然后锤子就开始尝试通过添加一个或几个更琐碎的功能来扩展它们。复杂性突然浮出水面,前所未见。


O/P正式图multi-agent-[A,B,C,D]-系统码(as-is)

将正式地图放在 full-screen 编辑器上,以便查看所有相互冲突的依赖关系和竞争 loops-of-control 的大图。延迟是最容易的部分。 un-resolvable死锁阻塞的几个风险点是核心。 ZeroMQ,因为 v2.x 有避免其中一些的工具,软件设计师有责任适当地缓解所有其他问题。控制系统(机器人或其他)必须证明这种稳健性和对错误的弹性,并且安全地 "survive" 以及所有 "external" 事故。

最好的起点是第 1 行汇编语言指令中表达的旧黄金法则:

;ASSUME NOTHING

并努力精心设计所有其余部分。


multi-agent-[A,B,C,D]-system coordination
             | | | |
             +-|-|-|--------------------- python while   ~ 100 [ms] GIL-lock enforced quota for pure-[SERIAL]-ised code-execution, imposed on all python-threads ( be it voluntarily or involuntarily interruped by the python GIL-lock mechanics, O/S-specific )
               +-|-|--------------------- hardware ~  64 - 147 [ms] self.board proxy-driven, responding to python code
                 +-|--------------------- python asynchronous, strict sequence of remote/local events dependent ZeroMQ dFSA, distributed among local-code operated REP and remote-code operated REQ-side(s) - enforcing a mutually ordered sequence of distributed behaviour as REQ/REP Scalable Formal Communication Archetype Pattern defines
                   +--------------------- python asyncio.get_event_loop() instantiated another event-loop that may permit to defer an execution(s) of some parts of otherwise imperative python-code to some later time

multi-agent-[A,B,C,D]-system code (as-is)
             : : : :
             : : : +---------------------------------------------------------+
             : : +-----------------------------------------------------------:-------------------+ - - - - - - - - - - - - - - - - -<?network?>- - - - - - - - - - - - - - +
             : +-------------------------------------------------------------:----------+        :                                                                         :
             :                                                               :          :        :                                                                         :
             :                                                               :          :        :                                                                         :
             !                                                               :          :        :                                                                         :
____PYTHON___!                                                               :          :        :                                                                         :
             !                                                               ?          ?        ?                                                                         ?
          +->!                                                              D?         B?       C?REP-1:{0:N}-remote---------------<?network?>------------------------REQ.C? dFSA-state?dependent
          ^  !                                                              D?         B?       C?REP-1:{0:N}                                                            .C?
          ^ A!: IMPERATIVE LOOP-HEAD: while True:                           D?AWAIT    B?       C?REP-1:{0:N}-distributed-Finite-State-Automaton (dFSA) BEHAVIOUR, local .C? side depends also on EVOLUTION OF A FUZZY, DYNAMIC, MULTIPARTY, network-wide dFSA-STATE(s) inside such ECOSYSTEM
          ^  !                                                              D?         B?       C?                                                                        
          ^  !                                                              D?         B?       C?                    REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
          ^  !                                                              D?         B?       C?                       vC?                                             ^C?
          ^  !_______.SET DEFERRED:         P_D?C?_deferred_yield_ping     =D?await ...         C?REP.recv()---<--?---?--vC?-----<--<network>--------<--?remote-REQ-state-C?-( ^C?-dFSA-state && C?.recv()-blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then, just deferred via D?await )
          ^  !                                                              D?         B?                                vC?                                             ^C?
          ^  !_______.SET DEFERRED:         S_D?B?_deferred_yield_sonar    =D?await ...B?.board.sonar_read()-o-<--?-+    vC?                                             ^C?
          ^  !                                                                                               :      |    vC?                                             ^C?
          ^  !_______.GUI OUTPUT:           print( deferred_yield_sonar )  #A!->-----------------------------+->----?->---:?--->[ a last-known (if any) S_D?B?_deferred_yield_sonar value put "now" on GUI-screen ]
          ^  !                                                                                               :      ^    vC?                                             ^C?
          ^  !_______.SET TRANSFORMED:      S_D?B?_dependent_tranformed    =A!json.dumps( S_D?B? )--<--<--<--+      |    vC? <--[ a last-known (if any) S_D?B?_deferred_yield_sonar value transformed and assigned]
          ^  !                                                                                               :      |    vC?                                             ^C?
          ^  !_______.BLOCKING-MODE-SEND()  REP.send( S_D?B?_dependent_transformed.encode() )  #C? .send( S_D?B? )--?---->C?-->----<?network?>-->-------?remote-REQ-state-C?-( +C?-indeterministic and blocking-mode of REQ/REP .recv()-waiting till a message, if any arrives error-free, blocks till then )
          ^  !X:C?                                                                                                  ^    vC?                                             ^C?
          ^  !X:C?___.SET IMPERATIVE:       i += 1                                                                  | REQ.C?-distributed-Finite-State-Automaton-STATE-REP.C?
          ^  !X:C?                                                                                                  ?                                                       
          ^  !X:C?___.NOP/SLEEP() DEFERRED: await sleep( ... )             #D?AWAIT                                 ^                                                      :
          ^  !X:C?D?+0ms                                                                                            |                                                      :
          ^  !X:C?D?_.JUMP/LOOP                                                                                     ?                                                      :
          ^__!X:C?D?+0ms                                                                                            ^                                                      :
                                                                                                                    |                                                      :
                                                                                                                    |                                                      :
                                                                                                                    |                                                      :
____SONAR___________________________________________________________________________________________________________B? REQUEST T0: + EXPECT ~64 - ~147 [ms] LATENCY        :
                                                                                                                    B? hardware value acquisition latency can be masked    :
                                                                                                                       via await or other concurrency-trick )              :
                                                                                                                                                                           :
____REQ-side(s)_?{0:N} __________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_____REQ-side(s)_?{0:N} _________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
______REQ-side(s)_?{0:N} ________________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
_______REQ-side(s)_?{0:N} _______________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>
     ...                                                                                                                                                                 ::: ...
______...REQ-side(s)_?{0:N} _____________________________________________________________________________________________________________________________________________^C? dFSA-remote autonomous-agent outside-of-CodeBlock-scope-of-control + behind <?network?>

正如 O/P 的 EDIT : 2 小时前解释的那样,

问题现在很明显。无限while True:-loop指示hard-step逐行通过,然后循环-"rotate"所有步骤,一个接一个,同时任何存在的 asyncio await-装饰仿函数都被留下,异步独立于此 "main" A:while True:-命令式代码执行的循环块。同样的方式a B:self.board-设备的外部sonar-device是一个独立定时的设备,在python代码之外,有一些难以管理的hardware/read/decode-latencies,fixed-looping + C 的协调:ZeroMQ-REQ/REP-Archetype-behaviour(再次 externally-coordinated 和分散的 "foreign" REQ-actor(s)/agent(s) - 是的,你不知道,其中有多少...... - 但都在你的控制范围之外,而且所有 REQ-side(s) 和你的 locally-instantiated REP-side distributed-Finite-State-Machine 状态完全独立于 "framing"-python 循环驱动步骤的意愿向前并执行下一步,下一步,下一步...) + 另一个,这里D: asyncio.get_event_loop()-实例化"third"-event_loop,这会影响 await 修饰的仿函数实际上是如何被允许推迟产生它们的结果并在稍后的时间交付它们的 ----- 而且,这就是"cross-bread"-event_loops.

的问题

如果此问题设置已由任何计算机科学教授详细阐述,she/he 值得起立鼓掌,因为它使该任务成为分布式系统问题的最佳示例 - 几乎可以作为一种致敬感谢 Margaret HAMILTON 夫人在正确设计阿波罗 AGC 计算机系统方面所做的工作,她的工作解决了这个问题 class-of-problems,从而挽救了机组人员的生命,并挽救了 50 年前登月的所有荣耀。很棒的演讲,汉密尔顿夫人,很棒的演讲。

琐碎,却恰到好处。

确实是一项可爱而科学的绝妙任务:

为一组独立定时和操作的代理 [A, B, C, D], A 的稳健、failure-resilient 和协调工作设计一个策略,作为命令式解释 python 语言,主要有 GIL-lock prevented zero-concurrency,但是纯粹的 [SERIAL] process-flow,C 是 semi-persistent network-distributed REQ/REP-agents,B 是一个独立操作的硬件设备,有一些有限的 I/O 与 A-inspectable self.board-proxy 的接口,所有这些都是相互独立和物理分布的跨越给定的软件、硬件和网络生态系统。

昨天已经提出了硬件诊断+提议的系统架构方法。如果不测试 self.board 托管的 sonar-device 延迟,没有人可以决定下一个最佳步骤,因为现实的(in-vivo 基准)硬件 response-times(+ 最好还有文档.board 及其外围传感器设备 MUX-ed 或否?PRIO-driven 或 MUTEX-locked 或静态,non-shared 外围设备,register-read-only 抽象, ... ? ) 是决定可能的 [A, B, C, D]-协调策略的主要因素。


ZeroMQ 部分:

如果你评论[=37= - REP_server_django.send(json_data.encode()) # l.5 你进入最后一个块,因为 REQ/REP ZeroMQ 可扩展正式通信原型模式的原始严格形式不能 .recv(),如果在第一个 .recv() 收到 [= 之后,它没有回复 REQ 端43=].

这是一个简单的陷阱。


其余部分不是可重现的代码。

您可能想要:

  • 验证,如果 self.board.sonar_read( trigger_pin ) 收到任何值并测试这样做的延迟:

   import numpy as np
   from zmq import Stopwatch
   aClk = Stopwatch()

   def sonarBeep():
       try:
            a_value   = -1
            aClk.start()
            a_value   = self.board.sonar_read( trigger_pin )
            a_time_us = aClk.stop()
       except:
            try:
                aClk.stop()
            finally:
                a_time_us = -1
       finally:
           return( a_value, a_time_us )

和 运行 一系列 100 sonar-tests,以获得 min、Avg、StDev、MAX 延迟时间的读数全部在 [us] 因为这些值是基本知道的,以防某些 control-loops 被设计成 w.r.t SONAR-sensor 数据。

[ aFun( [ sonarBeep()[1] for _    in range( 100 ) ]
        )                for aFun in ( np.min, np.mean, np.std, np.max )
  ]

系统架构和sub-systems协调:

最后但同样重要的是,可以让读取和存储声纳数据,在一个绝对独立的事件循环中,不与任何其他操作协调,只从这样的存储中读取 state-variable,设置在一个独立工作的子系统(如果不是因为独立系统行为而极其省电的话)

每当有人试图紧密协调一系列独立事件(分布式系统中最糟糕的代理不协调或协调不力)设计必须在错误和时间的鲁棒性方面增长mis-alignments 和 error-resilience。否则系统很快就会 deadlock/livelock 自己。

如有疑问,可借鉴 XEROX Palo Alto 研究中心 MVC-separation 的原始理念,其中 MODEL 部分可以(和在 GUI-frameworks 大多数时间,因为 198x+ 确实 ) 接收许多 state-variables 更新所有独立于其他系统组件,如果它们只是 read/use 实际 state-variables' 数据需要他们,因为他们需要他们。同样,如果功率预算允许,SONAR 可以连续扫描场景并将读数写入任何 local-registers,并让其他组件来询问或满足他们对最后实际 SONAR 读数的请求。

ZeroMQ zen-of-zero 也是如此。

如果这可能有帮助,请检查 local-side message-store 的 zmq.CONFLATE 模式以这种方式工作。

小提示:有人可能已经注意到,sleep( 1 / 1000 ) 是一个相当昂贵的、重复执行的步骤并且很危险,因为它实际上不会在 py2.x 中休眠,由于到整数除法。

我不确定这是否能解决您的问题,但我确实发现了一些潜在的问题。

  1. 不清楚如何调用 readsonar。
  2. 上下文的创建有错别字。
  3. REP_server_django.send 未等待。

下面是我对代码的修改(未经测试):

import asyncio
import zmq
import json


class Play:
    def __init__(self):
        self.context = zmq.asyncio.Context()
        self.REP_server_django = self.context.socket(zmq.REP)
        self.REP_server_django.bind("tcp://*:5558")
        self.event_loop = asyncio.get_event_loop()
        self.event_loop.run_until_complete(self.readsonar(4))

    async def readsonar(self, trigger_pin):
        i = 0
        while True:
            ping_from_view = await self.REP_server_django.recv()  # l.1
            value = await self.board.sonar_read(trigger_pin)  # l.2
            print(value)  # l.3
            json_data = json.dumps(value)  # l.4
            # json_data = json.dumps(i) # l.4bis
            await self.REP_server_django.send(json_data.encode())  # l.5
            i += 1  # l.6
            await asyncio.sleep(1 / 1000)  # l.6

我让它工作了,但我不得不承认,我不明白它工作的原因。基本上我必须制作一个新的 async def,它只轮询 sonar_read 的读数并使用 asyncio.wait 到 return 的值。这是代码:

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    ### START:  NEW CODE THAT RESOLVED THE ISSUE
    async def pingsonar(self):
        value = await self.board.sonar_read(self.trigger_pin)
        return value

    async def readsonar(self):
        while True:
            rep_recv = await self.rep.recv() 
            value = await asyncio.wait([self.pingsonar()])
            valuesonar = list(value[0])[0].result()
            json_data = json.dumps(valuesonar) 
            await self.rep.send(json_data.encode()) 
            await asyncio.sleep(1 / 1000) #maybe this line isn't necessary

    ### END : NEW CODE THAT RESOLVED THE ISSUE

    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()

尽管如此,我还是很感激你的帮助。

完全公开,我是 pymata-express and python-banyan. 的作者 OP 要求我 post 这个解决方案,所以这并不是一个无耻的插件。

自从在 Python 中首次引入 asyncio 以来,我一直在使用它进行开发。 3. 当 asyncio 代码工作时,asyncio(恕我直言)可以简化并发性和代码。但是,当出现问题时,调试和了解问题的原因可能会令人沮丧。

我提前道歉,因为这可能有点冗长,但我需要提供一些背景信息,这样示例就不会看起来像一些随机代码。

python-banyan 框架的开发是为了提供线程、多处理和异步的替代方案。简而言之,Banyan 应用程序由小型目标可执行文件组成,这些可执行文件使用通过 LAN 共享的协议消息相互通信。它的核心使用 Zeromq。它的设计目的不是让流量通过 WAN 移动,而是将 LAN 用作 "software backplane." 在某些方面,Banyan 类似于 MQTT,但在 LAN 中使用时速度要快得多。如果需要,它确实能够连接到 MQTT 网络。

Banyan的一部分是一个叫做OneGPIO的概念。它是一种协议消息传递规范,将 GPIO 功能抽象为独立于任何硬件实现。为了实现硬件细节,开发了称为 Banyan 硬件网关的专用 Banyan 组件。 Raspberry Pi、Arduino、ESP-8266 和 Adafruit Crickit Hat 都可以使用网关。 GPIO 应用程序发布通用的 OneGPIO 消息,任何或所有网关都可以选择接收这些消息。要从一个硬件平台移动到另一个硬件平台,将启动硬件关联网关,并在不修改的情况下启动控制组件(如下所示的代码)。从一个硬件平台到另一个硬件平台,任何组件都不需要修改代码,控制组件和网关都不需要修改。启动控制组件时,可以通过命令行选项指定诸如引脚号之类的变量。对于 Arduino Gateway,pymata-express 用于控制 Arduino 的 GPIO。 Pymata-express 是 StandardFirmata 客户端的异步实现。需要注意的是,下面的代码不是 asyncio。 Banyan 框架允许使用适合问题的工具进行开发,但允许解耦部分解决方案,在这种情况下,应用程序允许将异步与非异步混合,而不会出现通常在执行过程中遇到的任何麻烦所以。

在提供的代码中,class定义下方的所有代码都用于提供对命令行配置选项的支持。

import argparse
import signal
import sys
import threading
import time

from python_banyan.banyan_base import BanyanBase


class HCSR04(BanyanBase, threading.Thread):
    def __init__(self, **kwargs):
        """
        kwargs contains the following parameters
        :param back_plane_ip_address: If none, the local IP address is used
        :param process_name: HCSR04
        :param publisher_port: publishing port
        :param subscriber_port: subscriber port
        :param loop_time: receive loop idle time
        :param trigger_pin: GPIO trigger pin number
        :param echo_pin: GPIO echo pin number
        """

        self.back_plane_ip_address = kwargs['back_plane_ip_address'],
        self.process_name = kwargs['process_name']
        self.publisher_port = kwargs['publisher_port']
        self.subscriber_port = kwargs['subscriber_port'],
        self.loop_time = kwargs['loop_time']
        self.trigger_pin = kwargs['trigger_pin']
        self.echo_pin = kwargs['echo_pin']
        self.poll_interval = kwargs['poll_interval']

        self.last_distance_value = 0

        # initialize the base class
        super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                     subscriber_port=kwargs['subscriber_port'],
                                     publisher_port=kwargs['publisher_port'],
                                     process_name=kwargs['process_name'],
                                     loop_time=kwargs['loop_time'])

        threading.Thread.__init__(self)
        self.daemon = True

        self.lock = threading.Lock()

        # subscribe to receive messages from arduino gateway
        self.set_subscriber_topic('from_arduino_gateway')

        # enable hc-sr04 in arduino gateway
        payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                   'echo_pin': self.echo_pin}
        self.publish_payload(payload, 'to_arduino_gateway')

        # start the thread
        self.start()

        try:
            self.receive_loop()
        except KeyboardInterrupt:
            self.clean_up()
            sys.exit(0)

    def incoming_message_processing(self, topic, payload):
        print(topic, payload)
        with self.lock:
            self.last_distance_value = payload['value']

    def run(self):
        while True:
            with self.lock:
                distance = self.last_distance_value
            payload = {'distance': distance}
            topic = 'distance_poll'
            self.publish_payload(payload, topic)
            time.sleep(self.poll_interval)


def hcsr04():
    parser = argparse.ArgumentParser()
    # allow user to bypass the IP address auto-discovery.
    # This is necessary if the component resides on a computer
    # other than the computing running the backplane.
    parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                        help="None or IP address used by Back Plane")
    parser.add_argument("-i", dest="poll_interval", default=1.0,
                        help="Distance polling interval")
    parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                        help="Set process name in banner")
    parser.add_argument("-p", dest="publisher_port", default="43124",
                        help="Publisher IP port")
    parser.add_argument("-s", dest="subscriber_port", default="43125",
                        help="Subscriber IP port")
    parser.add_argument("-t", dest="loop_time", default=".1",
                        help="Event Loop Timer in seconds")
    parser.add_argument("-x", dest="trigger_pin", default="12",
                        help="Trigger GPIO pin number")
    parser.add_argument("-y", dest="echo_pin", default="13",
                        help="Echo GPIO pin number")

    args = parser.parse_args()

    if args.back_plane_ip_address == 'None':
        args.back_plane_ip_address = None
    kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                  'publisher_port': args.publisher_port,
                  'subscriber_port': args.subscriber_port,
                  'process_name': args.process_name,
                  'loop_time': float(args.loop_time),
                  'trigger_pin': int(args.trigger_pin),
                  'echo_pin': int(args.echo_pin),
                  'poll_interval': int(args.poll_interval)
                  }

    # replace with the name of your class
    HCSR04(**kw_options)


# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
    print('Exiting Through Signal Handler')
    raise KeyboardInterrupt


# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    hcsr04()