如何创建自定义流式数据源?
How to create a custom streaming data source?
我有一个用于从 WebSocket 读取数据的 Spark Streaming 的自定义 reader。我要尝试 Spark 结构化流。
如何在 Spark Structured Streaming 中创建流式数据源?
流式数据源实现 org.apache.spark.sql.execution.streaming.Source。
org.apache.spark.sql.execution.streaming.Source
的 scaladoc 应该为您提供了足够的入门信息(只需按照类型开发可编译的 Scala 类型)。
获得 Source
后,您必须注册它才能在 DataStreamReader
的 format
中使用它。使流媒体源可用以便您可以将其用于 format
的技巧是通过为流媒体源创建 DataSourceRegister
来注册它。您可以在 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
中找到示例
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
这是将 format
中的短名称链接到实现的文件。
我通常建议人们在我的 Spark 研讨会上做的是从两方面开始开发:
编写流式查询(使用format
),例如
val input = spark
.readStream
.format("yourCustomSource") // <-- your custom source here
.load
实现流式传输Source
和相应的DataSourceRegister
(可以是相同的class)
(可选)通过将完全限定的 class 名称(例如 com.mycompany.spark.MyDataSourceRegister
写入 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
:[=33] 来注册 DataSourceRegister
=]
$ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.mycompany.spark.MyDataSourceRegister
为自定义 Source
注册 DataSourceRegister
实现的最后一步是可选的,仅用于注册最终用户在 DataFrameReader.format 方法中使用的数据源别名。
format(source: String): DataFrameReader
Specifies the input data source format.
查看 org.apache.spark.sql.execution.streaming.RateSourceProvider 的代码以获得良好的开端。
随着 Spark 转向 V2 API,您现在必须实施 DataSourceV2, MicroBatchReadSupport, and DataSourceRegister。
这将涉及创建您自己的 Offset
、MicroBatchReader
、DataReader<Row>
和 DataReaderFactory<Row>
。
有一些 examples 在线自定义结构化流示例(在 Scala 中),这对我编写我的代码很有帮助。
实现自定义源后,您可以按照 Jacek Laskowski 的回答注册源。
此外,根据您将从套接字接收的消息的编码,您可以只使用默认套接字源并使用自定义映射函数将信息解析为您将使用的任何 Bean .尽管请注意 Spark 表示不应在生产中使用默认套接字流源!
希望对您有所帮助!
此外,Here 是自定义 WebSocket 流 Reader/Writer 的示例实现,它实现了 Offset, MicroBatchReader, DataReader<Row>
和 DataReaderFactory<Row>
由于 Spark 3.0 对数据源进行了一些重大更改API,这里是更新版本:
名为 DefaultSource
扩展 TableProvider
的 class 是 API 的 entry-point。 getTable
方法 returns table class 扩展 SupportsRead
。此 class 必须提供 ScanBuilder
并定义源功能,在本例中为 TableCapability.MICRO_BATCH_READ
.
ScanBuilder
创建一个 class 扩展 Scan
必须实现 toMicroBatchStream
方法(对于 non-streaming 用例,我们将实现 toBatch
方法)。 toMicroBatchStream
现在 returns 作为 class 扩展 MicroBatchStream
实现了哪些数据可用以及如何对其进行分区的逻辑 (docs)。
现在唯一剩下的是 PartitionReaderFactory
,它创建一个 PartitionReader
负责实际读取数据分区,get
逐行返回。您可以使用 InternalRow.fromSeq(List(1,2,3))
将数据转换为 InternalRow
.
我创建了一个最小的示例项目:here
我有一个用于从 WebSocket 读取数据的 Spark Streaming 的自定义 reader。我要尝试 Spark 结构化流。
如何在 Spark Structured Streaming 中创建流式数据源?
流式数据源实现 org.apache.spark.sql.execution.streaming.Source。
org.apache.spark.sql.execution.streaming.Source
的 scaladoc 应该为您提供了足够的入门信息(只需按照类型开发可编译的 Scala 类型)。
获得 Source
后,您必须注册它才能在 DataStreamReader
的 format
中使用它。使流媒体源可用以便您可以将其用于 format
的技巧是通过为流媒体源创建 DataSourceRegister
来注册它。您可以在 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
这是将 format
中的短名称链接到实现的文件。
我通常建议人们在我的 Spark 研讨会上做的是从两方面开始开发:
编写流式查询(使用
format
),例如val input = spark .readStream .format("yourCustomSource") // <-- your custom source here .load
实现流式传输
Source
和相应的DataSourceRegister
(可以是相同的class)(可选)通过将完全限定的 class 名称(例如
com.mycompany.spark.MyDataSourceRegister
写入META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
:[=33] 来注册DataSourceRegister
=]$ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister com.mycompany.spark.MyDataSourceRegister
为自定义 Source
注册 DataSourceRegister
实现的最后一步是可选的,仅用于注册最终用户在 DataFrameReader.format 方法中使用的数据源别名。
format(source: String): DataFrameReader Specifies the input data source format.
查看 org.apache.spark.sql.execution.streaming.RateSourceProvider 的代码以获得良好的开端。
随着 Spark 转向 V2 API,您现在必须实施 DataSourceV2, MicroBatchReadSupport, and DataSourceRegister。
这将涉及创建您自己的 Offset
、MicroBatchReader
、DataReader<Row>
和 DataReaderFactory<Row>
。
有一些 examples 在线自定义结构化流示例(在 Scala 中),这对我编写我的代码很有帮助。
实现自定义源后,您可以按照 Jacek Laskowski 的回答注册源。
此外,根据您将从套接字接收的消息的编码,您可以只使用默认套接字源并使用自定义映射函数将信息解析为您将使用的任何 Bean .尽管请注意 Spark 表示不应在生产中使用默认套接字流源!
希望对您有所帮助!
此外,Here 是自定义 WebSocket 流 Reader/Writer 的示例实现,它实现了 Offset, MicroBatchReader, DataReader<Row>
和 DataReaderFactory<Row>
由于 Spark 3.0 对数据源进行了一些重大更改API,这里是更新版本:
名为 DefaultSource
扩展 TableProvider
的 class 是 API 的 entry-point。 getTable
方法 returns table class 扩展 SupportsRead
。此 class 必须提供 ScanBuilder
并定义源功能,在本例中为 TableCapability.MICRO_BATCH_READ
.
ScanBuilder
创建一个 class 扩展 Scan
必须实现 toMicroBatchStream
方法(对于 non-streaming 用例,我们将实现 toBatch
方法)。 toMicroBatchStream
现在 returns 作为 class 扩展 MicroBatchStream
实现了哪些数据可用以及如何对其进行分区的逻辑 (docs)。
现在唯一剩下的是 PartitionReaderFactory
,它创建一个 PartitionReader
负责实际读取数据分区,get
逐行返回。您可以使用 InternalRow.fromSeq(List(1,2,3))
将数据转换为 InternalRow
.
我创建了一个最小的示例项目:here