如何有效地找到从每个值到下一个 lower/higher 值的距离?

How can I efficiently find distances from each value to the next lower/higher value?

我会告诉你我使用的是什么结构,请随时推荐任何更改,如 numpy 数组或其他内容。

无论如何,我拥有的是与股票价格相对应的 500 万个连续条目的列表。

然后我还有 2 个列表,每个列表的长度都相同 - 500 万个条目。这些列表对应于预期的“上限”和预期的“下限”,我希望股票从序列中的那个点达到。

我想做的是遍历下限列表中的所有 500 万个条目,并记录序列中价格最终达到该下限所需的时间。然后我想对上限列表做同样的事情。

以下是仅包含 10 个条目的股票价格列表的潜在解决方案示例:

prices =       [15,16,18,22,23,17,15,19,15,18]
upper_limits = [17,18,21,23,25,22,18,21,18,20]
lower_limits = [14,15,16,18,19,15,13,17,14,16]


solved_upper = [2,1,1,1,x,x,1,x,1,x]
solved_lower = [x,5,4,2,1,1,x,1,x,x]

#I think I got this right?  Anyways as you can see, the solved lists simply show
#how many entries we have to look at until we find a value that is >= to it for upper, or <= to it
#for lower

那么问题来了,对于海量条目,如何合理快速的解决这个问题呢? (实际上,我有 10 个上限列表和 10 个下限列表.. 所以需要更高的效率)

我要澄清效率。用真实的数据对象替换 Dictionary 对象可能是个好主意。

首先我们需要将您的时间序列变成可搜索的树。

def make_tree (series, i=None, j=None):
    if i is None:
        i = 0
    if j is None:
        j = len(series) - 1

    if i == j:
        return {
            "min_i": i,
            "max_i": i,
            "min_value": series[i],
            "max_value": series[i],
            "left": None,
            "right": None
        }
    else:
        mid = (i + j) // 2
        left = make_tree(series, i, mid)
        right = make_tree(series, mid+1, j)
        return {
            "min_i": i,
            "max_i": j,
            "min_value": min(left['min_value'], right['min_value']),
            "max_value": max(left['max_value'], right['max_value']),
            "left": left,
            "right": right
        }

接下来我们需要函数来搜索那棵树:

def find_next_after_at_least(tree, min_i, min_value):
    if tree['max_i'] <= min_i or tree['max_value'] < min_value:
        return None
    elif tree['min_i'] == tree['max_i']:
        return tree['min_i'] - min_i
    else:
        answer = find_next_after_at_least(tree['left'], min_i, min_value)
        if answer is None:
            answer = find_next_after_at_least(tree['right'], min_i, min_value)
        return answer

def find_next_after_at_most(tree, min_i, max_value):
    if tree['max_i'] <= min_i or max_value < tree['min_value']:
        return None
    elif tree['min_i'] == tree['max_i']:
        return tree['min_i'] - min_i
    else:
        answer = find_next_after_at_most(tree['left'], min_i, max_value)
        if answer is None:
            answer = find_next_after_at_most(tree['right'], min_i, max_value)
        return answer

现在您可以轻松编写搜索:

def solve_upper(tree, limits):
    return [
        find_next_after_at_least(tree, i, limits[i])
            for i in range(len(limits))
    ]

def solve_lower(tree, limits):
    return [
        find_next_after_at_most(tree, i, limits[i])
            for i in range(len(limits))
    ]

现在你的示例问题:

t = make_tree([15,16,18,22,23,17,15,19,15,18])
print(solve_upper(t, [17,18,21,23,25,22,18,21,18,20]))
print(solve_lower(t, [14,15,16,18,19,15,13,17,14,16]))

您可以使用类似于所谓的“单调队列”的数据结构有效地解决这个问题(在 O(N log N) 时间内)。你可以 google 那个,但通常的用例与你的有很大不同,所以我只是解释一下。 (奇怪的是,这是我一周内在这里看到的第三个问题,答案需要这样的结构。)

在您的例子中,您将从价格数组的末尾开始工作,将每个价格添加到单调队列的前面。每次你输入一个价格,一些其他的可能会被丢弃,所以队列只保留比之前所有的都大的项目。这些是唯一可能成为 'next higher price' 的项目。它们在队列中也是单调递增的,因此您可以使用二分查找找到第一个 >= 目标。由于您需要知道下一个更高值的索引,因此可以存储索引而不是值本身。

那就解决了上限问题。下限类似,但队列单调递减。

在python中,它看起来像这样:

def solve_upper(prices, limits):
    solved = [0]*len(prices)
    q = [0]*len(prices)
    qstart = len(q)
    for i in range(len(prices)-1, -1, -1):
        price = prices[i]
        while qstart < len(q) and prices[q[qstart]] <= price:
            # the price at the start of q needs to be discarded, since
            # it isn't greater than the new one
            qstart += 1
        # prepend the new price
        qstart -= 1
        q[qstart] = i
        limit = limits[i]

        # binary search to find the first price >= limit
        minpos = qstart
        maxpos = len(q)
        while minpos < maxpos:
            testpos = minpos + (maxpos - minpos)//2
            if prices[q[testpos]] < limit:
                # too low
                minpos = testpos+1
            else:
                # high enough
                maxpos = testpos
        if minpos < len(q):
            solved[i] = q[minpos]-i
        else:
            solved[i] = None
    return solved

def solve_lower(prices, limits):
    solved = [0]*len(prices)
    q = [0]*len(prices)
    qstart = len(q)
    for i in range(len(prices)-1, -1, -1):
        price = prices[i]
        while qstart < len(q) and prices[q[qstart]] >= price:
            # the price at the start of q needs to be discarded, since
            # it isn't less than the new one
            qstart += 1
        # prepend the new price
        qstart -= 1
        q[qstart] = i
        limit = limits[i]

        # binary search to find the first price <= limit
        minpos = qstart
        maxpos = len(q)
        while minpos < maxpos:
            testpos = minpos + (maxpos - minpos)//2
            if prices[q[testpos]] > limit:
                # too low
                minpos = testpos+1
            else:
                # high enough
                maxpos = testpos
        if minpos < len(q):
            solved[i] = q[minpos]-i
        else:
            solved[i] = None
    return solved

prices =       [15,16,18,22,23,17,15,19,15,18]
upper_limits = [17,18,21,23,25,22,18,21,18,20]
lower_limits = [14,15,16,18,19,15,13,17,14,16]
print(solve_upper(prices, upper_limits))
print(solve_lower(prices, lower_limits))

输出:

[2, 1, 1, 1, None, None, 1, None, 1, None]
[None, 5, 4, 2, 1, 1, None, 1, None, None]

注意:如果您将此答案与@btilly 的进行对比,请在评论中包含结果!