Pexpect 多线程空闲状态
Pexpect Multi-Threading Idle State
我们有大约 15,000 个节点可以通过 Pexpect 登录和提取数据。为了加快速度,我正在进行多处理——在 12 个内核之间平均分配负载。效果很好,但是每个核心仍然超过 1000 个节点 - 一次处理一个节点。
每个内核在执行此处理时的 CPU 利用率大约为 2%。这是有道理的,因为大部分时间只是等待在节点流输出时看到 Pexpect 期望值。为了尝试利用这一点并进一步加快速度,我想在每个内核的多处理中实现多线程。
为了避免共享变量出现任何问题,我将登录到一个节点所需的所有数据放入一个字典中(每个节点在字典中一个键),然后对字典进行切片,每个线程接收一个唯一的切片。然后在线程完成后,我将字典切片重新组合在一起。
但是,我仍然看到一个线程 完全 在移动到下一个线程之前完成。
我想知道什么构成了空闲状态,这样一个核心就可以移动到另一个线程上工作?它一直在寻找 Pexpect 期望值这一事实是否意味着它永远不会闲置?
此外,因为我对每个线程使用相同的目标函数。我不确定每个线程的目标函数是否相同(该函数的本地变量相同)是否会影响这个?
下面是我的多线程代码,供参考
感谢您的任何见解!
import threading
import <lots of other stuff>
class ThreadClass(threading.Thread):
def __init__(self, outputs_dict_split):
super(ThreadClass, self).__init__()
self.outputs_dict_split = outputs_dict_split
def run(self):
outputs_dict_split = get_output(self.outputs_dict_split)
return outputs_dict_split
def get_output(outputs_dict):
### PEXPECT STUFF TO LOGIN AND RUN COMMANDS ####
### WRITE DEVICE'S OUTPUTS TO DEVICE'S OUTPUTS_DICT RESULTS SUB-KEY ###
def backbone(outputs_dict):
filterbykey = lambda keys: {x: outputs_dict[x] for x in keys}
num_threads = 2
device_split = np.array_split(list(outputs_dict.keys()), num_threads)
outputs_dict_split_list = []
split_list1 = list(device_split[0])
split_list2 = list(device_split[1])
outputs_dict_split1 = filterbykey(split_list1)
outputs_dict_split2 = filterbykey(split_list2)
t1 = ThreadClass(outputs_dict_split1)
t2 = ThreadClass(outputs_dict_split2)
t1.start()
t2.start()
t1.join()
t2.join()
outputs_dict_split1 = t1.outputs_dict_split
outputs_dict_split2 = t2.outputs_dict_split
outputs_dict_split_list.append(outputs_dict_split1)
outputs_dict_split_list.append(outputs_dict_split2)
outputs_dict = ChainMap(*outputs_dict_split_list)
### Downstream Processing ###
这确实奏效了。但是,我必须扩展正在处理的设备数量才能看到整体处理时间的显着改进。
我们有大约 15,000 个节点可以通过 Pexpect 登录和提取数据。为了加快速度,我正在进行多处理——在 12 个内核之间平均分配负载。效果很好,但是每个核心仍然超过 1000 个节点 - 一次处理一个节点。
每个内核在执行此处理时的 CPU 利用率大约为 2%。这是有道理的,因为大部分时间只是等待在节点流输出时看到 Pexpect 期望值。为了尝试利用这一点并进一步加快速度,我想在每个内核的多处理中实现多线程。
为了避免共享变量出现任何问题,我将登录到一个节点所需的所有数据放入一个字典中(每个节点在字典中一个键),然后对字典进行切片,每个线程接收一个唯一的切片。然后在线程完成后,我将字典切片重新组合在一起。
但是,我仍然看到一个线程 完全 在移动到下一个线程之前完成。
我想知道什么构成了空闲状态,这样一个核心就可以移动到另一个线程上工作?它一直在寻找 Pexpect 期望值这一事实是否意味着它永远不会闲置?
此外,因为我对每个线程使用相同的目标函数。我不确定每个线程的目标函数是否相同(该函数的本地变量相同)是否会影响这个?
下面是我的多线程代码,供参考
感谢您的任何见解!
import threading
import <lots of other stuff>
class ThreadClass(threading.Thread):
def __init__(self, outputs_dict_split):
super(ThreadClass, self).__init__()
self.outputs_dict_split = outputs_dict_split
def run(self):
outputs_dict_split = get_output(self.outputs_dict_split)
return outputs_dict_split
def get_output(outputs_dict):
### PEXPECT STUFF TO LOGIN AND RUN COMMANDS ####
### WRITE DEVICE'S OUTPUTS TO DEVICE'S OUTPUTS_DICT RESULTS SUB-KEY ###
def backbone(outputs_dict):
filterbykey = lambda keys: {x: outputs_dict[x] for x in keys}
num_threads = 2
device_split = np.array_split(list(outputs_dict.keys()), num_threads)
outputs_dict_split_list = []
split_list1 = list(device_split[0])
split_list2 = list(device_split[1])
outputs_dict_split1 = filterbykey(split_list1)
outputs_dict_split2 = filterbykey(split_list2)
t1 = ThreadClass(outputs_dict_split1)
t2 = ThreadClass(outputs_dict_split2)
t1.start()
t2.start()
t1.join()
t2.join()
outputs_dict_split1 = t1.outputs_dict_split
outputs_dict_split2 = t2.outputs_dict_split
outputs_dict_split_list.append(outputs_dict_split1)
outputs_dict_split_list.append(outputs_dict_split2)
outputs_dict = ChainMap(*outputs_dict_split_list)
### Downstream Processing ###
这确实奏效了。但是,我必须扩展正在处理的设备数量才能看到整体处理时间的显着改进。