Spark Structured-Streaming Error:-pyspark.sql.utils.StreamingQueryException: 'assertion failed: Invalid batch:
Spark Structured-Streaming Error:-pyspark.sql.utils.StreamingQueryException: 'assertion failed: Invalid batch:
我有一个 Spark Structured-Streaming 应用程序,它从 s3 读取 JSON 数据并进行一些转换并将其写回 s3。
当 运行 应用程序时,有时作业会出错并重新尝试(没有任何可见的丢失或数据损坏 - 所以一切看起来都很好),但提供的错误消息描述性不强
错误信息如下:
pyspark.sql.utils.StreamingQueryException: u'assertion failed: Invalid batch: _ra_guest_gid#1883,_ra_sess_ts#1884,_ra_evt_ts#1885,event#1886,brand#1887,category#1888,funding_daysRemaining#1889,funding_dollarsRemaining#1890,funding_goal#1891,funding_totalBackers#1892L,funding_totalFunded#1893,id#1894,name#1895,price#1896,projectInfo_memberExclusive#1897,projectInfo_memberExclusiveHoursRemaining#1898,projectInfo_numberOfEpisodes#1899,projectInfo_projectState#1900,variant#1901 != _ra_guest_gid#2627,_ra_sess_ts#2628,_
我猜这可能与列不匹配有关,其中
传入的 JSON 记录不符合架构。
或者传入 JSON 记录的数据类型可能与架构中提供的数据类型不匹配。
但我不确定如何查明导致错误的记录或特定字段。
关于错误的含义或我如何以更好的方式记录错误的任何帮助或建议。
谢谢
我想我已经解决了这个问题,它与架构不匹配无关。
在我的案例中发生的事情是我有两个并行的流操作 运行ning。
1) 从 S3 存储桶中读取原始传入数据,然后执行一些操作并将其写回输出文件夹中的 S3 'a'
2) 从文件夹 'a' 中读取处理后的流数据(步骤 1),然后再次执行一些操作并在输出文件夹 'b'
中写回 S3
现在根据我的观察,如果我 运行 单独执行上述步骤,那么它工作正常,但如果我 运行 它们一起我得到错误
'pyspark.sql.utils.StreamingQueryException: 断言失败:无效批次:'
所以我认为当它尝试从同一位置读取和写入时会遇到问题,即一个流的目的地是另一个流的源
我有一个 Spark Structured-Streaming 应用程序,它从 s3 读取 JSON 数据并进行一些转换并将其写回 s3。
当 运行 应用程序时,有时作业会出错并重新尝试(没有任何可见的丢失或数据损坏 - 所以一切看起来都很好),但提供的错误消息描述性不强
错误信息如下:
pyspark.sql.utils.StreamingQueryException: u'assertion failed: Invalid batch: _ra_guest_gid#1883,_ra_sess_ts#1884,_ra_evt_ts#1885,event#1886,brand#1887,category#1888,funding_daysRemaining#1889,funding_dollarsRemaining#1890,funding_goal#1891,funding_totalBackers#1892L,funding_totalFunded#1893,id#1894,name#1895,price#1896,projectInfo_memberExclusive#1897,projectInfo_memberExclusiveHoursRemaining#1898,projectInfo_numberOfEpisodes#1899,projectInfo_projectState#1900,variant#1901 != _ra_guest_gid#2627,_ra_sess_ts#2628,_
我猜这可能与列不匹配有关,其中
传入的 JSON 记录不符合架构。
或者传入 JSON 记录的数据类型可能与架构中提供的数据类型不匹配。
但我不确定如何查明导致错误的记录或特定字段。
关于错误的含义或我如何以更好的方式记录错误的任何帮助或建议。
谢谢
我想我已经解决了这个问题,它与架构不匹配无关。 在我的案例中发生的事情是我有两个并行的流操作 运行ning。
1) 从 S3 存储桶中读取原始传入数据,然后执行一些操作并将其写回输出文件夹中的 S3 'a'
2) 从文件夹 'a' 中读取处理后的流数据(步骤 1),然后再次执行一些操作并在输出文件夹 'b'
中写回 S3现在根据我的观察,如果我 运行 单独执行上述步骤,那么它工作正常,但如果我 运行 它们一起我得到错误
'pyspark.sql.utils.StreamingQueryException: 断言失败:无效批次:'
所以我认为当它尝试从同一位置读取和写入时会遇到问题,即一个流的目的地是另一个流的源