数组创建太慢

Array creation too slow

我正在尝试从头开始创建一个图像数组。 我得到了代码 运行ning 但它需要大约 30 秒才能 运行 它。 我觉得使用 numpy 本机函数可能会更快。 我该怎么做?

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        image = np.float32(final_image)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        cv2.imshow("ok", image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

如何才能更快地创建此图像阵列? 我需要在每个时间步都创建图像,因为我想模拟每个新时间步出现的真实实时数据流。

这就是为什么我只想优化这部分代码的原因:

for xxx in range(image_width):
    final_image[:int(vol_norm[xxx]), xxx, :] = 1

我该怎么做?

接下来是第一个最简单的优化:

  1. 使用比较值 np.arange(...) 而不是内部循环。
  2. 使用灰度图像而不是 3 通道 RGB。要处理的数据减少 3 倍。
  3. 使用 np.uint8 类型而不是 np.float32,这样处理速度更快,并且不需要转换为 float32 来进行 CV2 可视化。

以上所有这些优化都提供了巨大的加速(10x 倍),我的 运行ning 时间是 2.6 sec 而不是之前的 27 sec

我没有做的另一个非常有用的优化是,在当前 window 中的整个数据的 max/min 没有的情况下,您不需要重新计算以前的图像像素改变。只有在 max/min 改变的情况下,您才需要重新计算以前的图像数据。我预计您的 real-life 数据会像外汇或比特币价格一样逐渐变化,因此 max/min 在 window 内的变化非常 non-often.

上面提到的优化1)-3)在下一个代码中实现:

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False

def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

aranges = np.arange(image_heigh, dtype = np.int32)[:, None]

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    #final_image = np.zeros((image_heigh, image_width, image_channel), dtype = np.float32)

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    final_image = (aranges < vol_norm[None, :].astype(np.int32)).astype(np.uint8) * 255

    # ===================== part to optimize end

    if show_img:
        cv2.imshow('ok', final_image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

对于上面的代码,我只是对内部循环进行了一次优化,其中 speed-up 的代码甚至超过了 2x 倍,以获得 1.3 sec 的计时。但我也放回了 3 个通道加上 float32,这降低了速度导致最终 2.8 sechere is the code

如果不需要 re-computing 旧图像数据,则可以进行下一次优化。

要优化的主要事情是你 re-computing 在每个步骤上几乎是相同的整个图像,沿宽度有 1 个像素 shift-step。取而代之的是,您需要计算整个图像一次,然后向右移动不是 1 个像素而是整个图像宽度。

那么经过这次优化运行宁时间是0.08 sec.

并且只为了显示动画做1像素步进,而不是为了计算图像数据,如果你需要速度,图像数据应该只计算一次。

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = volumes.size #256
image_channel = 3
screen_width = 256

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(0, len(volumes), image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # nomalize data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        for start in range(0, final_image.shape[1] - screen_width):
            image = np.float32(final_image[:, start : start + screen_width])
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            cv2.imshow("ok", image)
            cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

我还根据您的数据创建了动画图像:

如果您想创建相同的动画,只需将下一段代码附加到上面脚本的末尾即可:

# Needs: python -m pip install pillow
import PIL.Image
imgs = [PIL.Image.fromarray(final_image[:, start : start + screen_width].astype(np.uint8) * 255) for start in range(0, final_image.shape[1] - screen_width, 6)]
imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)

我还实现了 real-time 实时流数据的模拟 rendering/visualizing。

  1. live_stream()生成器在随机时间点吐出随机数量的数据,这是为了模拟数据生成过程。
  2. stream_fetcher() 监听实时流并将接收到的所有数据记录到 python 队列 q0,这个 fetcher 在一个线程中 运行。
  3. renderer()获取fetcher记录的数据,通过你的数学公式和归一化过程渲染成图像,它渲染尽可能多的数据,导致图像具有不同的宽度,渲染图像被保存到另一个队列q1.
  4. visualizer() 通过获取尽可能多的可用渲染图像来可视化渲染数据。

所有函数 运行 在单独的线程中不阻塞整个进程。此外,如果任何线程工作速度变慢,那么它会使用当前 real-time 数据将一些数据跳过到 catch-up,因此每个队列都不会溢出。

另外你可能会看到可视化过程是跳跃的,这不是因为功能有点慢,而是因为实时流在每个时间步长吐出不同数量的数据,这通常是 real-time 数据可能行为举止。

在接下来的代码中我也做了前面提到的额外优化,即 not-recomputing image if min/max didn't change.

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 20)).astype(np.float64).cumsum() + last
        yield a
        last = a[-1]
        time.sleep(random.random() * 0.1)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
                
        vols = np.concatenate(data)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put(whole_image)
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    imgs = []
    
    while True:
        data = []
        data.append(q1.get())
        try:
            while True:
                data.append(q1.get(block = False))
        except queue.Empty:
            pass
        image = np.concatenate(data, axis = 1)[:, -image_width:]
        cv2.imshow('ok', image)
        cv2.waitKey(1)

        if imgs is not None:
            try:
                # Needs: python -m pip install pillow
                import PIL.Image
                has_pil = True
            except:
                has_pil = False
                imgs = None
            if has_pil:
                imgs.append(PIL.Image.fromarray(np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)))

                if len(imgs) >= 1000:
                    print('saving...', flush = True)
                    imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)
                    imgs = None
                    print('saved!', flush = True)

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

