使用多线程进行奇偶排序 Java

Odd-Even sort Java using multithreading

我是这个小组的新手,所以我相信可以在这里获得帮助,因为我在 Google.

上找不到关于我的问题的任何信息

我正在尝试实现 Java EvenOdd 转置并行排序。因此,作为我的算法,我认为划分分区是个好主意:

  1. 如何知道我应该将数组列表分成多少部分?例如,我现在使用17个元素,以使其更易于理解。
  2. 此外,我不知道我是否应该使用一个叫做 ExecutorService 的东西,或者只是正常地创建线程。

我在这里添加我当前的逻辑:从奇数阶段开始分为两部分并分配两个线程进行这些比较,然后创建一个屏障等待线程并因此启动其他线程与偶数一起工作索引类似。感谢您能给我的任何帮助。目前,我不知道如何实现这个算法,所以任何的话都可能有帮助。

欢迎使用 Whosebug!

How to know how many parts should I divide my array list? For example, I use 17 elements for now to make it more understandable.

您将数组划分为子数组的直觉是正确的,因为它通常是并发排序算法的基础。正如你已经知道的算法,我们只需要讨论实现:

  1. 直观的解决方案是为每个奇数索引创建一个 threadstart() 所有这些索引用于 compare-and-swap 和 join() 它们到主线程等待结果。搓揉并重复 N 次。然而,这是非常低效的,因为创建和启动所有 O(N^2) 线程的开销对于快速 compare-and-swap.
  2. 来说太大了。
  3. 我们也可以为每个奇数索引创建线程,并在左右之间重复compare-and-swap。这是有问题的,因为我们必须反复锁定左右(以防止数据竞争),这会导致调度程序产生大量无用的开销,而且我们不知道何时完成。
  4. 最后,我们可以为每个奇数索引创建线程,也让它们反复左右交替,每次都让它们在屏障上等待。这对我来说似乎是正确的选择,因为它最大限度地减少了线程管理开销,并且还限制了无用的比较和数据竞争。

这使我们得到以下代码:

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class OddEvenSort {
    public static void main(String[] args) {
        int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
        sortArr(arr);
        System.out.println(Arrays.toString(arr));
    }
    
    public static void sortArr(int[] arr) {
        int threadNum = arr.length/2;
        CyclicBarrier barr = new CyclicBarrier(threadNum);
        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            threads[i] = new Thread(new CompareSwapThread(arr, 2*i + 1, barr));
            threads[i].start();
        }
        for (int i = 0; i < threadNum; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {e.printStackTrace();}
        }
    }
}

class CompareSwapThread implements Runnable {
    private int[] arr;
    private int index;
    private CyclicBarrier barr;
    
    public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
        this.arr = arr;
        this.index = index;
        this.barr = barr;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < arr.length; i++) {
            if (arr[index - 1] > arr[index]) {
                int t = arr[index - 1];
                arr[index - 1] = arr[index];
                arr[index] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                int t = arr[index];
                arr[index] = arr[index + 1];
                arr[index + 1] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
        }
    }   
}

请注意,此算法的运行时间为 O(n),这对于此类并行算法而言并不是最佳选择。您可以尝试并行实现的另一种算法是 MergeSort algorithm. There are a lot of things you can parallelize with this one, but the most important one is the merging, as it is the bottleneck in the sequential algorithm. You can look at Batcher Odd-Even Mergesort or look at other parallel merges.

Also, I do not know if I should use something called ExecutorService or just create threads normally.

Java 提供了许多不同的并行工具,它们在不同的抽象级别上运行。可以说 ExecutorService 比基本线程更 'high-level',因为它简化了线程管理。它还会优化任务的调度,以更好地执行。

这是我们的实现,使用 ExecutorService :

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OddEvenSort {
    public static void main(String[] args) {
        int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
        sortArr(arr);
        System.out.println(Arrays.toString(arr));
    }
    
    public static void sortArr(int[] arr) {
        int threadNum = arr.length/2;
        CyclicBarrier barr = new CyclicBarrier(threadNum);
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        Future<?>[] awaits = new Future<?>[threadNum];
        for (int i = 0; i < threadNum; i++) {
            awaits[i] = exec.submit(new CompareSwapThread(arr, 2*i + 1, barr));
        }
        for (int i = 0; i < threadNum; i++) {
            try {
                awaits[i].get();
            } catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
        }
    }
}

class CompareSwapThread implements Runnable {
    private int[] arr;
    private int index;
    private CyclicBarrier barr;
    
    public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
        this.arr = arr;
        this.index = index;
        this.barr = barr;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < arr.length; i++) {
            if (arr[index - 1] > arr[index]) {
                int t = arr[index - 1];
                arr[index - 1] = arr[index];
                arr[index] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                int t = arr[index];
                arr[index] = arr[index + 1];
                arr[index + 1] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
        }
    }   
}

如您所见,我们正在使用线程 factory newFixedThreadPool static method to generate and intantiate all the treads. We then add our tasks to the thread pool, which will return a Future variable. A Future will hold the value, when the thread finished (in our case null). Calling the Future.get() method will wait for the result (and thus the thread to be finished). Notice that is you want to implement some sort of nested thread parralelism (for example, when parallelizing MergeSort). You should use ForkJoinPool as it is made specifically for that. Finally, here 是关于 ExecutorService 的一个很好的教程。

如果您需要任何详细信息,请随时在评论中提问。