Scio Apache Beam - 如何正确分离管道代码?

Scio Apache Beam - How to properly separate a pipeline code?

我有一个包含一组 PTransform 的管道,我的方法变得很长。

我想在一个单独的包中编写我的 DoFns 和我的复合转换,然后在我的主要方法中使用它们。使用 python 非常简单,我如何使用 Scio 实现它?我没有看到任何这样做的例子。 :(

     withFixedWindows(
        FIXED_WINDOW_DURATION,
        options = WindowOptions(
          trigger = groupedWithinTrigger,
          timestampCombiner = TimestampCombiner.END_OF_WINDOW,
          accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
          allowedLateness = Duration.ZERO
        )
      )
      .sumByKey
      // How to write this in an another file and use it here?
      .transform("Format Output") {
        _
          .withWindow[IntervalWindow]
          .withTimestamp
      }

您可以使用 map 函数来映射您的元素 example

您可以传递来自另一个 class 的方法引用,而不是传递 lambda 示例 .map(MyClass.MyFunction)

如果我对你的问题的理解正确,你想将你的 map, groupBy, ... 转换捆绑在一个单独的包中,并在你的主管道中使用它们。

一种方法是使用 applyTransform,但是你最终会使用 PTransforms,它对 scala 不友好。

您可以简单地编写一个接收 SCollection 和 returns 转换后的函数,例如:

def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???

但如果您打算自己编写 Source/Sink,请查看 ScioIO class

我认为解决这个问题的一种方法是在另一个包中定义一个对象,然后在该对象中创建一个方法,该方法将具有转换所需的逻辑。例如:

def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaulTopic = "tweets"
    val input = args.getOrElse("inputTopic", defaulTopic)
    val output = args("outputTopic")

    val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
      .withName("map to tweet class").map(x => {parse(x).extract[Tweet]})

    inputStream
      .flatMap(sentiment.predict) // object sentiment with method predict

  }
object sentiment  {

  def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
    val data = tweet.text
    val emptyCase = Some("")
    Some(data) match {
      case `emptyCase` => None
      case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
    }

  }

也请 link 作为 Scio examples

中给出的示例