File Operations using Akka Actor
What is the advantage of using Akka Actors over the plain file-operation approach? I tried to measure the time taken to analyze a log file. The task is to find the IP addresses that logged in more than 50 times and display them. The plain file-operation version turned out to be faster than the Akka Actor version. Why is that?
Using plain file operations:
public static void main(String[] args) {
    // long startTime = System.currentTimeMillis();
    File file = new File("log.txt");
    Map<String, Long> ipMap = new HashMap<>();
    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
        String line = br.readLine();
        while (line != null) {
            int idx = line.indexOf('-');
            if (idx != -1) { // skip malformed lines without a '-'
                String ipAddress = line.substring(0, idx).trim();
                long count = ipMap.getOrDefault(ipAddress, 0L);
                ipMap.put(ipAddress, ++count);
            }
            line = br.readLine();
        }
        System.out.println("================================");
        System.out.println("||\tCount\t||\t\tIP");
        System.out.println("================================");
        // LinkedHashMap keeps the sorted insertion order (a plain HashMap would lose it)
        Map<String, Long> result = new LinkedHashMap<>();
        // Sort by value (descending) and put it into the "result" map
        ipMap.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .forEachOrdered(x -> result.put(x.getKey(), x.getValue()));
        // Print only if count > 50
        result.entrySet().stream()
                .filter(entry -> entry.getValue() > 50)
                .forEach(entry -> System.out.println(
                        "||\t" + entry.getValue() + " \t||\t" + entry.getKey()));
        // long endTime = System.currentTimeMillis();
        // System.out.println("Time: " + (endTime - startTime));
    } catch (IOException e) { // FileNotFoundException is an IOException
        e.printStackTrace();
    }
}
Using Actors:
1. The Main Class
public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    // Create the actor system
    ActorSystem akkaSystem = ActorSystem.create("akkaSystem");
    // Create the coordinating actor based on the specified class
    ActorRef coordinator = akkaSystem.actorOf(Props.create(FileAnalysisActor.class));
    // Create a message containing the file path
    FileAnalysisMessage msg = new FileAnalysisMessage("log.txt");
    // Send a message to start processing the file. 'ask' is an asynchronous
    // call that returns a Future, bounded by a timeout.
    Timeout timeout = new Timeout(6, TimeUnit.SECONDS);
    Future<Object> future = Patterns.ask(coordinator, msg, timeout);
    // Process the result
    final ExecutionContext ec = akkaSystem.dispatcher();
    future.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object message) throws Throwable {
            if (message instanceof FileProcessedMessage) {
                printResults((FileProcessedMessage) message);
                // Stop the actor system
                akkaSystem.shutdown();
            }
        }

        private void printResults(FileProcessedMessage message) {
            System.out.println("================================");
            System.out.println("||\tCount\t||\t\tIP");
            System.out.println("================================");
            Map<String, Long> result = new LinkedHashMap<>();
            // Sort by value (descending) and put it into the "result" map
            message.getData().entrySet().stream()
                    .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                    .forEachOrdered(x -> result.put(x.getKey(), x.getValue()));
            // Print only if count > 50
            result.entrySet().stream()
                    .filter(entry -> entry.getValue() > 50)
                    .forEach(entry -> System.out.println(
                            "||\t" + entry.getValue() + " \t||\t" + entry.getKey()));
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime));
        }
    }, ec);
}
2. The FileAnalysisActor Class
public class FileAnalysisActor extends UntypedActor {

    private Map<String, Long> ipMap = new HashMap<>();
    private long fileLineCount;
    private long processedCount;
    private ActorRef analyticsSender = null;

    @Override
    public void onReceive(Object message) throws Exception {
        /*
         * This actor can receive two different messages, FileAnalysisMessage or
         * LineProcessingResult; any other type is discarded via the unhandled method
         */
        if (message instanceof FileAnalysisMessage) {
            List<String> lines = FileUtils.readLines(new File(
                    ((FileAnalysisMessage) message).getFileName()));
            fileLineCount = lines.size();
            processedCount = 0;
            // store a reference to the original sender to send back the results later on
            analyticsSender = this.getSender();
            for (String line : lines) {
                // create a new actor for each line of the log file
                Props props = Props.create(LogLineProcessor.class);
                ActorRef lineProcessorActor = this.getContext().actorOf(props);
                // send the new actor a message with the line payload
                lineProcessorActor.tell(new LogLineMessage(line), this.getSelf());
            }
        } else if (message instanceof LineProcessingResult) {
            // a result message arrives after a LogLineProcessor has finished a line
            String ip = ((LineProcessingResult) message).getIpAddress();
            // increment the counter for this IP
            Long count = ipMap.getOrDefault(ip, 0L);
            ipMap.put(ip, ++count);
            // once every line has been processed, send the results back to the asker
            processedCount++;
            if (fileLineCount == processedCount) {
                analyticsSender.tell(new FileProcessedMessage(ipMap), ActorRef.noSender());
            }
        } else {
            // ignore any other message type
            this.unhandled(message);
        }
    }
}
3. The LogLineProcessor Class
public class LogLineProcessor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof LogLineMessage) {
            // Which line does each actor process?
            // System.out.println("Line: " + ((LogLineMessage) message).getData());
            // Uncomment to see the thread-to-actor relationship:
            // System.out.println("Thread [" + Thread.currentThread().getId()
            //         + "] handling [" + getSelf().toString() + "]");

            // get the message payload; this is a single line from the log file
            String messageData = ((LogLineMessage) message).getData();
            int idx = messageData.indexOf('-');
            if (idx != -1) {
                // extract the IP address
                String ipAddress = messageData.substring(0, idx).trim();
                // reply to the sender with the result, using a new message type
                this.getSender().tell(new LineProcessingResult(ipAddress), this.getSelf());
            }
        } else {
            // ignore any other message type
            this.unhandled(message);
        }
    }
}
Message Classes
1. FileAnalysisMessage
public class FileAnalysisMessage {

    private String fileName;

    public FileAnalysisMessage(String file) {
        this.fileName = file;
    }

    public String getFileName() {
        return fileName;
    }
}
2. FileProcessedMessage
public class FileProcessedMessage {

    private Map<String, Long> data;

    public FileProcessedMessage(Map<String, Long> data) {
        this.data = data;
    }

    public Map<String, Long> getData() {
        return data;
    }
}
3. LineProcessingResult
public class LineProcessingResult {

    private String ipAddress;

    public LineProcessingResult(String ipAddress) {
        this.ipAddress = ipAddress;
    }

    public String getIpAddress() {
        return ipAddress;
    }
}
4. LogLineMessage
public class LogLineMessage {

    private String data;

    public LogLineMessage(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}
I am creating an actor for each line of the file.
With any concurrency framework there is always a trade-off between the amount of concurrency deployed and the complexity involved in each unit of concurrency. Akka is no exception.
In your non-akka approach, the sequence of steps per line is relatively simple:
- Read a line from the file
- Split the line by '-'
- Put the IP address into a hash map and increment its count
In comparison, your akka approach is much more complicated for each line:
- Create an Actor
- Create a LogLineMessage message
- Send the message to the actor
- Split the line by '-'
- Create a LineProcessingResult message
- Send that message back to the coordinating actor
- Put the IP address into a hash map and increment its count
If we naively assumed that each of the above steps takes the same amount of time, then you would need two threads with akka just to run at the same speed as one thread without akka (roughly seven steps per line instead of three).
Make Each Unit of Concurrency Do More Work
Instead of one Actor per line, have each actor process N lines into its own sub-hashmap (e.g. each Actor processes 1000 lines):
public class LogLineMessage {

    private String[] data;

    public LogLineMessage(String[] data) {
        this.data = data;
    }

    public String[] getData() {
        return data;
    }
}
Then the Actor isn't sending back something as simple as a single IP address. Instead, it sends a hash of counts for its subset of lines:
public class LineProcessingResult {

    private HashMap<String, Long> ipAddressCount;

    public LineProcessingResult(HashMap<String, Long> count) {
        this.ipAddressCount = count;
    }

    public HashMap<String, Long> getIpAddressCount() {
        return ipAddressCount;
    }
}
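For completeness, here is a minimal sketch (not in the original code) of how the batched LogLineProcessor could look, assuming the String[]-based LogLineMessage and the map-based LineProcessingResult above. It counts IPs across its whole batch and replies once per batch:

import java.util.HashMap;
import akka.actor.UntypedActor;

public class LogLineProcessor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof LogLineMessage) {
            // count IPs locally for this batch of lines
            HashMap<String, Long> counts = new HashMap<>();
            for (String line : ((LogLineMessage) message).getData()) {
                if (line == null) continue; // guard against a partially filled buffer
                int idx = line.indexOf('-');
                if (idx != -1) {
                    String ipAddress = line.substring(0, idx).trim();
                    counts.put(ipAddress, counts.getOrDefault(ipAddress, 0L) + 1);
                }
            }
            // one reply per batch instead of one per line
            this.getSender().tell(new LineProcessingResult(counts), this.getSelf());
        } else {
            this.unhandled(message);
        }
    }
}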
And the coordinating Actor can be responsible for combining all of the various sub-counts:
// inside of FileAnalysisActor
else if (message instanceof LineProcessingResult) {
    HashMap<String, Long> localCount = ((LineProcessingResult) message).getIpAddressCount();
    // merge this actor's sub-counts into the global map
    localCount.forEach((ipAddress, count) ->
            ipMap.put(ipAddress, ipMap.getOrDefault(ipAddress, 0L) + count));
}
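Note that the coordinator's completion check has to change as well, since it now receives one result per batch rather than one per line. A sketch, assuming the coordinator recorded the expected number of batches (e.g. ceil(fileLineCount / N)) when it split the file:

// sketch: expectedBatchCount is assumed to have been set while splitting the file
processedBatchCount++;
if (processedBatchCount == expectedBatchCount) {
    analyticsSender.tell(new FileProcessedMessage(ipMap), this.getSelf());
}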
You can then vary N to see where you get peak performance for your particular system.
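If you want N to be easy to tune, one hypothetical refactoring (not part of the original code) is to pass it to the coordinator through Props, since Props.create forwards constructor arguments:

// hypothetical: FileAnalysisActor declares a constructor taking the batch size
ActorRef coordinator = akkaSystem.actorOf(
        Props.create(FileAnalysisActor.class, 1000)); // N = 1000 lines per batch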
Don't Read The Whole File Into Memory
Another disadvantage of your concurrent solution is that it first reads the entire file into memory. This is unnecessary and taxing for the JVM.
Instead, read the file N lines at a time. Once you have those lines in memory, spawn off an Actor as described above:
FileReader fr = new FileReader(file);
BufferedReader br = new BufferedReader(fr);
String[] lineBuffer = null;
int bufferCount = 0;
final int N = 1000;
String line = br.readLine();
while (line != null) {
    if (0 == bufferCount) {
        lineBuffer = new String[N];
    } else if (N == bufferCount) {
        // the buffer is full: hand it off to a fresh actor
        Props props = Props.create(LogLineProcessor.class);
        ActorRef lineProcessorActor = this.getContext().actorOf(props);
        lineProcessorActor.tell(new LogLineMessage(lineBuffer), this.getSelf());
        bufferCount = 0;
        continue; // re-enter the loop so the current line goes into the new buffer
    }
    lineBuffer[bufferCount] = line;
    line = br.readLine();
    bufferCount++;
}
// handle the final, partially filled buffer
if (bufferCount > 0) {
    Props props = Props.create(LogLineProcessor.class);
    ActorRef lineProcessorActor = this.getContext().actorOf(props);
    // copy to trim trailing nulls from the last buffer
    lineProcessorActor.tell(new LogLineMessage(Arrays.copyOf(lineBuffer, bufferCount)),
            this.getSelf());
}
br.close();
This allows the file IO, the line processing, and the sub-map combining all to run in parallel.