使用 PriorityBlockingQueue 提供记录的对象进行处理
Using a PriorityBlockingQueue to feed in logged objects for processing
我有一个应用程序从多个序列化对象日志中读取对象并将它们交给另一个 class 进行处理。我的问题集中在如何高效、干净地读取对象并将它们发送出去。
代码是从旧版本的应用程序中提取的,但我们最终保持原样。直到上周才真正使用它,但我最近开始更仔细地查看代码以尝试改进它。
打开N个ObjectInputStream
,从每个流中读取一个对象存储到一个数组中(假设下面的inputStreams
只是一个ObjectInputStream
对象的数组,对应于每个日志文件):
for (int i = 0; i < logObjects.length; i++) {
if (inputStreams[i] == null) {
continue;
}
try {
if (logObjects[i] == null) {
logObjects[i] = (LogObject) inputStreams[i].readObject();
}
} catch (final InvalidClassException e) {
LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
} catch (final EOFException e) {
inputStreams[i] = null;
}
}
序列化到文件的对象是LogObject
个对象。这里是 LogObject
class:
public class LogObject implements Serializable {
private static final long serialVersionUID = -5686286252863178498L;
private Object logObject;
private long logTime;
public LogObject(Object logObject) {
this.logObject = logObject;
this.logTime = System.currentTimeMillis();
}
public Object getLogObject() {
return logObject;
}
public long getLogTime() {
return logTime;
}
}
一旦对象在数组中,它就会比较日志时间并发送时间最早的对象:
// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
logObject = logObjects[i];
if (logObject == null) {
continue;
}
if (logObject.getLogTime() < minTime) {
index = i;
minTime = logObject.getLogTime();
}
}
handler.handleOutput(logObjects[index].getLogObject());
我的第一个想法是为每个读入并将对象放入 PriorityBlockingQueue
的文件创建一个线程(使用使用 LogObject
日志时间进行比较的自定义比较器)。然后另一个线程可以取出值并将它们发送出去。
这里唯一的问题是一个线程可以将一个对象放入队列并在另一个线程可以将对象放入队列之前将其取下,这可能有更早的时间。这就是为什么在检查日志时间之前首先将对象读入并存储在数组中的原因。
这个约束是否禁止我实现多线程设计?或者有什么方法可以调整我的解决方案以使其更有效率?
我同意你的看法。扔掉它并使用 PriorityBlockingQueue.
The only issue here is that if Thread 1 has read an object from File 1 in and put it in the queue (and the object File 2 was going to read in has an earlier log time), the reading Thread could take it and send it off, resulting in a log object with a later time being sent first
这与平衡合并 (Knuth ACP vol 3) 的合并阶段完全一样。您必须从与上一个最低元素相同的文件中读取下一个输入。
Does this constraint prohibit me from implementing a multi-threaded design?
这不是约束。虚无缥缈。
Or is there a way I can tweak my solution to make it more efficient?
优先队列已经很高效了。无论如何,您当然应该首先担心正确性。然后添加缓冲 ;-) 将 ObjectInputStreams
包裹在 BufferedInputStreams
周围,并确保输出堆栈中有一个 BufferedOutputStream
。
据我了解您的问题,您需要严格按顺序处理 LogObjects
。在那种情况下,您的代码的初始部分是完全正确的。这段代码所做的是 merge sort 几个输入流。您需要为每个流读取一个对象(这就是需要临时数组的原因)然后采取适当的 (minimum/maximum) LogObject
并处理处理器。
根据您的上下文,您或许可以在多个线程中进行处理。您唯一需要更改的是将 LogObjects
放在 ArrayBlockingQueue
中,处理器可能会在多个独立线程上运行。另一种选择是发送 LogObjects
以在 ThreadPoolExecutor
中处理。最后一个选项更简单直接。
但要注意途中的几个陷阱:
- 要使此算法正常工作,必须已经对各个流进行排序。否则你的程序就坏了;
- 当你并行处理消息时,处理顺序严格来说是没有定义的。所以提出的算法只保证消息处理的开始顺序(dispatch order)。这可能不是你想要的。
所以现在你应该面临几个问题:
- 真的需要处理顺序吗?
- 如果需要,是需要全局顺序(针对所有消息)还是本地顺序(针对独立的消息组)?
这些问题的答案将对您进行并行处理的能力产生重大影响。
如果第一个问题的答案是是,遗憾的是,并行处理不是一个选项。
我有一个应用程序从多个序列化对象日志中读取对象并将它们交给另一个 class 进行处理。我的问题集中在如何高效、干净地读取对象并将它们发送出去。
代码是从旧版本的应用程序中提取的,但我们最终保持原样。直到上周才真正使用它,但我最近开始更仔细地查看代码以尝试改进它。
打开N个ObjectInputStream
,从每个流中读取一个对象存储到一个数组中(假设下面的inputStreams
只是一个ObjectInputStream
对象的数组,对应于每个日志文件):
for (int i = 0; i < logObjects.length; i++) {
if (inputStreams[i] == null) {
continue;
}
try {
if (logObjects[i] == null) {
logObjects[i] = (LogObject) inputStreams[i].readObject();
}
} catch (final InvalidClassException e) {
LOGGER.warn("Invalid object read from " + logFileList.get(i).getAbsolutePath(), e);
} catch (final EOFException e) {
inputStreams[i] = null;
}
}
序列化到文件的对象是LogObject
个对象。这里是 LogObject
class:
public class LogObject implements Serializable {
private static final long serialVersionUID = -5686286252863178498L;
private Object logObject;
private long logTime;
public LogObject(Object logObject) {
this.logObject = logObject;
this.logTime = System.currentTimeMillis();
}
public Object getLogObject() {
return logObject;
}
public long getLogTime() {
return logTime;
}
}
一旦对象在数组中,它就会比较日志时间并发送时间最早的对象:
// handle the LogObject with the earliest log time
minTime = Long.MAX_VALUE;
for (int i = 0; i < logObjects.length; i++) {
logObject = logObjects[i];
if (logObject == null) {
continue;
}
if (logObject.getLogTime() < minTime) {
index = i;
minTime = logObject.getLogTime();
}
}
handler.handleOutput(logObjects[index].getLogObject());
我的第一个想法是为每个读入并将对象放入 PriorityBlockingQueue
的文件创建一个线程(使用使用 LogObject
日志时间进行比较的自定义比较器)。然后另一个线程可以取出值并将它们发送出去。
这里唯一的问题是一个线程可以将一个对象放入队列并在另一个线程可以将对象放入队列之前将其取下,这可能有更早的时间。这就是为什么在检查日志时间之前首先将对象读入并存储在数组中的原因。
这个约束是否禁止我实现多线程设计?或者有什么方法可以调整我的解决方案以使其更有效率?
我同意你的看法。扔掉它并使用 PriorityBlockingQueue.
The only issue here is that if Thread 1 has read an object from File 1 in and put it in the queue (and the object File 2 was going to read in has an earlier log time), the reading Thread could take it and send it off, resulting in a log object with a later time being sent first
这与平衡合并 (Knuth ACP vol 3) 的合并阶段完全一样。您必须从与上一个最低元素相同的文件中读取下一个输入。
Does this constraint prohibit me from implementing a multi-threaded design?
这不是约束。虚无缥缈。
Or is there a way I can tweak my solution to make it more efficient?
优先队列已经很高效了。无论如何,您当然应该首先担心正确性。然后添加缓冲 ;-) 将 ObjectInputStreams
包裹在 BufferedInputStreams
周围,并确保输出堆栈中有一个 BufferedOutputStream
。
据我了解您的问题,您需要严格按顺序处理 LogObjects
。在那种情况下,您的代码的初始部分是完全正确的。这段代码所做的是 merge sort 几个输入流。您需要为每个流读取一个对象(这就是需要临时数组的原因)然后采取适当的 (minimum/maximum) LogObject
并处理处理器。
根据您的上下文,您或许可以在多个线程中进行处理。您唯一需要更改的是将 LogObjects
放在 ArrayBlockingQueue
中,处理器可能会在多个独立线程上运行。另一种选择是发送 LogObjects
以在 ThreadPoolExecutor
中处理。最后一个选项更简单直接。
但要注意途中的几个陷阱:
- 要使此算法正常工作,必须已经对各个流进行排序。否则你的程序就坏了;
- 当你并行处理消息时,处理顺序严格来说是没有定义的。所以提出的算法只保证消息处理的开始顺序(dispatch order)。这可能不是你想要的。
所以现在你应该面临几个问题:
- 真的需要处理顺序吗?
- 如果需要,是需要全局顺序(针对所有消息)还是本地顺序(针对独立的消息组)?
这些问题的答案将对您进行并行处理的能力产生重大影响。
如果第一个问题的答案是是,遗憾的是,并行处理不是一个选项。