如何在 Raspberry Pi 上实现 Modbus?

How to implement Modbus on Raspberry Pi?

我目前正在从事一个项目,我正在尝试以 Raspberry Pi 4 作为主机并控制多个执行器作为从机来实现 Modbus。为此,我购买了一个 special shield for my Pi. I've run a demo test program 来确认 Pi 可以使用它的新防护罩,但之后就碰壁了。

Shield user manual - 在用户手册文件夹中。

大师:

## To install dependencies:
## sudo pip3 install modbus-tk
##################################################################################################
import serial
import fcntl
import os
import struct
import termios
import array
#import modbus lib
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu

# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    #set modbus master
    master = modbus_rtu.RtuMaster(
           serial.Serial(port= '/dev/ttySC0',
           baudrate=9600,
           bytesize=8,
           parity='N',
           stopbits=1,
           xonxoff=0)
       )

    master.set_timeout(5.0)
    master.set_verbose(True)
    logger.info("connected")

    logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 4))

    #send some queries
    #logger.info(master.execute(1, cst.READ_COILS, 0, 10))
    #logger.info(master.execute(1, cst.READ_DISCRETE_INPUTS, 0, 8))
    #logger.info(master.execute(1, cst.READ_INPUT_REGISTERS, 100, 3))
    #logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_COIL, 7, output_value=1))
    #logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 100, output_value=54))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1]))
    #logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 100, output_value=xrange(12)))

#end of if __name__ == '__main__': 

从机:

import sys

import serial
import fcntl
import os
import struct
import termios
import array
import time

import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu
# RS485 ioctls
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",9600)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of def rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    logger = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")

    #Create the server
    server = modbus_rtu.RtuServer(serial.Serial('/dev/ttySC1'))

    try:
        logger.info("running...")
        logger.info("enter 'quit' for closing the server")

        server.start()

        slave_1 = server.add_slave(1)
        slave_1.add_block('0', cst.HOLDING_REGISTERS, 0, 100)
        while True:
            cmd = sys.stdin.readline()
            args = cmd.split(' ')

            if cmd.find('quit') == 0:
                sys.stdout.write('bye-bye\r\n')
                break

            elif args[0] == 'add_slave':
                slave_id = int(args[1])
                server.add_slave(slave_id)
                sys.stdout.write('done: slave %d added\r\n' % (slave_id))

            elif args[0] == 'add_block':
                slave_id = int(args[1])
                name = args[2]
                block_type = int(args[3])
                starting_address = int(args[4])
                length = int(args[5])
                slave = server.get_slave(slave_id)
                slave.add_block(name, block_type, starting_address, length)
                sys.stdout.write('done: block %s added\r\n' % (name))

            elif args[0] == 'set_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                values = []
                for val in args[4:]:
                    values.append(int(val))
                slave = server.get_slave(slave_id)
                slave.set_values(name, address, values)
                values = slave.get_values(name, address, len(values))
                sys.stdout.write('done: values written: %s\r\n' % (str(values)))

            elif args[0] == 'get_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                length = int(args[4])
                slave = server.get_slave(slave_id)
                values = slave.get_values(name, address, length)
                sys.stdout.write('done: values read: %s\r\n' % (str(values)))

            else:
                sys.stdout.write("unknown command %s\r\n" % (args[0]))
    finally:
        server.stop()

我计划使用的执行器是 Linak LA36。我相信这些是我将要使用的功能:

来自 this 文档的第 21-22 页。

这堵墙相当简单地开始使用 Modbus。我研究了执行器的技术文档以确定要发送的内容,但我对编写程序一头雾水。我曾希望也许能够修改演示程序以满足我的需要,但无法理解那里的代码使用。

在互联网上搜索,我试图找到一个教程或描述不同变量和函数的作用,以便更好地理解,但找不到类似的东西。我确实找到了演示代码 originated,但无法 find/understand 任何可以帮助我的东西。

我看到有些程序应该使用 Raspberry Pi(比如 PyModbus)来启用 Modbus,但我不确定我的情况是否不同,有一个特殊的屏蔽,并且这些程序是否适用于我的设置?

