Akka:如何在一个图形阶段提取一个值并在下一个阶段使用它
Akka: How to extract a value in one graph stage and use it in the next
我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆必须添加到同一流的 CSV 文件,我想添加一个包含文件名或请求信息的字段。目前我有这样的东西:
val source = FileIO.fromPath(Paths.get("10002070.csv"))
.via(CsvParsing.lineScanner())
它流式传输 ByteString(字段)的列表(行)序列。目标类似于:
val filename = "10002070.csv"
val source = FileIO.fromPath(Path.get(filename))
.via(CsvParsing.lineScanner())
.via(AddCSVFieldHere(filename))
创建类似于以下的结构:
10002070.csv,max,estimated,12,1,0
其中文件名是原始来源中不存在的字段。
我认为在流中注入值看起来不太漂亮,而且最终我想在读取目录的流阶段确定传递给解析的文件名。
通过流阶段传递值以供以后重用的 correct/canonical 方法是什么?
您可以使用 map
转换流以将文件名添加到每个 List[ByteString]
:
val fileName = "10002070.csv"
val source =
FileIO.fromPath(Path.get(fileName))
.via(CsvParsing.lineScanner())
.map(List(ByteString(fileName)) ++ _)
例如:
Source.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.map(List(ByteString("myfile.csv")) ++ _)
.runForeach(row => println(row.map(_.utf8String)))
// The above code prints the following:
// List(myfile.csv, header1, header2, header3)
// List(myfile.csv, 1, 2, 3)
// List(myfile.csv, 4, 5, 6)
同样的方法适用于您事先不知道文件名的更一般情况。如果你想读取目录中的所有文件(假设所有这些文件都是 csv 文件),将文件连接成一个流,并在每个流元素中保留文件名,那么你可以使用 Alpakka 的 Directory
以下列方式实用:
val source =
Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
.flatMapConcat { path =>
FileIO.fromPath(path)
.via(CsvParsing.lineScanner())
.map(List(ByteString(path.getFileName.toString)) ++ _)
}
我正在使用 Alpakka 和 Akka 处理 CSV 文件。由于我有一堆必须添加到同一流的 CSV 文件,我想添加一个包含文件名或请求信息的字段。目前我有这样的东西:
val source = FileIO.fromPath(Paths.get("10002070.csv"))
.via(CsvParsing.lineScanner())
它流式传输 ByteString(字段)的列表(行)序列。目标类似于:
val filename = "10002070.csv"
val source = FileIO.fromPath(Path.get(filename))
.via(CsvParsing.lineScanner())
.via(AddCSVFieldHere(filename))
创建类似于以下的结构:
10002070.csv,max,estimated,12,1,0
其中文件名是原始来源中不存在的字段。
我认为在流中注入值看起来不太漂亮,而且最终我想在读取目录的流阶段确定传递给解析的文件名。
通过流阶段传递值以供以后重用的 correct/canonical 方法是什么?
您可以使用 map
转换流以将文件名添加到每个 List[ByteString]
:
val fileName = "10002070.csv"
val source =
FileIO.fromPath(Path.get(fileName))
.via(CsvParsing.lineScanner())
.map(List(ByteString(fileName)) ++ _)
例如:
Source.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.map(List(ByteString("myfile.csv")) ++ _)
.runForeach(row => println(row.map(_.utf8String)))
// The above code prints the following:
// List(myfile.csv, header1, header2, header3)
// List(myfile.csv, 1, 2, 3)
// List(myfile.csv, 4, 5, 6)
同样的方法适用于您事先不知道文件名的更一般情况。如果你想读取目录中的所有文件(假设所有这些文件都是 csv 文件),将文件连接成一个流,并在每个流元素中保留文件名,那么你可以使用 Alpakka 的 Directory
以下列方式实用:
val source =
Directory.ls(Paths.get("/my/dir")) // Source[Path, NotUsed]
.flatMapConcat { path =>
FileIO.fromPath(path)
.via(CsvParsing.lineScanner())
.map(List(ByteString(path.getFileName.toString)) ++ _)
}