How to submit a PyDeequ job from Jupyter Notebook to Spark/YARN
How do you configure the environment to submit a PyDeequ job from a Jupyter notebook to Spark/YARN (client mode)? There is no comprehensive explanation other than those that assume an AWS environment (e.g. EMR). How do you set things up for a non-AWS environment?
Simply following the published examples, such as Testing data quality at scale with PyDeequ, leads to errors like TypeError: 'JavaPackage' object is not callable:
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Size()) \
.addAnalyzer(Completeness("review_id")) \
.addAnalyzer(ApproxCountDistinct("review_id")) \
.addAnalyzer(Mean("star_rating")) \
.addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
.addAnalyzer(Correlation("total_votes", "star_rating")) \
.addAnalyzer(Correlation("total_votes", "helpful_votes")) \
.run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
3 .onData(df) \
4 .addAnalyzer(Size()) \
5 .addAnalyzer(Completeness("review_id")) \
~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
50 """
51 df = ensure_pyspark_df(self._spark_session, df)
---> 52 return AnalysisRunBuilder(self._spark_session, df)
53
54
~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
122 self._jspark_session = spark_session._jsparkSession
123 self._df = df
--> 124 self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
125
126 def addAnalyzer(self, analyzer: _AnalyzerObject):
TypeError: 'JavaPackage' object is not callable
HADOOP_CONF_DIR
Copy the contents of $HADOOP_HOME/etc/hadoop from the Hadoop/YARN master node to the local host and set the HADOOP_CONF_DIR environment variable to point to that directory.
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration.
os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
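As a quick sanity check (illustrative only; paths differ per cluster), the directory should contain the client-side *-site.xml files:
import os
# HADOOP_CONF_DIR should contain the client-side Hadoop/YARN configuration files.
conf_dir = os.environ['HADOOP_CONF_DIR']
print(sorted(f for f in os.listdir(conf_dir) if f.endswith('-site.xml')))
# Expect something like ['core-site.xml', 'hdfs-site.xml', 'mapred-site.xml', 'yarn-site.xml']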
Python Path
The pyspark Python module needs to be importable. Either install pyspark with pip or conda, which also installs the Spark runtime libraries (standalone), or reuse the pyspark Python modules that ship with the Spark installation under $SPARK_HOME/python/lib.
Ensure the SPARK_HOME environment variable points to the directory where the tar file has been extracted. Update PYTHONPATH environment variable such that it can find the PySpark and Py4J under SPARK_HOME/python/lib. One example of doing this is shown below:
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
import sys
# Alternatively, add the PySpark and Py4J archives from the Spark installation to sys.path directly:
sys.path.extend([
    "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
    "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
])
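Either way, a quick check (illustrative) that the modules resolve before going further:
import pyspark
import py4j  # confirms the Py4J archive is importable as well
# The PySpark version should match the Spark installation (3.1.2 here).
print(pyspark.__version__)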
PyDeequ
Install pydeequ with pip or conda. Note that this alone is not enough to use pydeequ.
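What the pip package does provide are the Maven coordinates referenced later in the SparkSession configuration, so a minimal check looks like this (output values depend on the installed pydeequ version):
import pydeequ
# Maven coordinate of the Deequ JAR that pydeequ expects on the classpath, and the f2j
# artifacts it excludes; both are used below in spark.jars.packages / spark.jars.excludes.
print(pydeequ.deequ_maven_coord)
print(pydeequ.f2j_maven_coord)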
Deequ JAR File
To use PyDeequ, the Deequ JAR file must be on the library path. Download the one matching your Spark/Deequ version from the Maven repository com.amazon.deequ.
import os
import sys
root = os.path.dirname(os.path.realpath(os.getcwd()))
deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"
!wget -q -O $classpath $deequ_jar
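A small illustrative check that the download actually produced a JAR (wget -q suppresses errors, so a failed download could otherwise go unnoticed):
import os
# The downloaded JAR should exist and be non-empty before it is put on the classpath.
assert os.path.isfile(classpath) and os.path.getsize(classpath) > 0, classpath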
Spark Session
Specify the Deequ JAR file via the Spark JAR properties:
from pyspark.sql import SparkSession
import pydeequ

spark = SparkSession.builder \
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .config('spark.sql.debug.maxToStringFields', 100) \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()
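With the session up, the original TypeError comes down to whether the Deequ classes are visible to the JVM; a sketch of that check (illustrative, using py4j internals):
from py4j.java_gateway import JavaClass
# If the Deequ JAR is on the driver classpath this resolves to a JavaClass; a JavaPackage
# here is exactly what produces "'JavaPackage' object is not callable".
builder_cls = spark.sparkContext._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder
print(isinstance(builder_cls, JavaClass))  # expect True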
Deequ Job
df = spark.read.csv(
path=f"file:///{root}/data/amazon_product_reviews.csv.gz",
header=True,
)
df.printSchema()
-----
root
|-- review_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- product_id: string (nullable = true)
|-- year: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- product_category: string (nullable = true)
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Size()) \
.addAnalyzer(Completeness("review_id")) \
.addAnalyzer(ApproxCountDistinct("review_id")) \
.addAnalyzer(Mean("star_rating")) \
.addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
.addAnalyzer(Correlation("total_votes", "star_rating")) \
.addAnalyzer(Correlation("total_votes", "helpful_votes")) \
.run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
-----
21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+---------------+-------------------+------+
| entity| instance| name| value|
+-------+---------------+-------------------+------+
| Column| review_id| Completeness| 1.0|
| Column| review_id|ApproxCountDistinct|1040.0|
|Dataset| *| Size|1000.0|
| Column|top star_rating| Compliance| 0.657|
+-------+---------------+-------------------+------+
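Note that the metrics table only contains Size, Completeness, ApproxCountDistinct and Compliance; the Mean and Correlation analyzers most likely produce no success metric because every column was read as string (see the schema above). A sketch of casting the numeric columns before running the analysis (illustrative):
from pyspark.sql.functions import col
# Cast the numeric columns so that numeric analyzers (Mean, Correlation) can compute metrics.
df = df \
    .withColumn("star_rating", col("star_rating").cast("double")) \
    .withColumn("total_votes", col("total_votes").cast("long")) \
    .withColumn("helpful_votes", col("helpful_votes").cast("long"))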