有什么方法可以将 Path 元素流展平为带有 Akka 流的内容行流吗?
Is there any way to flatten a stream of Path elements to a stream of content lines with Akka streams?
我正在尝试创建一个处理 Path 元素的 javadsl Flow。它应该发出与 Path 元素关联的文件内容行。
换句话说,我认为我需要将 Path 元素流展平为 String 元素流。
A Flow 确实有一个 flatten method but it involves using a FlattenStrategy,我不确定如何在我的案例中使用它。
final Flow<Path, String, BoxedUnit> toFileLines = Flow
.<Path>create()
.flatten(FlattenStrategy.
非常感谢任何帮助!
编辑1:
因此,据我了解,使用 StreamReader 读取文件并在 reader 达到 '\n' 时发出新字符串可能是个好主意。
所以现在的问题是如何从一个转换方法中发出多个元素。像这样
final Flow<Path, String, BoxedUnit> toFileLines = Flow
.<Path>create()
.mapAsync(
//create streamreader
//while streamreader has.next
//read line until \n
//emit line
);
这可能吗?
我建议使用 Flow.flatMapConcat
。首先,您必须编写一个 Source 来从文件中生成行,然后将这些 Sources 连接在一起。还有一些使用 Framing 进行的 ByteString 解析,我从 docs:
import java.io.File;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.io.Framing;
import akka.util.ByteString;
int maxLineSize = 1024;
final ByteString delim = ByteString.fromString("\r\n");
final Flow<String, String, BoxedUnit> pathsToContents =
Flow.of(String.class)
.flatMapConcat(path -> FileIO.fromFile(new File(path))
.via(Framing.delimiter(delim, maxLineSize,true))
.map(byteStr -> byteStr.utf8String()));
我正在尝试创建一个处理 Path 元素的 javadsl Flow。它应该发出与 Path 元素关联的文件内容行。 换句话说,我认为我需要将 Path 元素流展平为 String 元素流。
A Flow 确实有一个 flatten method but it involves using a FlattenStrategy,我不确定如何在我的案例中使用它。
final Flow<Path, String, BoxedUnit> toFileLines = Flow
.<Path>create()
.flatten(FlattenStrategy.
非常感谢任何帮助!
编辑1: 因此,据我了解,使用 StreamReader 读取文件并在 reader 达到 '\n' 时发出新字符串可能是个好主意。 所以现在的问题是如何从一个转换方法中发出多个元素。像这样
final Flow<Path, String, BoxedUnit> toFileLines = Flow
.<Path>create()
.mapAsync(
//create streamreader
//while streamreader has.next
//read line until \n
//emit line
);
这可能吗?
我建议使用 Flow.flatMapConcat
。首先,您必须编写一个 Source 来从文件中生成行,然后将这些 Sources 连接在一起。还有一些使用 Framing 进行的 ByteString 解析,我从 docs:
import java.io.File;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.io.Framing;
import akka.util.ByteString;
int maxLineSize = 1024;
final ByteString delim = ByteString.fromString("\r\n");
final Flow<String, String, BoxedUnit> pathsToContents =
Flow.of(String.class)
.flatMapConcat(path -> FileIO.fromFile(new File(path))
.via(Framing.delimiter(delim, maxLineSize,true))
.map(byteStr -> byteStr.utf8String()));