Threadpoolexecutor 不更新 concurrenthashmap
Threadpool executor not updating the concurrent hashmap
我正在使用线程池执行器生成 5 个线程来并行执行 5 个不同的命令。每个线程完成后,我将使用 threadid 的条目作为键更新并发哈希图,并作为值终止。但是我的线程池没有更新成功完成命令执行的哈希图。
主要Class:
package com.cisco.executor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class MainExecutor {
static String element;
static ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>();
static Integer array[] = { 1, 2, 3, 4, 5 };
// static Integer array[] = { 1 };
static List<Integer> threadid = Arrays.asList(array);
static String SQOOP_XXCCS_DS_SAHDR_CORE = ReadProperties.getInstance().getProperty("SQOOP_XXCCS_DS_SAHDR_CORE");
static String SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL");
static String SQOOP_XXCCS_DS_INSTANCE_DETAIL = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_DS_INSTANCE_DETAIL");
static String SQOOP_XXCCS_SCDC_PRODUCT_PROFILE = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_SCDC_PRODUCT_PROFILE");
static String SQOOP_MTL_SYSTEM_ITEMS_B = ReadProperties.getInstance().getProperty("SQOOP_MTL_SYSTEM_ITEMS_B");
public static void main(String[] args) {
ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
System.out.println("at executors step");
List<String> getlist = getList();
Iterator<Integer> itr2 = threadid.iterator();
for (Iterator<String> itr = getlist.iterator(); itr.hasNext() && itr2.hasNext();) {
String element = (String) itr.next();
int thread_id = itr2.next();
String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" };
System.out.println("the command is as below ");
System.out.println(Arrays.toString(command));
System.out.println("inside the iterator");
ParallelExecutor pe = new ParallelExecutor(command, thread_id, map);
executors.execute(pe);
}
// executors.shutdown();
for(Map.Entry<Integer, String> entry: map.entrySet())
{
Integer key = entry.getKey();
String value = entry.getValue();
System.out.println("The key is " + key + " The value is " + value);
System.out.println("Thread " + key + " is terminated");
}
}
public static List<String> getList() {
List<String> commandlist = new ArrayList<String>();
System.out.println("inside getList");
commandlist.add(SQOOP_XXCCS_DS_SAHDR_CORE);
commandlist.add(SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL);
commandlist.add(SQOOP_XXCCS_DS_INSTANCE_DETAIL);
commandlist.add(SQOOP_XXCCS_SCDC_PRODUCT_PROFILE);
commandlist.add(SQOOP_MTL_SYSTEM_ITEMS_B);
return commandlist;
}
}
可运行Class:
package com.cisco.executor;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
public class ParallelExecutor implements Runnable {
private static Logger LOGGER = Logger.getLogger(ParallelExecutor.class);
String[] command;
int threadid;
ConcurrentHashMap<Integer, String> map;
public ParallelExecutor(String[] command, int threadid, ConcurrentHashMap<Integer, String> map) {
this.command = command;
this.threadid = threadid;
this.map = map;
}
@Override
public void run() {
ProcessBuilder processbuilder = new ProcessBuilder(command);
LOGGER.info(command);
try {
Process process = processbuilder.inheritIO().start();
System.out.println("inside process builder ");
process.waitFor();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String readline;
while ((readline = reader.readLine()) != null) {
LOGGER.info(readline);
}
// getting the thread state and adding it to a collection
Thread.State state = Thread.currentThread().getState();
if (state == Thread.State.TERMINATED) {
map.put(threadid, "TERMINATED");
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
是不是我的实现有误。有人可以帮我实现吗。
与其尝试在线程中捕获线程的结果(如果抛出 exception/error 则容易出错,尤其是)我建议您保留 Future 对象并检查它们。
ExecutorService exec = Executors.newFixedThreadPool(5);
System.out.println("at executors step");
Map<String, Future<?>> results = new HashMap<>();
for (String element : getList()) {
String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" };
results.put(element, exec.submit(new ParallelExecutor(command, thread_id, map)));
}
for(Map.Entry<String, Future<?>> entry: map.entrySet()) {
try {
entry.getValue().get();
System.out.println(entry.getKey()+ " is complete");
} catch (ExecutionException e) {
System.out.println(entry.getKey()+ " failed with");
e.getCause().printStackTrace(System.out);
}
}
ThreadPoolExecutor 在被要求之前不会终止。
所以,首先你必须打电话给
// executors.shutdown();
,您保留了评论。
第二,您需要等待线程正确终止。为此,在 for(Map.Entry entry: map.entrySet())
之前添加一个循环
while (!es.isTerminated()) {
}
但是,由于一个线程可能 运行 许多 运行nable,如果我没听错,您希望在一个 Runnable 执行完成后更新 CHM。
为此,您必须使用 CustomThread class。扩展线程
并仅覆盖 1 种方法,afterExecute()
您需要从中放置代码以使用 Runnable 的 ID 和终止状态更新 CHM。但请记住,这意味着完成传递的 Runnables 运行() 方法,而不是底层线程的终止。
我正在使用线程池执行器生成 5 个线程来并行执行 5 个不同的命令。每个线程完成后,我将使用 threadid 的条目作为键更新并发哈希图,并作为值终止。但是我的线程池没有更新成功完成命令执行的哈希图。
主要Class:
package com.cisco.executor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class MainExecutor {
static String element;
static ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<Integer, String>();
static Integer array[] = { 1, 2, 3, 4, 5 };
// static Integer array[] = { 1 };
static List<Integer> threadid = Arrays.asList(array);
static String SQOOP_XXCCS_DS_SAHDR_CORE = ReadProperties.getInstance().getProperty("SQOOP_XXCCS_DS_SAHDR_CORE");
static String SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL");
static String SQOOP_XXCCS_DS_INSTANCE_DETAIL = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_DS_INSTANCE_DETAIL");
static String SQOOP_XXCCS_SCDC_PRODUCT_PROFILE = ReadProperties.getInstance()
.getProperty("SQOOP_XXCCS_SCDC_PRODUCT_PROFILE");
static String SQOOP_MTL_SYSTEM_ITEMS_B = ReadProperties.getInstance().getProperty("SQOOP_MTL_SYSTEM_ITEMS_B");
public static void main(String[] args) {
ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// ThreadPoolExecutor executors = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
System.out.println("at executors step");
List<String> getlist = getList();
Iterator<Integer> itr2 = threadid.iterator();
for (Iterator<String> itr = getlist.iterator(); itr.hasNext() && itr2.hasNext();) {
String element = (String) itr.next();
int thread_id = itr2.next();
String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" };
System.out.println("the command is as below ");
System.out.println(Arrays.toString(command));
System.out.println("inside the iterator");
ParallelExecutor pe = new ParallelExecutor(command, thread_id, map);
executors.execute(pe);
}
// executors.shutdown();
for(Map.Entry<Integer, String> entry: map.entrySet())
{
Integer key = entry.getKey();
String value = entry.getValue();
System.out.println("The key is " + key + " The value is " + value);
System.out.println("Thread " + key + " is terminated");
}
}
public static List<String> getList() {
List<String> commandlist = new ArrayList<String>();
System.out.println("inside getList");
commandlist.add(SQOOP_XXCCS_DS_SAHDR_CORE);
commandlist.add(SQOOP_XXCCS_DS_CVDPRDLINE_DETAIL);
commandlist.add(SQOOP_XXCCS_DS_INSTANCE_DETAIL);
commandlist.add(SQOOP_XXCCS_SCDC_PRODUCT_PROFILE);
commandlist.add(SQOOP_MTL_SYSTEM_ITEMS_B);
return commandlist;
}
}
可运行Class:
package com.cisco.executor;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
public class ParallelExecutor implements Runnable {
private static Logger LOGGER = Logger.getLogger(ParallelExecutor.class);
String[] command;
int threadid;
ConcurrentHashMap<Integer, String> map;
public ParallelExecutor(String[] command, int threadid, ConcurrentHashMap<Integer, String> map) {
this.command = command;
this.threadid = threadid;
this.map = map;
}
@Override
public void run() {
ProcessBuilder processbuilder = new ProcessBuilder(command);
LOGGER.info(command);
try {
Process process = processbuilder.inheritIO().start();
System.out.println("inside process builder ");
process.waitFor();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String readline;
while ((readline = reader.readLine()) != null) {
LOGGER.info(readline);
}
// getting the thread state and adding it to a collection
Thread.State state = Thread.currentThread().getState();
if (state == Thread.State.TERMINATED) {
map.put(threadid, "TERMINATED");
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
是不是我的实现有误。有人可以帮我实现吗。
与其尝试在线程中捕获线程的结果(如果抛出 exception/error 则容易出错,尤其是)我建议您保留 Future 对象并检查它们。
ExecutorService exec = Executors.newFixedThreadPool(5);
System.out.println("at executors step");
Map<String, Future<?>> results = new HashMap<>();
for (String element : getList()) {
String[] command = { "ssh", "hddev-c01-edge-02", "\"" + element + "\"" };
results.put(element, exec.submit(new ParallelExecutor(command, thread_id, map)));
}
for(Map.Entry<String, Future<?>> entry: map.entrySet()) {
try {
entry.getValue().get();
System.out.println(entry.getKey()+ " is complete");
} catch (ExecutionException e) {
System.out.println(entry.getKey()+ " failed with");
e.getCause().printStackTrace(System.out);
}
}
ThreadPoolExecutor 在被要求之前不会终止。 所以,首先你必须打电话给
// executors.shutdown();
,您保留了评论。 第二,您需要等待线程正确终止。为此,在 for(Map.Entry entry: map.entrySet())
之前添加一个循环while (!es.isTerminated()) {
}
但是,由于一个线程可能 运行 许多 运行nable,如果我没听错,您希望在一个 Runnable 执行完成后更新 CHM。
为此,您必须使用 CustomThread class。扩展线程
并仅覆盖 1 种方法,afterExecute()
您需要从中放置代码以使用 Runnable 的 ID 和终止状态更新 CHM。但请记住,这意味着完成传递的 Runnables 运行() 方法,而不是底层线程的终止。