How to submit a PyDeequ job from a Jupyter Notebook to Spark/YARN

How do you configure the environment to submit a PyDeequ job from a Jupyter notebook to Spark/YARN (client mode)? There is no comprehensive explanation other than those that assume an AWS environment. How do you set things up for a non-AWS environment?

If you simply follow the examples, such as Testing data quality at scale with PyDeequ, you run into errors like TypeError: 'JavaPackage' object is not callable.

from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
      1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
      3     .onData(df) \
      4     .addAnalyzer(Size()) \
      5     .addAnalyzer(Completeness("review_id")) \

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
     50         """
     51         df = ensure_pyspark_df(self._spark_session, df)
---> 52         return AnalysisRunBuilder(self._spark_session, df)
     53 
     54 

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
    122         self._jspark_session = spark_session._jsparkSession
    123         self._df = df
--> 124         self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
    125 
    126     def addAnalyzer(self, analyzer: _AnalyzerObject):

TypeError: 'JavaPackage' object is not callable

HADOOP_CONF_DIR

Copy the contents of $HADOOP_HOME/etc/hadoop from the Hadoop/YARN master node to the local host, and set the HADOOP_CONF_DIR environment variable to point to that directory.

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration.

os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
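
As a quick sanity check (a minimal sketch, not part of the original steps), confirm that the directory actually contains the client-side configuration files before creating the Spark session:

import os

# Client-side Hadoop/YARN configs that Spark needs to reach HDFS and the
# YARN ResourceManager; adjust the list to your cluster layout.
conf_dir = os.environ["HADOOP_CONF_DIR"]
for name in ("core-site.xml", "hdfs-site.xml", "yarn-site.xml"):
    path = os.path.join(conf_dir, name)
    print(path, "found" if os.path.exists(path) else "MISSING")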

Python path

pyspark

The pyspark Python module must be importable. Either install pyspark with pip or conda, which installs the Spark runtime libraries (standalone), or reuse the pyspark Python modules that ship with the Spark installation under $SPARK_HOME/python/lib.

Ensure the SPARK_HOME environment variable points to the directory where the tar file has been extracted. Update PYTHONPATH environment variable such that it can find the PySpark and Py4J under SPARK_HOME/python/lib. One example of doing this is shown below:

export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH

Alternatively, extend sys.path from within the notebook:

import sys
sys.path.extend([
    "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
    "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
])
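
A quick check (not from the original post) that the paths above resolve and that the bundled pyspark matches the Spark installation:

# Both modules should now be importable from the zip files added above.
import pyspark
import py4j

print(pyspark.__version__)  # expected to match the Spark install, e.g. 3.1.2
print(py4j.__file__)        # should point into the py4j-*-src.zip added above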

PyDeequ

Install pydeequ with pip or conda. Note, however, that this alone is not sufficient to use pydeequ.
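
For instance (a small sketch, assuming pydeequ is installed in the active virtualenv), the package only ships the Python wrapper plus the Maven coordinates of the Deequ jar it targets; the jar itself still has to be made available to Spark:

import pydeequ

# Maven coordinate of the Deequ release this pydeequ version targets
# (used later for spark.jars.packages).
print(pydeequ.deequ_maven_coord)
# f2j artifact to exclude to avoid a dependency clash
# (used later for spark.jars.excludes).
print(pydeequ.f2j_maven_coord)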

Deequ JAR file

Add the deequ jar to the library path.

To use PyDeequ, the deequ jar file is required. Get it from the Maven repository com.amazon.deequ.

Download the one that matches your Spark and Deequ versions:
import os
import sys

root = os.path.dirname(os.path.realpath(os.getcwd()))
deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"

!wget -q -O $classpath $deequ_jar

Spark Session

Specify the Deequ jar file in the relevant Spark jar properties:

from pyspark.sql import SparkSession
import pydeequ

spark = SparkSession.builder \
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .config('spark.debug.maxToStringFields', 100) \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()
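
Optionally (not part of the original post), verify that the Deequ classes are now reachable from the driver JVM. If the jar is not on the classpath, the expression below resolves to a py4j JavaPackage rather than a JavaClass, which is exactly what causes the TypeError shown above:

# A JavaClass repr here means the Deequ jar is on the driver classpath;
# a JavaPackage repr reproduces the "'JavaPackage' object is not callable" problem.
print(spark.sparkContext._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder)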

Deequ job

Use an excerpt of the Amazon product review data.

df = spark.read.csv(
    path=f"file:///{root}/data/amazon_product_reviews.csv.gz",
    header=True,
)
df.printSchema()
-----
root
 |-- review_id: string (nullable = true)
 |-- marketplace: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- product_category: string (nullable = true)
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
-----
21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                
+-------+---------------+-------------------+------+
| entity|       instance|               name| value|
+-------+---------------+-------------------+------+
| Column|      review_id|       Completeness|   1.0|
| Column|      review_id|ApproxCountDistinct|1040.0|
|Dataset|              *|               Size|1000.0|
| Column|top star_rating|         Compliance| 0.657|
+-------+---------------+-------------------+------+