ConcurrentHashMap 值未更新
ConcurrentHashMap Values not updating
我正在尝试使用一组包含单词的文档创建一个简单的多线程字典/索引。该字典存储在具有 String 键和 Vector 值的 ConcurrentHashMap 中。对于字典中的每个单词,都有一个出现位置列表,它是一个包含一系列元组对象(自定义对象)的 Vector。(在我的例子中,元组是 2 个数字的组合)。
每个线程将一个文档作为输入,找到其中的所有单词并尝试更新ConcurrentHashMap。另外,我必须指出,2 个线程可能会尝试通过在其值上添加一个新元组来更新 Map 的相同键。我只对 Vector 进行写操作。
下面是提交新线程的代码。如您所见,我将字典作为输入,它是一个带有字符串键和向量值的 ConcurrentHashMap
/**
 * Submits one {@code IndexingTask} for every page found in the crawler's
 * finished-pages collection and then shuts the indexing service down.
 *
 * @param crawler provider of the finished pages to be indexed
 * @throws InterruptedException declared for the {@code take()} call below
 *         (presumably a blocking-queue take; the queue type is not shown here)
 */
public void run(Crawler crawler) throws InterruptedException {
// Keep submitting while the finished-pages collection reports non-empty.
while (!crawler.getFinishedPages().isEmpty()) {
// Every task receives the same shared dictionary and sources maps plus one page.
this.INDEXING_SERVICE.submit(new IndexingTask(this.dictionary, sources,
crawler.getFinishedPages().take()));
}
// NOTE(review): assuming INDEXING_SERVICE is a java.util.concurrent.ExecutorService,
// shutdown() only stops new submissions and returns immediately; it does not wait
// for already-submitted tasks to finish. A caller that reads the dictionary right
// after run() returns may therefore observe a partially filled map.
this.INDEXING_SERVICE.shutdown();
}
下面是索引线程的代码:
/**
 * Runnable that indexes a single document into a shared dictionary.
 *
 * <p>For each term of the document it appends one {@code Tuple} (document id,
 * word frequency) to the vector stored under that term, creating the vector
 * on first use.
 */
public class IndexingTask implements Runnable {
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary;
    private final HtmlDocument document;

    /**
     * Stores the shared dictionary and the document to index, and records the
     * document's id-to-URL mapping in {@code sources} unless one already exists.
     *
     * @param dictionary shared term-to-postings map that {@link #run()} appends to
     * @param sources    shared document-id-to-URL map
     * @param document   the document this task indexes
     */
    public IndexingTask(ConcurrentHashMap<String, Vector<Tuple>> dictionary,
                        ConcurrentHashMap<Integer, String> sources, HtmlDocument document) {
        this.dictionary = dictionary;
        this.document = document;
        sources.putIfAbsent(document.getDocId(), document.getURL());
    }