上面的实时过程模拟被渲染成 result.png,我在下面显示:

我还决定改进可视化,使用更高级的 matplotlib 而不是 cv2 来显示轴并进行 real-time 绘图。可视化图像如下:

接下来是一个matplotlib-based代码,对应上图最后一张:

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512
save_nsec = 20
dpi, fps = 100, 15

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    pos = 0
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 30)).astype(np.float64).cumsum() + last
        yield a, pos, pos + a.size - 1
        pos += a.size
        last = a[-1]
        time.sleep(random.random() * 2.2 / fps)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
            
        data_vols = [e[0] for e in data]
        data_minx, data_maxx = data[0][1], data[-1][2]

        vols = np.concatenate(data_vols)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put((whole_image, data_maxx - whole_image.shape[1] + 1, data_maxx, vols_min, vols_max))
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    import matplotlib.pyplot as plt, matplotlib.animation
    
    def images():
        while True:
            data = []
            data.append(q1.get())
            try:
                while True:
                    data.append(q1.get(block = False))
            except queue.Empty:
                pass
            minx = min([e[1] for e in data])
            maxx = min([e[2] for e in data])
            miny = min([e[3] for e in data])
            maxy = min([e[4] for e in data])
            image = np.concatenate([e[0] for e in data], axis = 1)[:, -image_width:]
            image = np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)
            image = np.repeat(image[:, :, None], 3, axis = -1)
            yield image, minx, maxx, miny, maxy
            
    it = images()
    im = None
    fig = plt.figure(figsize = (image_width / dpi, image_height / dpi), dpi = dpi)
            
    def animate_func(i):
        nonlocal it, im, fig
        image, minx, maxx, miny, maxy = next(it)
        print(f'.', end = '', flush = True)
        if im is None:
            im = plt.imshow(image, interpolation = 'none', aspect = 'auto')
        else:
            im.set_array(image)
        im.set_extent((minx, maxx, miny, maxy))
        return [im]
            
    anim = matplotlib.animation.FuncAnimation(fig, animate_func, frames = round(save_nsec * fps), interval = 1000 / fps)
    
    print('saving...', end = '', flush = True)
    #anim.save('result.mp4', fps = fps, dpi = dpi, extra_args = ['-vcodec', 'libx264'])
    anim.save('result.gif', fps = fps, dpi = dpi, writer = 'imagemagick')
    print('saved!', end = '', flush = True)
    
    plt.show()

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

然后我决定玩一点,用RGB调色板给最后一张图上色,峰值越高越多red-ish,中间越多则越多green-ish,如果足够低则更多blue-ish。下面的结果图像是由 this coloring code:

实现的

下面还有一个彩色动画,line-style 而不是 bar-style,在 this code:

的帮助下