所以,最后,我来到这里希望得到一些帮助。建议、说明、示例,在这一点上欢迎任何可能让我更进一步的东西。也可能是使用演示代码作为基础是错误的,有人可以指出我不同的方向吗?

我非常愿意尝试不同的事情,我们将不胜感激。

提前致谢。

更新:

从那以后,我四处寻找其他选项,偶然发现了我正在尝试使用的 minimalmodbus。 RS485 屏蔽仍处于演示配置中...

...我一直在尝试在 Python 解释器中执行从 minimalmodbus 中找到的一些代码:

>>> import minimalmodbus
>>> instr = minimalmodbus.Instrument('/dev/ttySC0', 1)
>>> instr
minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>
>>> instr.read_register(24, 1)
5.0
>>> instr.write_register(24, 450, 1)
>>> instr.read_register(24, 1)

我将 '/dev/ttyUSB0'(在原始代码中)更改为 '/dev/ttySC0'.现在我坚持:

>>> instr
    minimalmodbus.Instrument<id=0xb7437b2c, address=1, close_port_after_each_call=False, debug=False, serial=Serial<id=0xb7437b6c, open=True>(port='/dev/ttySC0', baudrate=19200, bytesize=8, parity='N', stopbits=1, timeout=0.05, xonxoff=False, rtscts=False, dsrdtr=False)>

这给出了 SyntaxError: invalid syntax 突出显示 minimalmodbus.

我认为您需要从简单的事情开始,并在此基础上进行构建。在您的评论中,您声明要使用 minimalmodbus,这很好,但让我们从演示代码开始,并尝试先使用您的执行器。稍后你可以回到其他库,如 minimalmodbuspymodbus.

在我们进入代码之前,我认为您应该了解什么是 Modbus。本质上,Modbus 使用串行端口(它可以通过 RS485 或更传统的 RS232 或通过 TTL 电平,但这只是物理层,用于传递信息的电平;你的帽子上已经有 RS485 端口,你的执行器也可以通过 RS485 工作,所以在这方面无需担心,只要您正确连接了总线,A 到 A 和 B 到 B)。

那么,除了串口,还需要什么Modbus? Modbus 在主从配置中工作。这意味着只有一个主机(您的 Raspberry Pi 计算机)和一个或多个从机(您的执行器)。根据我们在上一段中所说的,您的 Modbus 运行s 在两线(具有 RS485 电平)总线上。在这种配置中,与具有三根线的更通用的 RS232 标准相反:RX、TX 和 GND,您不能进行全双工通信(只有主机或所有从机之一可以在总线上通话,而所有其他设备收听,类似于对讲机收音机 link)。为了进一步扩展类比,就像您在 WT 收音机上需要一个 PTT(即按即说)按钮一样,您需要 Modbus 上的主设备或任何从设备的信号到 PTT 当他们想说话时。部分RS485收发器具有硬件实现的功能;在你的帽子上, 没有详细检查电路并查看演示代码,似乎总线上的方向控制是在软件中用 rs485_enable() 函数实现的。编辑: 详细查看硬件我必须纠正自己:你的 RS485 帽子实际上是通过硬件通过其 SPI 到双 UART SC16IS752 进行方向控制。该芯片旨在向后兼容使用流量控制 RTS 信号作为方向控制(我们之前提到的 PTT 功能)的旧 UART。这就是为什么你需要 rs485_enable().

理论讲完了,现在是实践部分。您似乎错过了一件重要的事情。在您 link 编辑的手册的第 21 页上,您有以下段落:

Before integration into a MODBUS system a few parameters of the actuator have to be checked and eventually changed. This preparation is done by use of the BusLink PC tool (the tool is described in details later) and guarantees that the actuator is able to execute basic functionality. Further fine-tuning may be required to fulfill system-or application requirements.

然后,如果您返回第 12 页的 table,您会看到他们所谓的寻址(通常称为从属 ID)默认为 24​​7(未分配)。因此,您需要做的第一件事是使用此 Buslink PC 工具将执行器上的地址设置为 1 到 246 之间的任意数字(如果您计划在总线上连接多个执行器,您将拥有为每个执行器设置不同的编号)。有关详细信息,请参阅第 28 页。

