使用动态负载调度的并行图像卷积滤波器:伪影

Parallel image convolution filter using dynamic load scheduling: artifacts

我遇到以下问题:我使用动态负载调度方法基于 this paper 创建了一个并行化的 prewitt 过滤器。不幸的是,我遇到了我的序列化过滤器未显示的工件,这些工件出现在看似随机的位置,这对我来说意味着我在线程中存在同步问题。但是我不知道它在哪里。我有一个非常相似的灰度滤镜,它...直到现在没有显示出类似的问题,但现在却出现了。


左图是使用顺序算法实现的预期结果,右图在底部显示了上述工件。
通过进一步测试,我现在相当确定我的线程会跳过图像的某些部分而不过滤它们。我会继续调查。

我的代码结构如下:ParallelPrewittFilter 继承自 ParallelSobelianFilter 并且只实现了一个工厂方法来创建正确的 worker 类型,即下面的一个(class 实现了 runnable 接口)。 PrewittFilterWorker 继承自 SobelianFilterWorker(后者又继承自 ParallelFilterWorker)并且只实现了 returns 卷积核的一个方法。因此,我现在 post 来自 ParallelSobelianFilter 和 SobelianFilter worker 的相关代码。最后一个代码块是负载调度代码。

并行索贝尔滤波器:

