在 ConcurrentHashMap 中创建了多个键而不是一个键
Multiple keys created instead of one key in ConcurrentHashMap
我在这里尝试使用数据包到达时间为传入的 UDP 数据包创建 10 秒的存储桶,但总是在删除后的 10 秒内创建多个密钥。
public static void main(String[] args) {
ConcurrentHashMap<Long, String> tenSecondBucket =
new ConcurrentHashMap<Long, String>();
此线程尝试连续写入哈希映射。添加新条目时,它会按键(时间戳)比较旧条目,是否超过 10 秒,如果是,则创建新条目,否则将更新它。
Thread writingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1);
if(tenSecondBucket.size() > 0) {
// getting last key
long lastKey = 0;
for (long keyValue : tenSecondBucket.keySet()) {
lastKey = keyValue;
}
if(System.currentTimeMillis() - lastKey > 10000) {
tenSecondBucket.put(System.currentTimeMillis(), "secondEntry");
} else {
tenSecondBucket.put(lastKey, "updatedEntry");
}
} else {
tenSecondBucket.put(System.currentTimeMillis(), "newEntry");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
writingThread.start();
此线程删除了 10 秒前的密钥。
Thread removingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
if(System.currentTimeMillis() - key > 10000) {
tenSecondBucket.remove(key);
}
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
removingThread.start();
此线程试图读取那里发生的事情。
Thread readingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
System.out.println("testing key which is timestamp " + key);
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
readingThread.start();
}
正如史蒂夫在评论中所说,您获取最后一个密钥的方法不正确,并且会产生随机值。
您还在评论中提到,您需要它对多个编写器线程是线程安全的。
我会尝试类似下面的操作,使用共享 AtomicLong
保存 "last key",并使用 updateAndGet
:
自动更新它
AtomicLong lastKey = new AtomicLong();
Thread writingThread = new Thread(() -> {
while (true) {
try {
Thread.sleep(100);
long now = System.currentTimeMillis();
long localLastKey = lastKey.updateAndGet(oldValue -> oldValue < now - 10000 ? now : oldValue);
if (localLastKey == now) {
tenSecondBucket.put(now, "newEntry");
} else {
tenSecondBucket.put(localLastKey, "updatedEntry@" + now);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
我在这里尝试使用数据包到达时间为传入的 UDP 数据包创建 10 秒的存储桶,但总是在删除后的 10 秒内创建多个密钥。
public static void main(String[] args) {
ConcurrentHashMap<Long, String> tenSecondBucket =
new ConcurrentHashMap<Long, String>();
此线程尝试连续写入哈希映射。添加新条目时,它会按键(时间戳)比较旧条目,是否超过 10 秒,如果是,则创建新条目,否则将更新它。
Thread writingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1);
if(tenSecondBucket.size() > 0) {
// getting last key
long lastKey = 0;
for (long keyValue : tenSecondBucket.keySet()) {
lastKey = keyValue;
}
if(System.currentTimeMillis() - lastKey > 10000) {
tenSecondBucket.put(System.currentTimeMillis(), "secondEntry");
} else {
tenSecondBucket.put(lastKey, "updatedEntry");
}
} else {
tenSecondBucket.put(System.currentTimeMillis(), "newEntry");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
writingThread.start();
此线程删除了 10 秒前的密钥。
Thread removingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
if(System.currentTimeMillis() - key > 10000) {
tenSecondBucket.remove(key);
}
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
removingThread.start();
此线程试图读取那里发生的事情。
Thread readingThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(4000);
if(tenSecondBucket.size() > 0) {
tenSecondBucket.keySet().stream().forEach(key -> {
System.out.println("testing key which is timestamp " + key);
});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
readingThread.start();
}
正如史蒂夫在评论中所说,您获取最后一个密钥的方法不正确,并且会产生随机值。
您还在评论中提到,您需要它对多个编写器线程是线程安全的。
我会尝试类似下面的操作,使用共享 AtomicLong
保存 "last key",并使用 updateAndGet
:
AtomicLong lastKey = new AtomicLong();
Thread writingThread = new Thread(() -> {
while (true) {
try {
Thread.sleep(100);
long now = System.currentTimeMillis();
long localLastKey = lastKey.updateAndGet(oldValue -> oldValue < now - 10000 ? now : oldValue);
if (localLastKey == now) {
tenSecondBucket.put(now, "newEntry");
} else {
tenSecondBucket.put(localLastKey, "updatedEntry@" + now);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});