使用 Streams 按特定字符限制对文件中的行进行分组
Group lines from a file by a certain char limit using Streams
我是 Java 8 Stream API 的新手,在以下情况下使用它时遇到了问题:
我必须逐行读取文件并以文件大小最接近特定字符限制的方式对行进行分组,然后将其发布到 Kafka。
public void publishStringToKafka(File outputFile) {
try {
Files.lines(outputFile.toPath())
.forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
} catch (IOException e) {
LOG.error("Could not read buffered file to send message on kafka.", e);
} finally {
try {
Files.deleteIfExists(outputFile.toPath());
} catch (IOException e) {
LOG.error("Problem in deleting the buffered file {}.", outputFile.getName(), e);
}
}
}
现在我完全可以使用传统或声明式的方式来执行此操作,即逐行读取文件,使用循环将它们组合起来,并在大小最接近 1024 个字符时继续在 kafka 上发布消息。
但我想为此使用流。
注意:这段代码我还面临另一个问题,Files.deleteIfExists(outputFile.toPath());
命令在执行后不会删除文件,也不会发生异常。而如果我使用声明式样式,则文件将被成功删除。
请帮忙。
Collectors.groupingBy()
在这种情况下会很有用。
Map<T, List<String>> result = Files.lines(outputFile.toPath())
.collect(Collectors.groupingBy(Your::classifier, Collectors.toList()))
因此,您得到 Map<T,List<String>>
。 T
是 Your::classifier 返回的类型。现在您已经将所有内容分组并可以继续 for-each。
现在您可以提取一个条目集,对其进行排序,对其进行平面映射,然后发布到 Kafka。 flatMap
是必要的,因为如果你不展平你的结构,你最终会迭代 Stream<List<>>
。这不一定是坏事,但我认为这不是理想的情况。
collect.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.flatMap(e -> e.getValue().stream())
.forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
唯一棘手的部分是适当地实现分类器方法,但从这个问题我了解到你知道如何做到这一点。
问题陈述你想要做的是将流中的所有字符串按顺序组合到尽可能接近最大字符数并创建一个新的字符串列表.然后可以使用这个新创建的列表流式传输到 Kafka。这不是一个容易解决的问题,因为你必须处理状态。
解决方案
使用Collector
来累积值
List<String> result = someStrings.stream()
.collect(ArrayList::new, (list, string) -> accumulate(list, string), List::addAll);
accumulate
方法包含最大字符逻辑:
private void accumulate(ArrayList<String> list, String string) {
if (list.isEmpty() || list.get(list.size() -1).length() + string.length() > MAXIMUM_CHARACTERS){
list.add(string);
} else {
list.set(list.size()-1, list.get(list.size()-1) + string);
}
}
如果您输入列表 [as, 1234, 213, bd, de] 并将 MAXIMUM_CHARACTERS 设置为 5,它将 return 所需的输出 [as, 1234, 213bd, de ].
我是 Java 8 Stream API 的新手,在以下情况下使用它时遇到了问题:
我必须逐行读取文件并以文件大小最接近特定字符限制的方式对行进行分组,然后将其发布到 Kafka。
public void publishStringToKafka(File outputFile) {
try {
Files.lines(outputFile.toPath())
.forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
} catch (IOException e) {
LOG.error("Could not read buffered file to send message on kafka.", e);
} finally {
try {
Files.deleteIfExists(outputFile.toPath());
} catch (IOException e) {
LOG.error("Problem in deleting the buffered file {}.", outputFile.getName(), e);
}
}
}
现在我完全可以使用传统或声明式的方式来执行此操作,即逐行读取文件,使用循环将它们组合起来,并在大小最接近 1024 个字符时继续在 kafka 上发布消息。 但我想为此使用流。
注意:这段代码我还面临另一个问题,Files.deleteIfExists(outputFile.toPath());
命令在执行后不会删除文件,也不会发生异常。而如果我使用声明式样式,则文件将被成功删除。
请帮忙。
Collectors.groupingBy()
在这种情况下会很有用。
Map<T, List<String>> result = Files.lines(outputFile.toPath())
.collect(Collectors.groupingBy(Your::classifier, Collectors.toList()))
因此,您得到 Map<T,List<String>>
。 T
是 Your::classifier 返回的类型。现在您已经将所有内容分组并可以继续 for-each。
现在您可以提取一个条目集,对其进行排序,对其进行平面映射,然后发布到 Kafka。 flatMap
是必要的,因为如果你不展平你的结构,你最终会迭代 Stream<List<>>
。这不一定是坏事,但我认为这不是理想的情况。
collect.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
.flatMap(e -> e.getValue().stream())
.forEach(s -> kafkaProducer.publishMessageOnTopic(s, KAFKA_INGESTION_TOPIC));
唯一棘手的部分是适当地实现分类器方法,但从这个问题我了解到你知道如何做到这一点。
问题陈述你想要做的是将流中的所有字符串按顺序组合到尽可能接近最大字符数并创建一个新的字符串列表.然后可以使用这个新创建的列表流式传输到 Kafka。这不是一个容易解决的问题,因为你必须处理状态。
解决方案
使用Collector
来累积值
List<String> result = someStrings.stream()
.collect(ArrayList::new, (list, string) -> accumulate(list, string), List::addAll);
accumulate
方法包含最大字符逻辑:
private void accumulate(ArrayList<String> list, String string) {
if (list.isEmpty() || list.get(list.size() -1).length() + string.length() > MAXIMUM_CHARACTERS){
list.add(string);
} else {
list.set(list.size()-1, list.get(list.size()-1) + string);
}
}
如果您输入列表 [as, 1234, 213, bd, de] 并将 MAXIMUM_CHARACTERS 设置为 5,它将 return 所需的输出 [as, 1234, 213bd, de ].