不同线程数的不同结果
Different results with different number of threads
我尝试分块读取文件并将每个块传递给一个线程,该线程将计算块中每个字节被包含的次数。问题在于,当我将整个文件仅传递给一个线程时,我得到了正确的结果,但将其传递给多个线程时,结果变得非常奇怪。这是我的代码:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main{
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException
{
// get number of threads to be run
Scanner in = new Scanner(System.in);
int numberOfThreads = in.nextInt();
// read file
File file = new File("testfile.txt");
long fileSize = file.length();
long chunkSize = fileSize / numberOfThreads;
FileInputStream input = new FileInputStream(file);
byte[] buffer = new byte[(int)chunkSize];
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
Set<Future<int[]>> set = new HashSet<Future<int[]>>();
while(input.available() > 0)
{
if(input.available() < chunkSize)
{
chunkSize = input.available();
}
input.read(buffer, 0, (int) chunkSize);
Callable<int[]> callable = new FrequenciesCounter(buffer);
Future<int[]> future = pool.submit(callable);
set.add(future);
}
// let`s assume we will use extended ASCII characters only
int alphabet = 256;
// hold how many times each character is contained in the input file
int[] frequencies = new int[alphabet];
// sum the frequencies from each thread
for(Future<int[]> future: set)
{
for(int i = 0; i < alphabet; i++)
{
frequencies[i] += future.get()[i];
}
}
input.close();
for(int i = 0; i< frequencies.length; i++)
{
if(frequencies[i] > 0) System.out.println((char)i + " " + frequencies[i]);
}
}
}
//help class for multithreaded frequencies` counting
class FrequenciesCounter implements Callable<int[]>
{
private int[] frequencies = new int[256];
private byte[] input;
public FrequenciesCounter(byte[] buffer)
{
input = buffer;
}
public int[] call()
{
for(int i = 0; i < input.length; i++)
{
frequencies[(int)input[i]]++;
}
return frequencies;
}
}
我的 testfile.txt 是 aaaaaaaaaaaaaabbbbcccccc
。
使用 1 个线程,输出为:
a 14
b 4
c 6`
2 个线程的输出是:
a 4
b 8
c 12
3 个线程的输出是:
b 6
c 18
还有其他我想不通的奇怪结果。有人可以帮忙吗?
每个线程都在使用相同的缓冲区,一个线程将覆盖缓冲区,因为另一个线程正在尝试处理它。
您需要确保每个线程都有自己的缓冲区,其他人无法修改。
为每个线程创建 byte[] 数组。
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
// get number of threads to be run
Scanner in = new Scanner(System.in);
int numberOfThreads = in.nextInt();
// read file
File file = new File("testfile.txt");
long fileSize = file.length();
long chunkSize = fileSize / numberOfThreads;
FileInputStream input = new FileInputStream(file);
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
Set<Future<int[]>> set = new HashSet<Future<int[]>>();
while (input.available() > 0) {
//create buffer for every thread.
byte[] buffer = new byte[(int) chunkSize];
if (input.available() < chunkSize) {
chunkSize = input.available();
}
input.read(buffer, 0, (int) chunkSize);
Callable<int[]> callable = new FrequenciesCounter(buffer);
Future<int[]> future = pool.submit(callable);
set.add(future);
}
// let`s assume we will use extended ASCII characters only
int alphabet = 256;
// hold how many times each character is contained in the input file
int[] frequencies = new int[alphabet];
// sum the frequencies from each thread
for (Future<int[]> future : set) {
for (int i = 0; i < alphabet; i++) {
frequencies[i] += future.get()[i];
}
}
input.close();
for (int i = 0; i < frequencies.length; i++) {
if (frequencies[i] > 0)
System.out.println((char) i + " " + frequencies[i]);
}
}
}
我尝试分块读取文件并将每个块传递给一个线程,该线程将计算块中每个字节被包含的次数。问题在于,当我将整个文件仅传递给一个线程时,我得到了正确的结果,但将其传递给多个线程时,结果变得非常奇怪。这是我的代码:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main{
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException
{
// get number of threads to be run
Scanner in = new Scanner(System.in);
int numberOfThreads = in.nextInt();
// read file
File file = new File("testfile.txt");
long fileSize = file.length();
long chunkSize = fileSize / numberOfThreads;
FileInputStream input = new FileInputStream(file);
byte[] buffer = new byte[(int)chunkSize];
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
Set<Future<int[]>> set = new HashSet<Future<int[]>>();
while(input.available() > 0)
{
if(input.available() < chunkSize)
{
chunkSize = input.available();
}
input.read(buffer, 0, (int) chunkSize);
Callable<int[]> callable = new FrequenciesCounter(buffer);
Future<int[]> future = pool.submit(callable);
set.add(future);
}
// let`s assume we will use extended ASCII characters only
int alphabet = 256;
// hold how many times each character is contained in the input file
int[] frequencies = new int[alphabet];
// sum the frequencies from each thread
for(Future<int[]> future: set)
{
for(int i = 0; i < alphabet; i++)
{
frequencies[i] += future.get()[i];
}
}
input.close();
for(int i = 0; i< frequencies.length; i++)
{
if(frequencies[i] > 0) System.out.println((char)i + " " + frequencies[i]);
}
}
}
//help class for multithreaded frequencies` counting
class FrequenciesCounter implements Callable<int[]>
{
private int[] frequencies = new int[256];
private byte[] input;
public FrequenciesCounter(byte[] buffer)
{
input = buffer;
}
public int[] call()
{
for(int i = 0; i < input.length; i++)
{
frequencies[(int)input[i]]++;
}
return frequencies;
}
}
我的 testfile.txt 是 aaaaaaaaaaaaaabbbbcccccc
。
使用 1 个线程,输出为:
a 14
b 4
c 6`
2 个线程的输出是:
a 4
b 8
c 12
3 个线程的输出是:
b 6
c 18
还有其他我想不通的奇怪结果。有人可以帮忙吗?
每个线程都在使用相同的缓冲区,一个线程将覆盖缓冲区,因为另一个线程正在尝试处理它。
您需要确保每个线程都有自己的缓冲区,其他人无法修改。
为每个线程创建 byte[] 数组。
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
// get number of threads to be run
Scanner in = new Scanner(System.in);
int numberOfThreads = in.nextInt();
// read file
File file = new File("testfile.txt");
long fileSize = file.length();
long chunkSize = fileSize / numberOfThreads;
FileInputStream input = new FileInputStream(file);
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
Set<Future<int[]>> set = new HashSet<Future<int[]>>();
while (input.available() > 0) {
//create buffer for every thread.
byte[] buffer = new byte[(int) chunkSize];
if (input.available() < chunkSize) {
chunkSize = input.available();
}
input.read(buffer, 0, (int) chunkSize);
Callable<int[]> callable = new FrequenciesCounter(buffer);
Future<int[]> future = pool.submit(callable);
set.add(future);
}
// let`s assume we will use extended ASCII characters only
int alphabet = 256;
// hold how many times each character is contained in the input file
int[] frequencies = new int[alphabet];
// sum the frequencies from each thread
for (Future<int[]> future : set) {
for (int i = 0; i < alphabet; i++) {
frequencies[i] += future.get()[i];
}
}
input.close();
for (int i = 0; i < frequencies.length; i++) {
if (frequencies[i] > 0)
System.out.println((char) i + " " + frequencies[i]);
}
}
}