Apache Beam 获取源文件名
Apache Beam get source File Name
编辑:已解决!
我有多个来自多种语言的文本文件。我想使用 Apache Beam 为每一行添加一个语言标签。
示例:
文件text_en:
This is a sentence.
文件 text_de: Dies ist ein Satz.
我要的是这个:
zh: 这是一个句子。
de: Dies ist ein Satz。
我试过的:
我最初尝试只使用一个 TextIO.Read.From(dataSetDirectory+"/*")
并寻找一个看起来像 .getSource()
的选项。然而,这似乎并不存在。
接下来我试着像这样一个一个地阅读每个文件:
File[] files = new File(datasetDirectory).listFiles();
PCollectionList<String> dataSet=null;
for (File f: files) {
String language = f.getName();
logger.debug(language);
PCollection<String> newPCollection = p.apply(
TextIO.Read.from(f.getAbsolutePath()))
.apply(ParDo.of(new LanguageTagAdder(language)));
if (dataSet==null) {
dataSet=PCollectionList.of(newPCollection);
} else {
dataSet.and(newPCollection);
}
}
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections())
以这种方式读取文件非常好,但是我的 DoFn LanguageTagAdder
仅使用第一种语言进行了初始化,因此所有文件都具有相同的添加语言。
LanguageTagAdder
看起来像这样:
public class LanguageTagAdder
extends DoFn<String,String> {
private String language;
public LanguageTagAdder(String language) {
this.language=language;
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(language+c.element());
}
}
我意识到这种行为是有意且需要的,以便可以并行化数据,但我将如何解决我的问题?有没有Beam的方法来解决?
PS:我在第二次创建 new LanguageTagAdder
时收到以下警告(使用第二语言):
DEBUG 2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset - en
DEBUG 2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset - de
WARN 2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines.
编辑:
问题是行
dataSet.and(newPCollection);
需要改写为:
dataSet=dataSet.and(newPCollection);
原来,dataSet 只包含第一个文件....难怪他们都有相同的语言标签!
问题出在线路上
dataSet.and(newPCollection);
需要重写为:
dataSet=dataSet.and(newPCollection);
原来如此,dataSet 只包含第一个文件。
编辑:已解决!
我有多个来自多种语言的文本文件。我想使用 Apache Beam 为每一行添加一个语言标签。
示例:
文件text_en:
This is a sentence.
文件 text_de: Dies ist ein Satz.
我要的是这个:
zh: 这是一个句子。 de: Dies ist ein Satz。
我试过的:
我最初尝试只使用一个 TextIO.Read.From(dataSetDirectory+"/*")
并寻找一个看起来像 .getSource()
的选项。然而,这似乎并不存在。
接下来我试着像这样一个一个地阅读每个文件:
File[] files = new File(datasetDirectory).listFiles();
PCollectionList<String> dataSet=null;
for (File f: files) {
String language = f.getName();
logger.debug(language);
PCollection<String> newPCollection = p.apply(
TextIO.Read.from(f.getAbsolutePath()))
.apply(ParDo.of(new LanguageTagAdder(language)));
if (dataSet==null) {
dataSet=PCollectionList.of(newPCollection);
} else {
dataSet.and(newPCollection);
}
}
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections())
以这种方式读取文件非常好,但是我的 DoFn LanguageTagAdder
仅使用第一种语言进行了初始化,因此所有文件都具有相同的添加语言。
LanguageTagAdder
看起来像这样:
public class LanguageTagAdder
extends DoFn<String,String> {
private String language;
public LanguageTagAdder(String language) {
this.language=language;
}
@ProcessElement
public void processElement(ProcessContext c) {
c.output(language+c.element());
}
}
我意识到这种行为是有意且需要的,以便可以并行化数据,但我将如何解决我的问题?有没有Beam的方法来解决?
PS:我在第二次创建 new LanguageTagAdder
时收到以下警告(使用第二语言):
DEBUG 2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset - en
DEBUG 2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset - de
WARN 2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines.
编辑: 问题是行
dataSet.and(newPCollection);
需要改写为:
dataSet=dataSet.and(newPCollection);
原来,dataSet 只包含第一个文件....难怪他们都有相同的语言标签!
问题出在线路上
dataSet.and(newPCollection);
需要重写为:
dataSet=dataSet.and(newPCollection);
原来如此,dataSet 只包含第一个文件。