如何创建自定义流式数据源?

How to create a custom streaming data source?

我有一个用于从 WebSocket 读取数据的 Spark Streaming 的自定义 reader。我要尝试 Spark 结构化流。

如何在 Spark Structured Streaming 中创建流式数据源?

流式数据源实现 org.apache.spark.sql.execution.streaming.Source

org.apache.spark.sql.execution.streaming.Source 的 scaladoc 应该为您提供了足够的入门信息(只需按照类型开发可编译的 Scala 类型)。

获得 Source 后,您必须注册它才能在 DataStreamReaderformat 中使用它。使流媒体源可用以便您可以将其用于 format 的技巧是通过为流媒体源创建 DataSourceRegister 来注册它。您可以在 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

中找到示例
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.json.JsonFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider

这是将 format 中的短名称链接到实现的文件。

我通常建议人们在我的 Spark 研讨会上做的是从两方面开始开发:

  1. 编写流式查询(使用format),例如

    val input = spark
      .readStream
      .format("yourCustomSource") // <-- your custom source here
      .load
    
  2. 实现流式传输Source和相应的DataSourceRegister(可以是相同的class)

  3. (可选)通过将完全限定的 class 名称(例如 com.mycompany.spark.MyDataSourceRegister 写入 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:[=33] 来注册 DataSourceRegister =]

    $ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
    com.mycompany.spark.MyDataSourceRegister
    

为自定义 Source 注册 DataSourceRegister 实现的最后一步是可选的,仅用于注册最终用户在 DataFrameReader.format 方法中使用的数据源别名。

format(source: String): DataFrameReader Specifies the input data source format.

查看 org.apache.spark.sql.execution.streaming.RateSourceProvider 的代码以获得良好的开端。

随着 Spark 转向 V2 API,您现在必须实施 DataSourceV2, MicroBatchReadSupport, and DataSourceRegister

这将涉及创建您自己的 OffsetMicroBatchReaderDataReader<Row>DataReaderFactory<Row>

有一些 examples 在线自定义结构化流示例(在 Scala 中),这对我编写我的代码很有帮助。

实现自定义源后,您可以按照 Jacek Laskowski 的回答注册源。

此外,根据您将从套接字接收的消息的编码,您可以只使用默认套接字源并使用自定义映射函数将信息解析为您将使用的任何 Bean .尽管请注意 Spark 表示不应在生产中使用默认套接字流源!

希望对您有所帮助!

此外,Here 是自定义 WebSocket 流 Reader/Writer 的示例实现,它实现了 Offset, MicroBatchReader, DataReader<Row>DataReaderFactory<Row>

由于 Spark 3.0 对数据源进行了一些重大更改API,这里是更新版本:

名为 DefaultSource 扩展 TableProvider 的 class 是 API 的 entry-point。 getTable 方法 returns table class 扩展 SupportsRead。此 class 必须提供 ScanBuilder 并定义源功能,在本例中为 TableCapability.MICRO_BATCH_READ.

ScanBuilder 创建一个 class 扩展 Scan 必须实现 toMicroBatchStream 方法(对于 non-streaming 用例,我们将实现 toBatch 方法)。 toMicroBatchStream 现在 returns 作为 class 扩展 MicroBatchStream 实现了哪些数据可用以及如何对其进行分区的逻辑 (docs)。

现在唯一剩下的是 PartitionReaderFactory,它创建一个 PartitionReader 负责实际读取数据分区,get 逐行返回。您可以使用 InternalRow.fromSeq(List(1,2,3)) 将数据转换为 InternalRow.

我创建了一个最小的示例项目:here