SideInputs 破坏了 DataFlow 管道中的数据
SideInputs corrupt the data in DataFlow's Pipeline
我有一个数据流管道(SDK 2.1.0,Apache Beam 2.2.0),它只是从 GCS 读取 RDF(在 N-Triples 中,所以它只是文本文件),以某种方式转换它并将其写回 GCS ,但在不同的桶中。在这个管道中,我使用了三个单独的文件(每个边输入一个文件)的侧输入,并在 ParDo 中使用它们。
为了在 Java 中使用 RDF,我使用 Apache Jena,因此每个文件都被读入模型 class 的一个实例中。由于 Dataflow 没有 Coder,因此我自己开发了它(RDFModelCoder
,见下文)。它在我创建的许多其他管道中运行良好。
这个特定管道的问题是当我添加侧输入时,执行失败并出现异常,表明数据已损坏,即添加了一些垃圾。一旦我删除了侧输入,管道就成功完成了执行。
异常(从RDFModelCoder
抛出,见下文):
Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
at org.apache.jena.atlas.io.IO.exception(IO.java:233)
at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)
在这里你可以看到垃圾(最后):
<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������
管道:
val one = p.apply(TextIO.read().from(config.getString("source.one")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val two = p.apply(TextIO.read().from(config.getString("source.two")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val three = p.apply(TextIO.read().from(config.getString("source.three")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val sideInput = PCollectionList.of(one).and(two).and(three)
.apply(Flatten.pCollections())
.apply(View.asList())
p.apply(RDFIO.Read
.from(options.getSource())
.withSuffix(RDFLanguages.strLangNTriples))
.apply(ParDo.of(SparqlConstructETL(config, sideInput))
.withSideInputs(sideInput))
.apply(RDFIO.Write
.to(options.getDestination())
.withSuffix(RDFLanguages.NTRIPLES))
为了提供全貌,这里提供 SingleValue
和 ConvertToRDFModel
ParDos 的实现:
class SingleValue : SerializableFunction<Iterable<String>, String> {
override fun apply(input: Iterable<String>?): String {
if (input != null) {
return input.joinToString(separator = " ")
}
return ""
}
}
class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
private val lang: String = outputLang.name
@ProcessElement
fun processElement(c: ProcessContext?) {
if (c != null) {
val model = ModelFactory.createDefaultModel()
model.read(StringReader(c.element()), null, lang)
c.output(model)
}
}
}
执行RDFModelCoder
:
class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
private val encodeLang: String = RDFLanguages.strLangNTriples)
: AtomicCoder<Model>() {
private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)
override fun decode(inStream: InputStream): Model {
val bytes = StreamUtils.getBytes(inStream)
val model = ModelFactory.createDefaultModel()
model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here
return model
}
override fun encode(value: Model, outStream: OutputStream?) {
value.write(outStream, encodeLang, null)
}
}
我多次检查了侧输入文件,它们很好,它们有 UTF-8 编码。
最有可能的错误是在 RDFModelCoder
的实现中。在实现 encode
/decode
时,必须记住所提供的 InputStream
和 OutputStream
并非由当前实例 encoded/decoded 独占。例如。在您当前 Model
的编码形式之后的 InputStream
中可能有更多数据。使用 StreamUtils.getBytes(inStream)
时,您将同时获取当前编码的 Model
和流中的任何其他数据。
通常,在编写新的 Coder
时,最好只合并现有的 Coder
而不是手动解析流:这样不容易出错。我建议转换模型 to/from byte[]
并使用 ByteArrayCoder.of()
到 encode/decode 它。
Apache Jena 提供了 Elephas IO modules which have Hadoop IO support, since Beam supports Hadoop InputFormat IO 您应该能够使用它来读取 NTriples 文件。
这可能会更有效,因为 NTriples support in Elephas 能够并行化 IO 并避免将整个模型缓存到内存中(事实上它根本不会使用 Model
):
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need
// Read data only with Hadoop configuration.
p.apply("read",
HadoopInputFormatIO.<LongWritable, TripleWritable>read()
.withConfiguration(myHadoopConfiguration);
当然,这可能需要您对整个管道进行一些重构。
我有一个数据流管道(SDK 2.1.0,Apache Beam 2.2.0),它只是从 GCS 读取 RDF(在 N-Triples 中,所以它只是文本文件),以某种方式转换它并将其写回 GCS ,但在不同的桶中。在这个管道中,我使用了三个单独的文件(每个边输入一个文件)的侧输入,并在 ParDo 中使用它们。
为了在 Java 中使用 RDF,我使用 Apache Jena,因此每个文件都被读入模型 class 的一个实例中。由于 Dataflow 没有 Coder,因此我自己开发了它(RDFModelCoder
,见下文)。它在我创建的许多其他管道中运行良好。
这个特定管道的问题是当我添加侧输入时,执行失败并出现异常,表明数据已损坏,即添加了一些垃圾。一旦我删除了侧输入,管道就成功完成了执行。
异常(从RDFModelCoder
抛出,见下文):
Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1
at org.apache.jena.atlas.io.IO.exception(IO.java:233)
at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77)
at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154)
at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137)
at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235)
at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229)
at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151)
at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92)
at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48)
at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57)
at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198)
at org.apache.jena.riot.RDFParser.read(RDFParser.java:298)
at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288)
at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237)
at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417)
at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268)
at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254)
at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69)
at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305)
在这里你可以看到垃圾(最后):
<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u�������
管道:
val one = p.apply(TextIO.read().from(config.getString("source.one")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val two = p.apply(TextIO.read().from(config.getString("source.two")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val three = p.apply(TextIO.read().from(config.getString("source.three")))
.apply(Combine.globally(SingleValue()))
.apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES)))
val sideInput = PCollectionList.of(one).and(two).and(three)
.apply(Flatten.pCollections())
.apply(View.asList())
p.apply(RDFIO.Read
.from(options.getSource())
.withSuffix(RDFLanguages.strLangNTriples))
.apply(ParDo.of(SparqlConstructETL(config, sideInput))
.withSideInputs(sideInput))
.apply(RDFIO.Write
.to(options.getDestination())
.withSuffix(RDFLanguages.NTRIPLES))
为了提供全貌,这里提供 SingleValue
和 ConvertToRDFModel
ParDos 的实现:
class SingleValue : SerializableFunction<Iterable<String>, String> {
override fun apply(input: Iterable<String>?): String {
if (input != null) {
return input.joinToString(separator = " ")
}
return ""
}
}
class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() {
private val lang: String = outputLang.name
@ProcessElement
fun processElement(c: ProcessContext?) {
if (c != null) {
val model = ModelFactory.createDefaultModel()
model.read(StringReader(c.element()), null, lang)
c.output(model)
}
}
}
执行RDFModelCoder
:
class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples,
private val encodeLang: String = RDFLanguages.strLangNTriples)
: AtomicCoder<Model>() {
private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java)
override fun decode(inStream: InputStream): Model {
val bytes = StreamUtils.getBytes(inStream)
val model = ModelFactory.createDefaultModel()
model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here
return model
}
override fun encode(value: Model, outStream: OutputStream?) {
value.write(outStream, encodeLang, null)
}
}
我多次检查了侧输入文件,它们很好,它们有 UTF-8 编码。
最有可能的错误是在 RDFModelCoder
的实现中。在实现 encode
/decode
时,必须记住所提供的 InputStream
和 OutputStream
并非由当前实例 encoded/decoded 独占。例如。在您当前 Model
的编码形式之后的 InputStream
中可能有更多数据。使用 StreamUtils.getBytes(inStream)
时,您将同时获取当前编码的 Model
和流中的任何其他数据。
通常,在编写新的 Coder
时,最好只合并现有的 Coder
而不是手动解析流:这样不容易出错。我建议转换模型 to/from byte[]
并使用 ByteArrayCoder.of()
到 encode/decode 它。
Apache Jena 提供了 Elephas IO modules which have Hadoop IO support, since Beam supports Hadoop InputFormat IO 您应该能够使用它来读取 NTriples 文件。
这可能会更有效,因为 NTriples support in Elephas 能够并行化 IO 并避免将整个模型缓存到内存中(事实上它根本不会使用 Model
):
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class",
NTriplesInputFormat.class, InputFormat.class);
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class);
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class);
// Set any other Hadoop config you might need
// Read data only with Hadoop configuration.
p.apply("read",
HadoopInputFormatIO.<LongWritable, TripleWritable>read()
.withConfiguration(myHadoopConfiguration);
当然,这可能需要您对整个管道进行一些重构。