Scio Apache Beam - 如何正确分离管道代码?
Scio Apache Beam - How to properly separate a pipeline code?
我有一个包含一组 PTransform 的管道,我的方法变得很长。
我想在一个单独的包中编写我的 DoFns 和我的复合转换,然后在我的主要方法中使用它们。使用 python 非常简单,我如何使用 Scio 实现它?我没有看到任何这样做的例子。 :(
withFixedWindows(
FIXED_WINDOW_DURATION,
options = WindowOptions(
trigger = groupedWithinTrigger,
timestampCombiner = TimestampCombiner.END_OF_WINDOW,
accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
allowedLateness = Duration.ZERO
)
)
.sumByKey
// How to write this in an another file and use it here?
.transform("Format Output") {
_
.withWindow[IntervalWindow]
.withTimestamp
}
您可以使用 map
函数来映射您的元素 example。
您可以传递来自另一个 class 的方法引用,而不是传递 lambda
示例 .map(MyClass.MyFunction)
如果我对你的问题的理解正确,你想将你的 map, groupBy, ...
转换捆绑在一个单独的包中,并在你的主管道中使用它们。
一种方法是使用 applyTransform
,但是你最终会使用 PTransforms,它对 scala 不友好。
您可以简单地编写一个接收 SCollection 和 returns 转换后的函数,例如:
def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???
但如果您打算自己编写 Source/Sink,请查看 ScioIO class
我认为解决这个问题的一种方法是在另一个包中定义一个对象,然后在该对象中创建一个方法,该方法将具有转换所需的逻辑。例如:
def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val defaulTopic = "tweets"
val input = args.getOrElse("inputTopic", defaulTopic)
val output = args("outputTopic")
val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
.withName("map to tweet class").map(x => {parse(x).extract[Tweet]})
inputStream
.flatMap(sentiment.predict) // object sentiment with method predict
}
object sentiment {
def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
val data = tweet.text
val emptyCase = Some("")
Some(data) match {
case `emptyCase` => None
case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
}
}
也请 link 作为 Scio examples
中给出的示例
我有一个包含一组 PTransform 的管道,我的方法变得很长。
我想在一个单独的包中编写我的 DoFns 和我的复合转换,然后在我的主要方法中使用它们。使用 python 非常简单,我如何使用 Scio 实现它?我没有看到任何这样做的例子。 :(
withFixedWindows(
FIXED_WINDOW_DURATION,
options = WindowOptions(
trigger = groupedWithinTrigger,
timestampCombiner = TimestampCombiner.END_OF_WINDOW,
accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
allowedLateness = Duration.ZERO
)
)
.sumByKey
// How to write this in an another file and use it here?
.transform("Format Output") {
_
.withWindow[IntervalWindow]
.withTimestamp
}
您可以使用 map
函数来映射您的元素 example。
您可以传递来自另一个 class 的方法引用,而不是传递 lambda
示例 .map(MyClass.MyFunction)
如果我对你的问题的理解正确,你想将你的 map, groupBy, ...
转换捆绑在一个单独的包中,并在你的主管道中使用它们。
一种方法是使用 applyTransform
,但是你最终会使用 PTransforms,它对 scala 不友好。
您可以简单地编写一个接收 SCollection 和 returns 转换后的函数,例如:
def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???
但如果您打算自己编写 Source/Sink,请查看 ScioIO class
我认为解决这个问题的一种方法是在另一个包中定义一个对象,然后在该对象中创建一个方法,该方法将具有转换所需的逻辑。例如:
def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val defaulTopic = "tweets"
val input = args.getOrElse("inputTopic", defaulTopic)
val output = args("outputTopic")
val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
.withName("map to tweet class").map(x => {parse(x).extract[Tweet]})
inputStream
.flatMap(sentiment.predict) // object sentiment with method predict
}
object sentiment {
def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
val data = tweet.text
val emptyCase = Some("")
Some(data) match {
case `emptyCase` => None
case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
}
}
也请 link 作为 Scio examples
中给出的示例