使用 Dataflow 将 PubSub 流写入 Cloud Storage 时出错
error writing PubSub stream to Cloud Storage using Dataflow
使用 SCIO from spotify
to write a job for Dataflow
, following 2 examples e.g1 and e.g2 将 PubSub
流写入 GCS
,但以下代码出现以下错误
错误
Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection
代码
object StreamingPubSub {
def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()
val sc = ScioContext(opts)
sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
_ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
}
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))
val result = sc.close()
// CTRL-C to cancel the streaming pipeline
dataflowUtils.waitToFinish(result.internal)
}
}
我可能混淆了 window 概念和 Bounded PCollection,有没有办法实现这个,或者我需要应用一些转换来实现这个,任何人都可以在这方面提供帮助
我相信 SCIO 的 saveAsTextFile
下面使用 Dataflow 的 Write
转换,它仅支持有界 PCollections。 Dataflow 尚未提供直接 API 将无界 PCollection 写入 Google Cloud Storage,尽管这是我们正在调查的内容。
要在某处持久化无界 PCollection,请考虑 BigQuery、Datastore 或 Bigtable 等。例如,在 SCIO 的 API 中,您可以使用 saveAsBigQuery
.
使用 SCIO from spotify
to write a job for Dataflow
, following 2 examples e.g1 and e.g2 将 PubSub
流写入 GCS
,但以下代码出现以下错误
错误
Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection
代码
object StreamingPubSub {
def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()
val sc = ScioContext(opts)
sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
_ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
}
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))
val result = sc.close()
// CTRL-C to cancel the streaming pipeline
dataflowUtils.waitToFinish(result.internal)
}
}
我可能混淆了 window 概念和 Bounded PCollection,有没有办法实现这个,或者我需要应用一些转换来实现这个,任何人都可以在这方面提供帮助
我相信 SCIO 的 saveAsTextFile
下面使用 Dataflow 的 Write
转换,它仅支持有界 PCollections。 Dataflow 尚未提供直接 API 将无界 PCollection 写入 Google Cloud Storage,尽管这是我们正在调查的内容。
要在某处持久化无界 PCollection,请考虑 BigQuery、Datastore 或 Bigtable 等。例如,在 SCIO 的 API 中,您可以使用 saveAsBigQuery
.