    /** Adds one posting per term of the document to the shared dictionary. */
    @Override
    public void run() {
        for (String term : document.getTerms()) {
            // Fetch (or atomically create) the postings vector for this term first,
            // then build and append the new posting.
            Vector<Tuple> postings = dictionary.computeIfAbsent(term, key -> new Vector<>());
            postings.add(new Tuple(document.getDocId(), document.getWordFrequency(term)));
        }
    }
}
代码似乎是正确的,但词典没有正确更新。我的意思是原始词典中缺少一些单词(键),而其他一些键在其 Vector 中的项目较少。
我做了一些调试,发现在线程实例终止之前,它已经计算出正确的键和值。然而,作为输入传给线程的原始字典(参见第一段代码)却没有被正确更新。您有什么想法或建议吗?
当您调用 this.INDEXING_SERVICE.shutdown() 时,'IndexingTask' 可能还没有运行完毕(shutdown() 不会等待已提交的任务结束)。我更新了您的代码:
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Simple pair of an integer key and a string value, rendered as
 * {@code (key, value)} by {@link #toString()}.
 */
class Tuple {
    private final Integer key;
    private final String value;

    /**
     * @param key   first component of the pair
     * @param value second component of the pair
     */
    public Tuple(Integer key, String value) {
        this.key = key;
        this.value = value;
    }

    /** Returns the pair formatted as {@code (key, value)}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append('(').append(key).append(", ").append(value).append(')');
        return text.toString();
    }
}
/**
 * Mutable holder for a crawled page: a numeric id, its URL and its list of terms.
 */
class HtmlDocument {
    private int id;
    private String url;
    private List<String> termList;

    /** Returns the document id (0 until {@link #setDocId(int)} is called). */
    public int getDocId() {
        return this.id;
    }

    /** Sets the document id. */
    public void setDocId(int docId) {
        this.id = docId;
    }

    /** Returns the document URL, or {@code null} if none was set. */
    public String getURL() {
        return this.url;
    }

    /** Sets the document URL. */
    public void setURL(String URL) {
        this.url = URL;
    }

    /** Returns the same list instance passed to {@link #setTerms(List)}, or {@code null}. */
    public List<String> getTerms() {
        return this.termList;
    }

    /** Stores the given list of terms without copying it. */
    public void setTerms(List<String> terms) {
        this.termList = terms;
    }

    /**
     * Placeholder lookup: returns the fixed string {@code "query"} for every word.
     *
     * @param word the word being looked up (not inspected)
     */
    public String getWordFrequency(String word) {
        return "query";
    }
}
/**
 * Runnable that indexes a single document into a shared dictionary and then
 * reports completion by decrementing {@code Crawler.RUNNING_TASKS}.
 *
 * <p>For each term of the document it appends one {@code Tuple} (document id,
 * word frequency) to the vector stored under that term, creating the vector
 * on first use.
 */
class IndexingTask implements Runnable {
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary;
    private final HtmlDocument document;

    /**
     * Stores the shared dictionary and the document to index, and records the
     * document's id-to-URL mapping in {@code sources} unless one already exists.
     *
     * @param dictionary shared term-to-postings map that {@link #run()} appends to
     * @param sources    shared document-id-to-URL map
     * @param document   the document this task indexes
     */
    public IndexingTask(ConcurrentHashMap<String, Vector<Tuple>> dictionary,
                        ConcurrentHashMap<Integer, String> sources, HtmlDocument document) {
        this.dictionary = dictionary;
        this.document = document;
        sources.putIfAbsent(document.getDocId(), document.getURL());
    }

    /**
     * Adds one posting per term of the document to the shared dictionary.
     *
     * <p>The running-task counter is decremented in a {@code finally} block so
     * that a failure while indexing cannot leave the counter stuck above zero,
     * which would make any code waiting for it to reach zero wait forever.
     */
    @Override
    public void run() {
        try {
            for (String word : document.getTerms()) {
                dictionary.computeIfAbsent(word, k -> new Vector<>())
                        .add(new Tuple(document.getDocId(), document.getWordFrequency(word)));
            }
        } finally {
            Crawler.RUNNING_TASKS.decrementAndGet();
        }
    }
}
/**
 * Holds the queue of pages that are ready to be indexed, plus a process-wide
 * counter of indexing tasks.
 */
class Crawler {
    /** Shared task counter, starting at zero. */
    public static final AtomicInteger RUNNING_TASKS = new AtomicInteger(0);

    /** Queue of pages waiting to be indexed; unbounded. */
    protected BlockingQueue<HtmlDocument> finishedPages = new LinkedBlockingQueue<HtmlDocument>();

    /** Returns the live queue of finished pages (not a copy). */
    public BlockingQueue<HtmlDocument> getFinishedPages() {
        return this.finishedPages;
    }
}
/**
 * Demo driver: drains a crawler's finished pages into indexing tasks and waits
 * until every task has completed before returning, so the dictionary and
 * sources maps are fully populated when the caller reads them.
 */
public class ConcurrentHashMapExample {
    private final ConcurrentHashMap<Integer, String> sources = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary = new ConcurrentHashMap<>();
    // Shared by all instances; run() shuts it down, so run() can only succeed once per JVM.
    private static final ExecutorService INDEXING_SERVICE = Executors.newSingleThreadExecutor();

    /**
     * Submits one {@code IndexingTask} per page currently queued in the crawler,
     * then blocks until all submitted tasks have finished.
     *
     * @param crawler provider of the finished pages to index
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the indexing tasks to complete
     */
    public void run(Crawler crawler) throws InterruptedException {
        HtmlDocument page;
        // poll() removes and returns the head atomically (null when empty), avoiding
        // the isEmpty()/take() check-then-act pair that could block forever if
        // another consumer emptied the queue in between.
        while ((page = crawler.getFinishedPages().poll()) != null) {
            // IndexingTask decrements this counter when it finishes, so keep it balanced.
            Crawler.RUNNING_TASKS.incrementAndGet();
            INDEXING_SERVICE.submit(new IndexingTask(dictionary, sources, page));
        }
        // shutdown() stops new submissions but lets queued tasks run; it does not wait.
        INDEXING_SERVICE.shutdown();
        // Block until every submitted task has completed instead of sleep-polling a counter.
        INDEXING_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    /** Returns the live document-id-to-URL map. */
    public ConcurrentHashMap<Integer, String> getSources() {
        return sources;
    }

    /** Returns the live term-to-postings map. */
    public ConcurrentHashMap<String, Vector<Tuple>> getDictionary() {
        return dictionary;
    }

    /** Indexes a single two-term sample document and prints both maps. */
    public static void main(String[] args) throws Exception {
        ConcurrentHashMapExample example = new ConcurrentHashMapExample();
        Crawler crawler = new Crawler();
        HtmlDocument document = new HtmlDocument();
        document.setDocId(1);
        document.setURL("http://127.0.0.1/abc");
        document.setTerms(Arrays.asList("hello", "world"));
        crawler.getFinishedPages().add(document);
        example.run(crawler);
        System.out.println("source: " + example.getSources());
        System.out.println("dictionary: " + example.getDictionary());
    }
}
输出:
source: {1=http://127.0.0.1/abc}
dictionary: {world=[(1, query)], hello=[(1, query)]}
我认为,在您的业务中,您应该使用 'Producer'、'Consumer' 设计模式
我正在尝试使用一组包含单词的文档创建一个简单的多线程字典/索引。该字典存储在具有 String 键和 Vector 值的 ConcurrentHashMap 中。对于字典中的每个单词,都有一个出现位置列表,它是一个包含一系列元组对象(自定义对象)的 Vector。(在我的例子中,元组是 2 个数字的组合)。
每个线程将一个文档作为输入,找到其中的所有单词并尝试更新ConcurrentHashMap。另外,我必须指出,2 个线程可能会尝试通过在其值上添加一个新元组来更新 Map 的相同键。我只对 Vector 进行写操作。
下面是提交新线程的代码。如您所见,我将字典作为输入,它是一个带有字符串键和向量值的 ConcurrentHashMap
/**
 * Submits one {@code IndexingTask} for every page found in the crawler's
 * finished-pages collection and then shuts the indexing service down.
 *
 * @param crawler provider of the finished pages to be indexed
 * @throws InterruptedException declared for the {@code take()} call below
 *         (presumably a blocking-queue take; the queue type is not shown here)
 */
public void run(Crawler crawler) throws InterruptedException {
// Keep submitting while the finished-pages collection reports non-empty.
while (!crawler.getFinishedPages().isEmpty()) {
// Every task receives the same shared dictionary and sources maps plus one page.
this.INDEXING_SERVICE.submit(new IndexingTask(this.dictionary, sources,
crawler.getFinishedPages().take()));
}
// NOTE(review): assuming INDEXING_SERVICE is a java.util.concurrent.ExecutorService,
// shutdown() only stops new submissions and returns immediately; it does not wait
// for already-submitted tasks to finish. A caller that reads the dictionary right
// after run() returns may therefore observe a partially filled map.
this.INDEXING_SERVICE.shutdown();
}
下面是索引线程的代码:
/**
 * Runnable that indexes a single document into a shared dictionary.
 *
 * <p>For each term of the document it appends one {@code Tuple} (document id,
 * word frequency) to the vector stored under that term, creating the vector
 * on first use.
 */
public class IndexingTask implements Runnable {
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary;
    private final HtmlDocument document;

    /**
     * Stores the shared dictionary and the document to index, and records the
     * document's id-to-URL mapping in {@code sources} unless one already exists.
     *
     * @param dictionary shared term-to-postings map that {@link #run()} appends to
     * @param sources    shared document-id-to-URL map
     * @param document   the document this task indexes
     */
    public IndexingTask(ConcurrentHashMap<String, Vector<Tuple>> dictionary,
                        ConcurrentHashMap<Integer, String> sources, HtmlDocument document) {
        this.dictionary = dictionary;
        this.document = document;
        sources.putIfAbsent(document.getDocId(), document.getURL());
    }

    /** Adds one posting per term of the document to the shared dictionary. */
    @Override
    public void run() {
        for (String term : document.getTerms()) {
            // Fetch (or atomically create) the postings vector for this term first,
            // then build and append the new posting.
            Vector<Tuple> postings = dictionary.computeIfAbsent(term, key -> new Vector<>());
            postings.add(new Tuple(document.getDocId(), document.getWordFrequency(term)));
        }
    }
}
代码似乎是正确的,但词典没有正确更新。我的意思是原始词典中缺少一些单词(键),而其他一些键在其 Vector 中的项目较少。
我做了一些调试,发现在线程实例终止之前,它已经计算出正确的键和值。然而,作为输入传给线程的原始字典(参见第一段代码)却没有被正确更新。您有什么想法或建议吗?
当您调用 this.INDEXING_SERVICE.shutdown() 时,'IndexingTask' 可能还没有运行完毕(shutdown() 不会等待已提交的任务结束)。我更新了您的代码:
import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Simple pair of an integer key and a string value, rendered as
 * {@code (key, value)} by {@link #toString()}.
 */
class Tuple {
    private final Integer key;
    private final String value;

    /**
     * @param key   first component of the pair
     * @param value second component of the pair
     */
    public Tuple(Integer key, String value) {
        this.key = key;
        this.value = value;
    }

    /** Returns the pair formatted as {@code (key, value)}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append('(').append(key).append(", ").append(value).append(')');
        return text.toString();
    }
}
/**
 * Mutable holder for a crawled page: a numeric id, its URL and its list of terms.
 */
class HtmlDocument {
    private int id;
    private String url;
    private List<String> termList;

    /** Returns the document id (0 until {@link #setDocId(int)} is called). */
    public int getDocId() {
        return this.id;
    }

    /** Sets the document id. */
    public void setDocId(int docId) {
        this.id = docId;
    }

    /** Returns the document URL, or {@code null} if none was set. */
    public String getURL() {
        return this.url;
    }

    /** Sets the document URL. */
    public void setURL(String URL) {
        this.url = URL;
    }

    /** Returns the same list instance passed to {@link #setTerms(List)}, or {@code null}. */
    public List<String> getTerms() {
        return this.termList;
    }

    /** Stores the given list of terms without copying it. */
    public void setTerms(List<String> terms) {
        this.termList = terms;
    }

    /**
     * Placeholder lookup: returns the fixed string {@code "query"} for every word.
     *
     * @param word the word being looked up (not inspected)
     */
    public String getWordFrequency(String word) {
        return "query";
    }
}
/**
 * Runnable that indexes a single document into a shared dictionary and then
 * reports completion by decrementing {@code Crawler.RUNNING_TASKS}.
 *
 * <p>For each term of the document it appends one {@code Tuple} (document id,
 * word frequency) to the vector stored under that term, creating the vector
 * on first use.
 */
class IndexingTask implements Runnable {
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary;
    private final HtmlDocument document;

    /**
     * Stores the shared dictionary and the document to index, and records the
     * document's id-to-URL mapping in {@code sources} unless one already exists.
     *
     * @param dictionary shared term-to-postings map that {@link #run()} appends to
     * @param sources    shared document-id-to-URL map
     * @param document   the document this task indexes
     */
    public IndexingTask(ConcurrentHashMap<String, Vector<Tuple>> dictionary,
                        ConcurrentHashMap<Integer, String> sources, HtmlDocument document) {
        this.dictionary = dictionary;
        this.document = document;
        sources.putIfAbsent(document.getDocId(), document.getURL());
    }

    /**
     * Adds one posting per term of the document to the shared dictionary.
     *
     * <p>The running-task counter is decremented in a {@code finally} block so
     * that a failure while indexing cannot leave the counter stuck above zero,
     * which would make any code waiting for it to reach zero wait forever.
     */
    @Override
    public void run() {
        try {
            for (String word : document.getTerms()) {
                dictionary.computeIfAbsent(word, k -> new Vector<>())
                        .add(new Tuple(document.getDocId(), document.getWordFrequency(word)));
            }
        } finally {
            Crawler.RUNNING_TASKS.decrementAndGet();
        }
    }
}
/**
 * Holds the queue of pages that are ready to be indexed, plus a process-wide
 * counter of indexing tasks.
 */
class Crawler {
    /** Shared task counter, starting at zero. */
    public static final AtomicInteger RUNNING_TASKS = new AtomicInteger(0);

    /** Queue of pages waiting to be indexed; unbounded. */
    protected BlockingQueue<HtmlDocument> finishedPages = new LinkedBlockingQueue<HtmlDocument>();

    /** Returns the live queue of finished pages (not a copy). */
    public BlockingQueue<HtmlDocument> getFinishedPages() {
        return this.finishedPages;
    }
}
/**
 * Demo driver: drains a crawler's finished pages into indexing tasks and waits
 * until every task has completed before returning, so the dictionary and
 * sources maps are fully populated when the caller reads them.
 */
public class ConcurrentHashMapExample {
    private final ConcurrentHashMap<Integer, String> sources = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Vector<Tuple>> dictionary = new ConcurrentHashMap<>();
    // Shared by all instances; run() shuts it down, so run() can only succeed once per JVM.
    private static final ExecutorService INDEXING_SERVICE = Executors.newSingleThreadExecutor();

    /**
     * Submits one {@code IndexingTask} per page currently queued in the crawler,
     * then blocks until all submitted tasks have finished.
     *
     * @param crawler provider of the finished pages to index
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the indexing tasks to complete
     */
    public void run(Crawler crawler) throws InterruptedException {
        HtmlDocument page;
        // poll() removes and returns the head atomically (null when empty), avoiding
        // the isEmpty()/take() check-then-act pair that could block forever if
        // another consumer emptied the queue in between.
        while ((page = crawler.getFinishedPages().poll()) != null) {
            // IndexingTask decrements this counter when it finishes, so keep it balanced.
            Crawler.RUNNING_TASKS.incrementAndGet();
            INDEXING_SERVICE.submit(new IndexingTask(dictionary, sources, page));
        }
        // shutdown() stops new submissions but lets queued tasks run; it does not wait.
        INDEXING_SERVICE.shutdown();
        // Block until every submitted task has completed instead of sleep-polling a counter.
        INDEXING_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }

    /** Returns the live document-id-to-URL map. */
    public ConcurrentHashMap<Integer, String> getSources() {
        return sources;
    }

    /** Returns the live term-to-postings map. */
    public ConcurrentHashMap<String, Vector<Tuple>> getDictionary() {
        return dictionary;
    }

    /** Indexes a single two-term sample document and prints both maps. */
    public static void main(String[] args) throws Exception {
        ConcurrentHashMapExample example = new ConcurrentHashMapExample();
        Crawler crawler = new Crawler();
        HtmlDocument document = new HtmlDocument();
        document.setDocId(1);
        document.setURL("http://127.0.0.1/abc");
        document.setTerms(Arrays.asList("hello", "world"));
        crawler.getFinishedPages().add(document);
        example.run(crawler);
        System.out.println("source: " + example.getSources());
        System.out.println("dictionary: " + example.getDictionary());
    }
}
输出:
source: {1=http://127.0.0.1/abc}
dictionary: {world=[(1, query)], hello=[(1, query)]}
我认为,在您的业务中,您应该使用 'Producer'、'Consumer' 设计模式