public BufferedImage applyFilter(BufferedImage image) {
  //taking the red and alpha channels from image and placing them
  //in the arrays red[width*height] and alpha[width*height]

  ParallelFilterWorker.resetDynamicLoadCounter();
  for (SobelianFilterWorker worker : workers) {
    worker.setSourceArrays(width, height, alpha, red, green, blue, hasAlpha);
    worker.setDestImage(result);
  }

  for (Thread thread : threads) {
    System.out.println("starting thread ");
    thread.start();
  }

  for (Thread thread : threads) {
    try {
      thread.join();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  return result;
}

SobelianFilterWorker:

protected void filterPixel(int index) {
  //[..]
  if (x < 1 || y < 1 || x > width - 2 || y > height - 2) {
    //do nothing
    color = red[index];
  } else {
    firstPass = red[index - 1] * kernel[0][1] + red[index + 1] * kernel[2][1]
        + red[index - width - 1] * kernel[0][0] + red[index - width] * kernel[1][0] 
        + red[index - width + 1] * kernel[2][0] + red[index + width - 1] * kernel[0][2]
        + red[index + width] * kernel[1][2] + red[index + width + 1] * kernel[2][2];

    //transposed kernel
    secondPass = red[index - 1] * kernel[1][0] + red[index + 1] * kernel[1][2]
        + red[index - width - 1] * kernel[0][0] + red[index - width] * kernel[0][1] 
        + red[index - width + 1] * kernel[0][2] + red[index + width - 1] * kernel[2][0]
        + red[index + width] * kernel[2][1] + red[index + width + 1] * kernel[2][2];
    color = (int) Math.floor(Math.sqrt(firstPass * firstPass + secondPass * secondPass));
  }
  if (color > 255) {
    color = 255;
  }
  // ... color turned into an ARGB integer argb
    destImage.setRGB(x, y, argb);      
  }

}

我怀疑错误出在上面的两个块中,因为当 filterPixel 是一个简单的灰度滤镜时,以下代码可以正常工作:

并行过滤器工作器:

private static final int loadPerInterval = 500;

private static volatile int dynamicLoadCounter = 0;

public static synchronized void resetDynamicLoadCounter() {
  dynamicLoadCounter = -loadPerInterval;
}

public void run() {
  if (checkNull()) {
    return;
  }

  int localCounter = loadPerInterval - 1;
  int start = 0;
  while (dynamicLoadCounter < width * height) {
    localCounter++;
    if (localCounter == loadPerInterval) {
      //fetch a package of pixels to work on and mark them as being worked on
      start = syncCounterUp();
      System.out.println("#" + threadID + " starting at " + start);
      localCounter = 0;
    }
    if (start + localCounter < width * height) {
      filterPixel(start + localCounter);
    } else {
      return;
    }
  }
}

private static synchronized int syncCounterUp() {
  dynamicLoadCounter += loadPerInterval;
  return dynamicLoadCounter;
}

怎么了,我是不是没有同步?我对解释我的线程到底在做什么以及为什么会出现这些工件非常感兴趣。感谢您的观看!

这可以通过使用 ThreadPool 并为它提供 Callable 来解决。由于锁的缘故,这种接缝有点过度设计,但它使你的线程分开,即使一个图像没有完成第一个过滤器通道并在第二个通道中使用。 这是一个关于它如何工作的例子:

import java.awt.image.BufferedImage;
import java.util.*;
import java.util.concurrent.*;

public class Filter {
    int lockcount = 0;
    Worker worker = new Worker();
    List<Worker> fpThreads = new ArrayList<Worker>();
    List<Worker> spThreads = new ArrayList<Worker>();
    ExecutorService executor = Executors.newCachedThreadPool();
    Map<Integer, Object> lockMap = Collections.synchronizedMap(new Hashtable<Integer, Object>());


    public static void main(String[] args) {
        Filter filter = new Filter();

        for (int i = 0; i < 1000; i++) {
            Worker w1 = new Worker();
            filter.fpThreads.add(w1);
        }
        for (int i = 0; i < 1000; i++) {
            Worker w1 = new Worker();
            filter.spThreads.add(w1);
        }

        filter.filer();
    }


    public void filer() {
        runPass(lockMap, fpThreads);
        runPass(lockMap, spThreads);
    }

    private BufferedImage runPass(Map<Integer, Object> lockMap, List<Worker> threads) {
        Future<BufferedImage> future = null;
        Object lock = null;
        for (Worker thread : threads) {
            lock = lockMap.get(worker.hashCode());
            if (lock == null) {
                lock = thread;
                lockMap.put(thread.hashCode(), lock);
                future = executor.submit(thread);
            } else { //we have a lock
                waitOnLock(thread, lock);
            }
        }
        try {
            //get() waits until it gets an result
            return future.get();

        } catch (InterruptedException | ExecutionException ex) {
            // TODO Auto-generated catch block
            ex.printStackTrace();
        }
        synchronized (lock) {
            lockMap.remove(lock.hashCode());
            lock.notifyAll();
            System.out.println("Notify: " + lock.hashCode() + " with " + lockcount + " locks in use.");
        }
        return null;
    }

    private void waitOnLock(Worker thread, Object lock) {
        synchronized (lock) {
            try {
                lockcount++;
                System.out.println("Wait: " + thread.hashCode() + " with " + lockcount + " locks in use.");
                lock.wait();
                lockcount--;
                System.out.println("Continuing: " + thread.hashCode() + " with " + lockcount + " locks in use.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Worker implements Callable<BufferedImage> {

    public BufferedImage call() throws Exception {
        //Here you do you filtering
        System.out.println("Thread called: " + this.hashCode());
        Thread.sleep(1000);
        return null;
    }
}

为了您的目的,您可以实施工作人员来进行过滤。

嘿!我的问题比我想象的要简单得多。起初,我的灰度滤镜似乎工作正常,但事实并非如此,这让我大吃一惊。解决方法如下:

public void run() {
  if (checkNull()) {
    return;
  }

  int localCounter = loadPerInterval - 1;
  int start = 0;
  while (start + localCounter < width * height) {
    localCounter++;
    if (localCounter == loadPerInterval) {
      start = syncCounterUp();
      localCounter = 0;
    }
    if (start + localCounter < width * height) { 
      filterPixel(start + localCounter);
    }
  }
}

这是ParallelFilterworker中的运行方法。我已经更改了 while 循环的条件。

发生了什么,对于某些线程,while 条件变为假,因为其他线程已经获取了最后剩余的像素,而该线程仍在忙于处理一些像素。这导致线程放弃它所担保的像素,因为它认为一切都已完成。多么愚蠢的错误。非常感谢大家的回答和时间!