使用共享通道调度对多个设备的定期请求

Scheduling periodic requests to multiple devices using a shared channel

我需要以可配置的时间间隔(每个设备)定期从可配置数量的设备请求数据。所有设备都连接到共享数据总线,因此同一时间只能有一个设备发送数据。

设备的内存非常小,所以每个设备只能将数据保存一段时间,然后才会被下一个块覆盖。这意味着我需要确保在任何给定设备仍然可用时从它请求数据,否则它将会丢失。

我正在寻找一种算法,在给定设备列表及其各自的时序属性的情况下,找到可行的时间表以实现最小的数据丢失。

我想每个设备都可以使用以下属性进行正式描述:

data_interval:下一个数据块可用的时间

max_request_interval: 不会导致数据丢失的请求之间的最大时间量

processing_time:发送请求并完全接收到包含请求数据的相应响应所花费的时间

基本上,我需要确保在数据准备就绪且尚未过期时从每个设备请求数据,同时牢记所有其他设备的截止日期。

有没有解决这类问题的算法?我非常怀疑我是第一个遇到这种情况的人。在网上搜索现有的解决方案并没有得到很多有用的结果,主要是因为调度算法主要用于操作系统等,调度进程可以随意暂停和恢复。然而,在我的情况下我不能这样做,因为请求和接收数据块的过程是原子的,即它只能完整地执行或根本不执行。

我使用 non-preemptive 截止单调调度解决了这个问题。

这里有一些 python 代码供感兴趣的人使用:

"""This module implements non-preemptive deadline monotonic scheduling (NPDMS) to compute a schedule of periodic,
non-preemptable requests to slave devices connected to a shared data bus"""

from math import gcd
from functools import reduce
from typing import List


class Slave:

    def __init__(self, name: str, period: int, processing_time: int, offset=0, deadline=None):
        self.name = name
        self.period = int(period)
        self.processing_time = int(processing_time)
        self.offset = int(offset)
        if self.offset >= self.period:
            raise ValueError("Slave %s: offset must be < period" % name)
        self.deadline = int(deadline) if deadline else self.period
        if self.deadline > self.period:
            raise ValueError("Slave %s: deadline must be <= period" % name)


class Request:

    def __init__(self, slave: Slave, start_time: int):
        self.slave = slave
        self.start_time = start_time
        self.end_time = start_time + slave.processing_time
        self.duration = self.end_time - self.start_time

    def overlaps_with(self, other: 'Request'):
        min_duration = self.duration + other.duration
        start = min(other.start_time, self.start_time)
        end = max(other.end_time, self.end_time)
        effective_duration = end - start
        return effective_duration < min_duration


