为什么 Java 并发处理不适用于新实例化的对象,而它适用于相同 class 的反序列化对象?

Why does Java concurrent processing not work with newly instantiated objects, while it works with deserialized objects of the same class?

我正在使用 java.util.concurrent.ExecutorService 对所有可用处理资源执行并发计算。在下面的代码中,MyProcessor class 的实例在其 performParallelProcessing 方法中创建了多个 ProcessingExecutor class 实例并将它们提交给 ExecutorService期望获得相应回调的实例。

处理发生在 ProcessingExecutor class 的 performProcessing 方法中。我用于处理的数据是 class ComputationData 的对象实例。它们既可以从文件系统中检索(如果存在序列化数据),也可以初始化为新实例。

这是问题所在:

ComputationData 个对象实例从文件系统反序列化的情况下,并发处理按我预期的方式执行。它在所有处理核心上并行运行,占用100%的处理资源。

如果 ComputationData 个对象实例是新初始化的,并发处理不会像我预期的那样执行。它像单线程执行一样运行,占用大约 15% 的处理资源。

正如我所猜测的,我新初始化的 ComputationData 对象实例有问题。但我不知道它们有什么问题,以及为什么并发对它们不起作用,而它适用于它们的序列化->反序列化版本。任何提示或想法将不胜感激。

public class MyProcessor {
    private boolean processingFinished = false;

    public void performParallelProcessing(){
        int count = 0;
        boolean continueProcessing = true;

        int nrOfProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(nrOfProcessors);

        while (continueProcessing){
            ProcessingExecutor task = new ProcessingExecutor(count);
            task.setCaller(this);
            es.submit(task);
            count++;

            if (!processingFinished){
                try{
                    Thread.sleep(50);
                }
                catch(SecurityException | InterruptedException e){
                    //Exception handling
                }
            }
            else{
                continueProcessing = false;
            }
        }
    }

    public void callBack(ProcessingResult result) {
        if(result.allDataProcessed()){
            this.processingFinished = true;
        }
    }
}

public class ProcessingExecutor implements Callable {
    private MyProcessor processor;
    private int count;

    public ProcessingExecutor(int count){
        this.count = count;
    }

    public Object call() {
        ProcessingResult result = null;
        try {
            result = performProcessing();
        }
        catch (SecurityException e) {
            //Exception handling
        }

        processor.callBack(result);     
        return null;
    }

    public void setCaller(MyProcessor processor) {
       this.processor = processor;
    }

    public MyProcessor getCaller() {
        return this.processor;
    }

    private ProcessingResult performProcessing(){
        ComputationData data = null;

        if(serializedDataExist()){
            data = getSerializedData(count);
        }
        else{
            data = initializeNewData(count);
        }

        ProcessingResult result = new ProcessingResult(data, count);
        return result;
    }

    private ComputationData getSerializedData(int count){
        ComputationData data = null;
        // code to retrieve a ComputationData object from the file system
        // based on 'count' value.
        return data;
    }

    private ComputationData initializeNewData(int count){
        ComputationData data = null;
        // code to initialize a new instance of ComputationData class
        // based on 'count' value.
        return data;
    }

    private boolean serializedDataExist(){
        boolean dataFound = false;
        // code to verify whether serialized ComputationData objects are
        // present on the file system.
        return dataFound;
    }
}

为什么需要 Thread.sleep(50)?这就是使并发执行成为顺序执行的原因,尤其是当每次计算 <= 50 毫秒时。我的猜测是反序列化时间 + 计算时间超过 50 毫秒,这就是为什么在反序列化对象场景中你有更多 CPU activity,因为你实际上有多个任务 运行 同时在执行程序线程中。您应该尝试不使用 Thread.sleep(50) 或至少使用更短的超时时间。