成功完成该配置后,您应该可以运行 演示主代码。例如,如果你想将执行器移动 10 毫米,你可以尝试:

## To install dependencies:
## sudo pip3 install modbus-tk
##################################################################################################
import serial
import fcntl
import os
import struct
import termios
import array
#import modbus lib
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
#import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk import modbus_rtu

# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",19200)    
ser2 = serial.Serial("/dev/ttySC1",9600)

def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

    #enable 485 chanel 2
    fcntl.ioctl(ser2, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser2, TIOCSRS485, buf)
#end of rs485_enable():


if __name__ == '__main__':

    logger = modbus_tk.utils.create_logger("console")

    rs485_enable()

    #set modbus master
    master = modbus_rtu.RtuMaster(
           serial.Serial(port= '/dev/ttySC0',
           baudrate=9600,
           bytesize=8,
           parity='N',
           stopbits=1,
           xonxoff=0)
       )

    master.set_timeout(5.0)
    master.set_verbose(True)
    logger.info("connected")

    logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 1, output_value=100))  #Write target position 10mm (1/10mm*100)
    logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 2, output_value=1))  #Move actuator

请注意,我只更改了演示代码的最后两行。第一个 1,就在 cst.WRITE_SINGLE_REGISTER 之前,必须与您使用 BusLink PC 工具设置的地址从站相同。后面的数字(第一行 1,第二行 2)是您需要根据手册第 22 页写入的寄存器编号。最后,output_value 是您需要写入每个寄存器的值。在寄存器编号 1 上,您需要写下要将执行器从其参考位置移动的目标位置(以 0.1 毫米的倍数测量),然后在第二个上写上 1(再次参见手册第 22 页上的 table ,第 2 步和第 3 步)。

您可以通过读取输入寄存器 3 和 5 来完成步骤 4 的序列。请注意,要读取输入寄存器,功能代码是 cst.READ_INPUT_REGISTERS

试一试,看看能否成功。 完成后我们可以看一下minimalmodbus

编辑:更好地了解你的硬件是如何工作的(见上面的编辑),现在很清楚你可以使用你喜欢的任何 Modbus 库,你只需要 将演示代码保留在 #end of rs485_enable(): 上方,并在开始发送数据之前在某处调用 rs485_enable()

对于 minimalmodbus 你可以尝试这样的事情:

import serial
import fcntl
import os
import struct
import termios
import array
#Remove modbus-tk imports and add minimalmodbus
import minimalmodbus


# only /dev/ttySC0 will be used
# RS485 ioctls define
TIOCGRS485 = 0x542E
TIOCSRS485 = 0x542F
SER_RS485_ENABLED = 0b00000001
SER_RS485_RTS_ON_SEND = 0b00000010
SER_RS485_RTS_AFTER_SEND = 0b00000100
SER_RS485_RX_DURING_TX = 0b00010000
# rs 485 port
ser1 = serial.Serial("/dev/ttySC0",19200)    


def rs485_enable():
    buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
    #enable 485 chanel 1
    fcntl.ioctl(ser1, TIOCGRS485, buf)
    buf[0] |=  SER_RS485_ENABLED|SER_RS485_RTS_AFTER_SEND
    buf[1]  = 0
    buf[2]  = 0
    fcntl.ioctl(ser1, TIOCSRS485, buf)

#end of rs485_enable():


if __name__ == '__main__':

    actuator = minimalmodbus.Instrument('/dev/ttySC0', 1) # port name, slave address (in decimal), change according to actuator address

    rs485_enable()   #you need to keep this for your hat to work

    #minimalmodbus setup
    actuator.serial.port               # this is the serial port name
    actuator.serial.baudrate = 19200   # Baud rate
    actuator.serial.bytesize = 8
    actuator.serial.parity   = serial.PARITY_NONE
    actuator.serial.stopbits = 1
    actuator.serial.timeout  = 0.05   # seconds

    actuator.address     # this is the slave (actuator) address number
    actuator.mode = minimalmodbus.MODE_RTU   # rtu mode

    #write registers
    actuator.write_register(1, 100)  #write target distance to move
    actuator.write_register(2, 1)    #Move!