将列表从 QSharedMemory 复制到 sip.voidptr 的 Pythonic 方法

Pythonic way to copy list to a sip.voidptr from QSharedMemory

我正在使用 PyQT5 和 QSharedMemory class。我正在创建一个可以容纳 6 个 1 字节元素的共享内存。为了在共享内存数组中复制这些元素,我循环遍历 python 列表中的元素,如下面的代码片段:

f = shared_mem.data()
k = f.asarray()
memtocopy = [0,1,2,3,4,5]
for i in range(0,len(memtocopy)):
    k[i]  = memtocopy[i]
shared_mem.unlock()

这看起来很不像pythonic 和样板代码。请问有没有更合适的方法达到同样的效果?

使用时

k[:] = memtocopy

或:

k[:] = np.asarray(memtocopy,np.uint8)

它将失败并显示错误消息:

TypeError: can only assign another array of unsigned char to the slice

整个复现测试代码如下:

from PyQt5 import QtCore 

# Create shared memory and attach it
shared_mem = QtCore.QSharedMemory()
shared_mem.setNativeKey("test")
shared_mem.create(4*6)
shared_mem.attach()


# Fill in 
shared_mem.lock()
f = shared_mem.data()
k = f.asarray()

memtocopy = [0,1,2,3,4,5]

# Loop in question
for i in range(0,len(memtocopy)):
    k[i]  = memtocopy[i]

shared_mem.unlock()



# Read out
shared_mem.lock()
f1 = shared_mem.data()
k1 = f1.asarray()
shared_mem.unlock()

# Test results
if k1[0] == memtocopy[0]:
    print("success!")
else:
    print("fail!")

经过一番摆弄,我设法生成了一个适合我的组合:

a = array.array('i', range(6))
f[:] = buffer(a)
b = array.array('i')
b.fromstring(buffer(f))

这依赖于双向读取的缓冲协议。您可能可以直接将数组与 k 一起使用,并且 fromstring 在以后的版本中已重命名为 frombytes

在 Python 3.4 中,这有效:

a = array.array('i', range(6))
f[:] = memoryview(a).cast('B')
b = array.array('i')
b.frombytes(memoryview(f))

memoryview 替换了 buffer,但知道元素有 4 个字节大,因此需要额外的转换。

这是一种使用 struct and memoryview 的更简单的方法,它使用一对一行来读取和写入数据 :

import struct
from PyQt5 import QtCore

shared_mem = QtCore.QSharedMemory()
shared_mem.setNativeKey("test")
shared_mem.create(4*6)
shared_mem.attach()

memtocopy = [0,1,2,3,4,5]

try:

    # Fill in 
    shared_mem.lock()
    shared_mem.data()[:] = memoryview(struct.pack('=6i', *memtocopy))
    shared_mem.unlock()

    # Read out
    shared_mem.lock()
    # python3
    k = memoryview(shared_mem.data()).cast('i')
    # python2
    # k = struct.unpack('=6i', memoryview(shared_mem.data()))
    shared_mem.unlock()

    if k[3] == memtocopy[3]:
        print("success!")
    else:
        print("fail!")

finally:

    shared_mem.detach()