使用共享通道调度对多个设备的定期请求
Scheduling periodic requests to multiple devices using a shared channel
我需要以可配置的时间间隔(每个设备)定期从可配置数量的设备请求数据。所有设备都连接到共享数据总线,因此同一时间只能有一个设备发送数据。
设备的内存非常小,所以每个设备只能将数据保存一段时间,然后才会被下一个块覆盖。这意味着我需要确保在任何给定设备仍然可用时从它请求数据,否则它将会丢失。
我正在寻找一种算法,在给定设备列表及其各自的时序属性的情况下,找到可行的时间表以实现最小的数据丢失。
我想每个设备都可以使用以下属性进行正式描述:
data_interval
:下一个数据块可用的时间
max_request_interval
: 不会导致数据丢失的请求之间的最大时间量
processing_time
:发送请求并完全接收到包含请求数据的相应响应所花费的时间
基本上,我需要确保在数据准备就绪且尚未过期时从每个设备请求数据,同时牢记所有其他设备的截止日期。
有没有解决这类问题的算法?我非常怀疑我是第一个遇到这种情况的人。在网上搜索现有的解决方案并没有得到很多有用的结果,主要是因为调度算法主要用于操作系统等,调度进程可以随意暂停和恢复。然而,在我的情况下我不能这样做,因为请求和接收数据块的过程是原子的,即它只能完整地执行或根本不执行。
我使用 non-preemptive 截止单调调度解决了这个问题。
这里有一些 python 代码供感兴趣的人使用:
"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""
from math import gcd
from functools import reduce
from typing import List
class Slave:
def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
self.name = name
self.period = int(period)
self.processing_time = int(processing_time)
self.offset = int(offset)
if self.offset >= self.period:
raise ValueError("Slave %s: offset must be < period" % name)
self.deadline = int(deadline) if deadline else self.period
if self.deadline > self.period:
raise ValueError("Slave %s: deadline must be <= period" % name)
class Request:
def __init__(self, slave: Slave, start_time: int):
self.slave = slave
self.start_time = start_time
self.end_time = start_time + slave.processing_time
self.duration = self.end_time - self.start_time
def overlaps_with(self, other: 'Request'):
min_duration = self.duration + other.duration
start = min(other.start_time, self.start_time)
end = max(other.end_time, self.end_time)
effective_duration = end - start
return effective_duration < min_duration
class Scenario:
def __init__(self, *slaves: Slave):
self.slaves = list(slaves)
self.slaves.sort(key=lambda slave: slave.deadline)
# LCM of all slave periods
self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])
def compute_schedule(self, resolution=1) -> 'Schedule':
request_pool = []
for t in range(0, self.cycle_period, resolution):
for slave in self.slaves:
if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
request_pool.append(Request(slave, t))
request_pool.reverse()
scheduled_requests = []
current_request = request_pool.pop()
t = current_request.start_time
while t < self.cycle_period:
ongoing_request = Request(current_request.slave, t)
while ongoing_request.start_time <= t < ongoing_request.end_time:
t += resolution
scheduled_requests.append(ongoing_request)
if len(request_pool):
current_request = request_pool.pop()
t = max(current_request.start_time, t)
else:
current_request = None
break
if current_request:
request_pool.append(current_request)
return Schedule(self, scheduled_requests, request_pool)
class Schedule:
def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
self.scenario = scenario
self.requests = requests
self.unscheduled_requests = unscheduled if unscheduled else []
self._utilization = 0
for slave in self.scenario.slaves:
self._utilization += float(slave.processing_time) / float(slave.period)
self._missed_deadlines_dict = {}
for slave in self.scenario.slaves:
periods = scenario.cycle_period // slave.period
missed_deadlines = []
for period in range(periods):
start = period * slave.period
end = start + slave.period
request = self._find_request(slave, start, end)
if request:
if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
missed_deadlines.append(request)
if missed_deadlines:
self._missed_deadlines_dict[slave] = missed_deadlines
self._overlapping_requests = []
for i in range(0, len(requests)):
if i == 0:
continue
previous_request = requests[i - 1]
current_request = requests[i]
if current_request.overlaps_with(previous_request):
self._overlapping_requests.append((current_request, previous_request))
self._incomplete_requests = []
for request in self.requests:
if request.duration < request.slave.processing_time:
self._incomplete_requests.append(request)
@property
def is_feasible(self) -> bool:
return self.utilization <= 1 \
and not self.has_missed_deadlines \
and not self.has_overlapping_requests \
and not self.has_unscheduled_requests \
and not self.has_incomplete_requests
@property
def utilization(self) -> float:
return self._utilization
@property
def has_missed_deadlines(self) -> bool:
return len(self._missed_deadlines_dict) > 0
@property
def has_overlapping_requests(self) -> bool:
return len(self._overlapping_requests) > 0
@property
def has_unscheduled_requests(self) -> bool:
return len(self.unscheduled_requests) > 0
@property
def has_incomplete_requests(self) -> bool:
return len(self._incomplete_requests) > 0
def _find_request(self, slave, start, end) -> [Request, None]:
for r in self.requests:
if r.slave == slave and r.start_time >= start and r.end_time < end:
return r
return None
def read_scenario(file) -> Scenario:
from csv import DictReader
return Scenario(*[Slave(**row) for row in DictReader(file)])
def write_schedule(schedule: Schedule, file):
from csv import DictWriter
writer = DictWriter(file, fieldnames=["name", "start", "end"])
writer.writeheader()
for request in schedule.requests:
writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})
if __name__ == '__main__':
import argparse
import sys
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
'compute a schedule of periodic, non-preemptable requests to\n'
'slave devices connected to a shared data bus.\n\n'
'Prints the computed schedule to stdout as CSV. Returns with\n'
'exit code 0 if the schedule is feasible, else 1.')
parser.add_argument("csv_file", metavar="SCENARIO", type=str,
help="A csv file describing the scenario, i.e. a list\n"
"of slave devices with the following properties:\n"
"* name: name/id of the slave device\n\n"
"* period: duration of the period of time during\n"
" which requests must be dispatched\n\n"
"* processing_time: amount of time it takes to\n"
" fully process a request (worst-case)\n\n"
"* offset: offset for initial phase-shifting\n"
" (default: 0)\n\n"
"* deadline: amount of time during which data is\n"
" available after the start of each period\n"
" (default: <period>)")
parser.add_argument("-r", "--resolution", type=int, default=1,
help="The resolution used to simulate the passage of time (default: 1)")
args = parser.parse_args()
with open(args.csv_file, 'r') as f:
schedule = read_scenario(f).compute_schedule(args.resolution)
write_schedule(schedule, sys.stdout)
exit(0 if schedule.is_feasible else 1)
我需要以可配置的时间间隔(每个设备)定期从可配置数量的设备请求数据。所有设备都连接到共享数据总线,因此同一时间只能有一个设备发送数据。
设备的内存非常小,所以每个设备只能将数据保存一段时间,然后才会被下一个块覆盖。这意味着我需要确保在任何给定设备仍然可用时从它请求数据,否则它将会丢失。
我正在寻找一种算法,在给定设备列表及其各自的时序属性的情况下,找到可行的时间表以实现最小的数据丢失。
我想每个设备都可以使用以下属性进行正式描述:
data_interval
:下一个数据块可用的时间
max_request_interval
: 不会导致数据丢失的请求之间的最大时间量
processing_time
:发送请求并完全接收到包含请求数据的相应响应所花费的时间
基本上,我需要确保在数据准备就绪且尚未过期时从每个设备请求数据,同时牢记所有其他设备的截止日期。
有没有解决这类问题的算法?我非常怀疑我是第一个遇到这种情况的人。在网上搜索现有的解决方案并没有得到很多有用的结果,主要是因为调度算法主要用于操作系统等,调度进程可以随意暂停和恢复。然而,在我的情况下我不能这样做,因为请求和接收数据块的过程是原子的,即它只能完整地执行或根本不执行。
我使用 non-preemptive 截止单调调度解决了这个问题。
这里有一些 python 代码供感兴趣的人使用:
"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""
from math import gcd
from functools import reduce
from typing import List
class Slave:
def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
self.name = name
self.period = int(period)
self.processing_time = int(processing_time)
self.offset = int(offset)
if self.offset >= self.period:
raise ValueError("Slave %s: offset must be < period" % name)
self.deadline = int(deadline) if deadline else self.period
if self.deadline > self.period:
raise ValueError("Slave %s: deadline must be <= period" % name)
class Request:
def __init__(self, slave: Slave, start_time: int):
self.slave = slave
self.start_time = start_time
self.end_time = start_time + slave.processing_time
self.duration = self.end_time - self.start_time
def overlaps_with(self, other: 'Request'):
min_duration = self.duration + other.duration
start = min(other.start_time, self.start_time)
end = max(other.end_time, self.end_time)
effective_duration = end - start
return effective_duration < min_duration
class Scenario:
def __init__(self, *slaves: Slave):
self.slaves = list(slaves)
self.slaves.sort(key=lambda slave: slave.deadline)
# LCM of all slave periods
self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])
def compute_schedule(self, resolution=1) -> 'Schedule':
request_pool = []
for t in range(0, self.cycle_period, resolution):
for slave in self.slaves:
if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
request_pool.append(Request(slave, t))
request_pool.reverse()
scheduled_requests = []
current_request = request_pool.pop()
t = current_request.start_time
while t < self.cycle_period:
ongoing_request = Request(current_request.slave, t)
while ongoing_request.start_time <= t < ongoing_request.end_time:
t += resolution
scheduled_requests.append(ongoing_request)
if len(request_pool):
current_request = request_pool.pop()
t = max(current_request.start_time, t)
else:
current_request = None
break
if current_request:
request_pool.append(current_request)
return Schedule(self, scheduled_requests, request_pool)
class Schedule:
def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
self.scenario = scenario
self.requests = requests
self.unscheduled_requests = unscheduled if unscheduled else []
self._utilization = 0
for slave in self.scenario.slaves:
self._utilization += float(slave.processing_time) / float(slave.period)
self._missed_deadlines_dict = {}
for slave in self.scenario.slaves:
periods = scenario.cycle_period // slave.period
missed_deadlines = []
for period in range(periods):
start = period * slave.period
end = start + slave.period
request = self._find_request(slave, start, end)
if request:
if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
missed_deadlines.append(request)
if missed_deadlines:
self._missed_deadlines_dict[slave] = missed_deadlines
self._overlapping_requests = []
for i in range(0, len(requests)):
if i == 0:
continue
previous_request = requests[i - 1]
current_request = requests[i]
if current_request.overlaps_with(previous_request):
self._overlapping_requests.append((current_request, previous_request))
self._incomplete_requests = []
for request in self.requests:
if request.duration < request.slave.processing_time:
self._incomplete_requests.append(request)
@property
def is_feasible(self) -> bool:
return self.utilization <= 1 \
and not self.has_missed_deadlines \
and not self.has_overlapping_requests \
and not self.has_unscheduled_requests \
and not self.has_incomplete_requests
@property
def utilization(self) -> float:
return self._utilization
@property
def has_missed_deadlines(self) -> bool:
return len(self._missed_deadlines_dict) > 0
@property
def has_overlapping_requests(self) -> bool:
return len(self._overlapping_requests) > 0
@property
def has_unscheduled_requests(self) -> bool:
return len(self.unscheduled_requests) > 0
@property
def has_incomplete_requests(self) -> bool:
return len(self._incomplete_requests) > 0
def _find_request(self, slave, start, end) -> [Request, None]:
for r in self.requests:
if r.slave == slave and r.start_time >= start and r.end_time < end:
return r
return None
def read_scenario(file) -> Scenario:
from csv import DictReader
return Scenario(*[Slave(**row) for row in DictReader(file)])
def write_schedule(schedule: Schedule, file):
from csv import DictWriter
writer = DictWriter(file, fieldnames=["name", "start", "end"])
writer.writeheader()
for request in schedule.requests:
writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})
if __name__ == '__main__':
import argparse
import sys
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
'compute a schedule of periodic, non-preemptable requests to\n'
'slave devices connected to a shared data bus.\n\n'
'Prints the computed schedule to stdout as CSV. Returns with\n'
'exit code 0 if the schedule is feasible, else 1.')
parser.add_argument("csv_file", metavar="SCENARIO", type=str,
help="A csv file describing the scenario, i.e. a list\n"
"of slave devices with the following properties:\n"
"* name: name/id of the slave device\n\n"
"* period: duration of the period of time during\n"
" which requests must be dispatched\n\n"
"* processing_time: amount of time it takes to\n"
" fully process a request (worst-case)\n\n"
"* offset: offset for initial phase-shifting\n"
" (default: 0)\n\n"
"* deadline: amount of time during which data is\n"
" available after the start of each period\n"
" (default: <period>)")
parser.add_argument("-r", "--resolution", type=int, default=1,
help="The resolution used to simulate the passage of time (default: 1)")
args = parser.parse_args()
with open(args.csv_file, 'r') as f:
schedule = read_scenario(f).compute_schedule(args.resolution)
write_schedule(schedule, sys.stdout)
exit(0 if schedule.is_feasible else 1)