在 API 端口发送假 RxFrameNtf 的简单方法

Simple way to send fake RxFrameNtf on API port

我可以从 webshel​​l 向连接到 API 的应用程序发送伪造的 RxFrameNtfs/DatagramNtfs 吗?
IE。在调制解调器 websh 中,我希望创建一个 ntf:

dgntf = new org.arl.unet.DatagramNtf(from:2,to:1,data:new String("hello"), priority:org.arl.unet.Priority.NORMAL, protocol:0, ttl:2)

我想将其发送到使用 Python Unetsocket 连接到调制解调器的应用程序。 本质上,我正在寻找的是一种简单的方法来测试我可以做到的 Python 方面 在单个模拟调制解调器和物理调制解调器上使用。

如果我正确理解你想要的是基本的 python tx.pyrx.py,它们在真实和模拟环境中进行通信。那么答案是肯定的。

对于模拟: 您可以 运行 samples 文件夹中的 2-node-network.groovy

//2-node-network.groovy

import org.arl.fjage.*

///////////////////////////////////////////////////////////////////////////////
// display documentation

println '''
2-node network
--------------

Node A: tcp://localhost:1101, http://localhost:8081/
Node B: tcp://localhost:1102, http://localhost:8082/
'''

///////////////////////////////////////////////////////////////////////////////
// simulator configuration

platform = RealTimePlatform   // use real-time mode

// run the simulation forever
simulate {
  node 'A', location: [ 0.km, 0.km, -15.m], web: 8081, api: 1101, stack: "$home/etc/setup"
  node 'B', location: [ 1.km, 0.km, -15.m], web: 8082, api: 1102, stack: "$home/etc/setup"
}

现在打开 2 个单独的 ipython3 终端来测试功能。

我建议先打开 rx 侧,以免错过传输。

C:\Users\jay_p>ipython3
Python 3.9.1 (tags/v3.9.1:1e5d33e, Dec  7 2020, 17:08:21) [MSC v.1927 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from unetpy import UnetSocket
   ...: from unetpy import *
   ...: 
   ...: s = UnetSocket('localhost', 1102)
   ...: modem = s.getGateway()

In [2]: rx = modem.receive(RxFrameNtf, 5000) # this will wait till you receive ntf
   ...: # print rx data, you will get this once you tx data from other side             
   ...: print('from node', rx.from_, ':', bytearray(rx.data).decode()) 
Out[2] from node 204 : hello!

Tx 一侧:

C:\Users\jay_p>ipython3
Python 3.9.1 (tags/v3.9.1:1e5d33e, Dec  7 2020, 17:08:21) [MSC v.1927 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from unetpy import UnetSocket
   ...: 
   ...: s = UnetSocket('localhost', 1101)

In [2]: s.send('hello!', 0)
Out[2]: True

In [3]: s.close()

您也可以 运行 这作为 tx.pyrx.py。借自 Unet Documentations - 第 2.5 节 .

# tx.py  
from unetpy import UnetSocket

s = UnetSocket('localhost', 1101)                
s.send('hello!', 0)                              
s.close()
# rx.py  
from unetpy import UnetSocket
from unetpy import *

s = UnetSocket('localhost', 1102)
modem = s.getGateway()                               
rx = modem.receive(RxFrameNtf, 5000)                                                
print('from node', rx.from_, ':', bytearray(rx.data).decode())  
s.close()

对于真正的调制解调器,您只需在脚本中相应地更改 IP address,它也可以在调制解调器上完美运行。

补充一下,如果您在调制解调器上执行此操作并希望从调制解调器的 shell 发送,这非常简单:

send new org.arl.unet.DatagramNtf(
  recipient: topic(agent("uwlink")),   // to make it appear as if it came from uwlink
  from: 2,
  to: 1,
  data: new String("hello"),
  priority: org.arl.unet.Priority.NORMAL,
  protocol: 0,
  ttl: 2
)