class Scenario:

    def __init__(self, *slaves: Slave):
        self.slaves = list(slaves)
        self.slaves.sort(key=lambda slave: slave.deadline)
        # LCM of all slave periods
        self.cycle_period = reduce(lambda a, b: a * b // gcd(a, b), [slave.period for slave in slaves])

    def compute_schedule(self, resolution=1) -> 'Schedule':
        request_pool = []
        for t in range(0, self.cycle_period, resolution):
            for slave in self.slaves:
                if (t - slave.offset) % slave.period == 0 and t >= slave.offset:
                    request_pool.append(Request(slave, t))
        request_pool.reverse()

        scheduled_requests = []
        current_request = request_pool.pop()
        t = current_request.start_time
        while t < self.cycle_period:
            ongoing_request = Request(current_request.slave, t)
            while ongoing_request.start_time <= t < ongoing_request.end_time:
                t += resolution
            scheduled_requests.append(ongoing_request)
            if len(request_pool):
                current_request = request_pool.pop()
                t = max(current_request.start_time, t)
            else:
                current_request = None
                break

        if current_request:
            request_pool.append(current_request)

        return Schedule(self, scheduled_requests, request_pool)


class Schedule:

    def __init__(self, scenario: Scenario, requests: List[Request], unscheduled: List[Request] = None):
        self.scenario = scenario
        self.requests = requests
        self.unscheduled_requests = unscheduled if unscheduled else []

        self._utilization = 0
        for slave in self.scenario.slaves:
            self._utilization += float(slave.processing_time) / float(slave.period)

        self._missed_deadlines_dict = {}
        for slave in self.scenario.slaves:
            periods = scenario.cycle_period // slave.period
            missed_deadlines = []
            for period in range(periods):
                start = period * slave.period
                end = start + slave.period
                request = self._find_request(slave, start, end)
                if request:
                    if request.start_time < (start + slave.offset) or request.end_time > start + slave.deadline:
                        missed_deadlines.append(request)
            if missed_deadlines:
                self._missed_deadlines_dict[slave] = missed_deadlines

        self._overlapping_requests = []
        for i in range(0, len(requests)):
            if i == 0:
                continue
            previous_request = requests[i - 1]
            current_request = requests[i]
            if current_request.overlaps_with(previous_request):
                self._overlapping_requests.append((current_request, previous_request))

        self._incomplete_requests = []
        for request in self.requests:
            if request.duration < request.slave.processing_time:
                self._incomplete_requests.append(request)

    @property
    def is_feasible(self) -> bool:
        return self.utilization <= 1 \
               and not self.has_missed_deadlines \
               and not self.has_overlapping_requests \
               and not self.has_unscheduled_requests \
               and not self.has_incomplete_requests

    @property
    def utilization(self) -> float:
        return self._utilization

    @property
    def has_missed_deadlines(self) -> bool:
        return len(self._missed_deadlines_dict) > 0

    @property
    def has_overlapping_requests(self) -> bool:
        return len(self._overlapping_requests) > 0

    @property
    def has_unscheduled_requests(self) -> bool:
        return len(self.unscheduled_requests) > 0

    @property
    def has_incomplete_requests(self) -> bool:
        return len(self._incomplete_requests) > 0

    def _find_request(self, slave, start, end) -> [Request, None]:
        for r in self.requests:
            if r.slave == slave and r.start_time >= start and r.end_time < end:
                return r
        return None


def read_scenario(file) -> Scenario:
    from csv import DictReader
    return Scenario(*[Slave(**row) for row in DictReader(file)])


def write_schedule(schedule: Schedule, file):
    from csv import DictWriter
    writer = DictWriter(file, fieldnames=["name", "start", "end"])
    writer.writeheader()
    for request in schedule.requests:
        writer.writerow({"name": request.slave.name, "start": request.start_time, "end": request.end_time})


if __name__ == '__main__':
    import argparse
    import sys

    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
                                     description='Use non-preemptive deadline monotonic scheduling (NPDMS) to\n'
                                                 'compute a schedule of periodic, non-preemptable requests to\n'
                                                 'slave devices connected to a shared data bus.\n\n'
                                                 'Prints the computed schedule to stdout as CSV. Returns with\n'
                                                 'exit code 0 if the schedule is feasible, else 1.')
    parser.add_argument("csv_file", metavar="SCENARIO", type=str,
                        help="A csv file describing the scenario, i.e. a list\n"
                             "of slave devices with the following properties:\n"
                             "* name:            name/id of the slave device\n\n"
                             "* period:          duration of the period of time during\n"
                             "                   which requests must be dispatched\n\n"
                             "* processing_time: amount of time it takes to\n"
                             "                   fully process a request (worst-case)\n\n"
                             "* offset:          offset for initial phase-shifting\n"
                             "                   (default: 0)\n\n"
                             "* deadline:        amount of time during which data is\n"
                             "                   available after the start of each period\n"
                             "                   (default: <period>)")

    parser.add_argument("-r", "--resolution", type=int, default=1,
                        help="The resolution used to simulate the passage of time (default: 1)")

    args = parser.parse_args()

    with open(args.csv_file, 'r') as f:
        schedule = read_scenario(f).compute_schedule(args.resolution)
        write_schedule(schedule, sys.stdout)
        exit(0 if schedule.is_feasible else 1)