Failed to load: com/amazon/deequ/checks/Check
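I am building a Spark application that loads two JSON files, compares them, and prints the differences. I am also trying to validate these files with Amazon's deequ library (AWS Deequ), but I get the following exception: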
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/08/07 11:56:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: Failed to load com.deeq.CompareDataFrames: com/amazon/deequ/checks/Check
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please
This happens when I submit the job to Spark:
./spark-submit --class com.deeq.CompareDataFrames --master \
spark://saif-VirtualBox:7077 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar
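I am hosting Spark on Ubuntu, and it worked without any problems until I added deequ to run some validations. I wonder whether I am missing something in the deployment process. This error does not seem to be a well-known one on the internet.
Code: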
import com.amazon.deequ.VerificationResult;
import com.amazon.deequ.VerificationSuite;
import com.amazon.deequ.checks.Check;
import com.amazon.deequ.checks.CheckLevel;
import com.amazon.deequ.checks.CheckStatus;
import com.amazon.deequ.constraints.Constraint;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.Tuple2;
import scala.collection.mutable.ArraySeq;
import scala.collection.mutable.Seq;
public class CompareDataFrames {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().appName("CompareDataFrames").getOrCreate();
        session.sparkContext().setLogLevel("ALL");
        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("CUST_ID", DataTypes.StringType, true),
                DataTypes.createStructField("RECORD_LOCATOR_ID", DataTypes.StringType, true),
                DataTypes.createStructField("EVNT_ID", DataTypes.StringType, true)
        });
        Dataset<Row> first = session.read().option("multiline", "true").schema(schema).json("/home/saif/Downloads/FILE_DEV1.json");
        System.out.println("======= DataSet 1 =======");
        first.printSchema();
        first.show(false);
        Dataset<Row> second = session.read().option("multiline", "true").schema(schema).json("/home/saif/Downloads/FILE_DEV2.json");
        System.out.println("======= DataSet 2 =======");
        second.printSchema();
        second.show(false);
        // This will show all the rows which are present in the first dataset
        // but not present in the second dataset. But the comparison is at row
        // level and not at column level.
        System.out.println("======= Expect =======");
        first.except(second).show();
        StructType one = first.schema();
        JavaPairRDD<String, Row> pair1 = first.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        JavaPairRDD<String, Row> pair2 = second.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        System.out.println("======= Pair1 & Pair2 were created =======");
        JavaPairRDD<String, Row> subs = pair1.subtractByKey(pair2);
        JavaRDD<Row> rdd = subs.values();
        Dataset<Row> diff = session.createDataFrame(rdd, one);
        System.out.println("======= Diff Show =======");
        diff.show();
        Seq<Constraint> cons = new ArraySeq<>(0);
        VerificationResult vr = new VerificationSuite().onData(first)
                .addCheck(new Check(CheckLevel.Error(), "unit test", cons)
                        .isComplete("EVNT_ID", Option.empty())
                )
                .run();
        Seq<Check> checkSeq = new ArraySeq<>(0);
        if (vr.status() != CheckStatus.Success()) {
            Dataset<Row> vrr = vr.checkResultsAsDataFrame(session, vr, checkSeq);
            vrr.show(false);
        }
    }
}
**POM:**
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-catalyst_2.12</artifactId>
    <version>3.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.amazon.deequ</groupId>
    <artifactId>deequ</artifactId>
    <version>1.0.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.13.3</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-java8-compat_2.13</artifactId>
    <version>0.9.1</version>
  </dependency>
</dependencies>
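The error most likely means that com.amazon.deequ.checks.Check was not on the classpath when Spark tried to load your main class: a plain application jar does not bundle deequ, so the library has to be supplied to spark-submit or packaged into the jar. Try the methods below to resolve the issue.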
Method 1
Submit with the --jars option. Download the jar from the Maven repository page https://mvnrepository.com/artifact/com.amazon.deequ/deequ/1.0.4 to ~/Downloads/deequ-1.0.4.jar on your machine, then run:
./spark-submit --class com.deeq.CompareDataFrames --master \
spark://saif-VirtualBox:7077 --jars ~/Downloads/deequ-1.0.4.jar ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar
Method 2
Submit with the --packages option:
./spark-submit --class com.deeq.CompareDataFrames --master \
spark://saif-VirtualBox:7077 --packages com.amazon.deequ:deequ:1.0.4 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar
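Notes:
- The --repositories option is needed only when you have to reference a custom repository.
- If --repositories is not given, the Maven Central repository is used by default.
- When --packages is specified, spark-submit looks for the packages and their dependencies in the ~/.ivy2/cache, ~/.ivy2/jars, and ~/.m2/repository directories.
- If they are not found there, they are downloaded from Maven Central using Ivy and stored under the ~/.ivy2 directory.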
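To see whether the deequ jar is already in the local Maven repository that --packages consults, the coordinate can be mapped to a file path by hand. The sketch below assumes the standard Maven repository layout (groupId with dots turned into directories, then artifactId, version, and artifactId-version.jar). It covers only ~/.m2/repository, not the Ivy cache directories, and it is not how spark-submit itself performs the lookup. The class and method names are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration; not part of Spark, Ivy, or Maven. */
final class LocalMavenLookup {

    /** Maps "groupId:artifactId:version" to the jar's relative path in a Maven-layout repository. */
    static String relativeJarPath(String coordinate) {
        String[] parts = coordinate.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected groupId:artifactId:version but got: " + coordinate);
        }
        String groupPath = parts[0].replace('.', '/');
        return groupPath + "/" + parts[1] + "/" + parts[2] + "/" + parts[1] + "-" + parts[2] + ".jar";
    }

    public static void main(String[] args) {
        // Defaults to the coordinate passed to --packages in Method 2.
        String coordinate = args.length > 0 ? args[0] : "com.amazon.deequ:deequ:1.0.4";
        Path jar = Paths.get(System.getProperty("user.home"), ".m2", "repository")
                .resolve(relativeJarPath(coordinate));
        System.out.println(jar + (Files.exists(jar) ? " (present)" : " (missing)"));
    }
}
```

If the jar is reported as missing here, that alone is not a problem: per the notes above, it may still be in the Ivy cache or be downloaded from Maven Central on submit.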
Edit 1:
Method 3:
If methods 1 and 2 above do not work, build an uber jar with maven-shade-plugin and then run spark-submit.
Use the pom.xml below, which configures maven-shade-plugin to build the uber jar. Rebuild your jar with it and deploy it with spark-submit:
spark-submit --class com.deeq.CompareDataFrames --master \
spark://saif-VirtualBox:7077 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.deeq</groupId>
  <artifactId>deeq-trial-1.0-SNAPSHOT</artifactId>
  <version>1.0</version>
  <name>Spark-3.0 Spark Application</name>
  <url>https://maven.apache.org</url>
  <repositories>
    <repository>
      <id>codelds</id>
      <url>https://code.lds.org/nexus/content/groups/main-repo</url>
    </repository>
    <repository>
      <id>central</id>
      <name>Maven Repository Switchboard</name>
      <layout>default</layout>
      <url>https://repo1.maven.org/maven2</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.8</scala.version>
    <java.version>1.8</java.version>
    <CodeCacheSize>512m</CodeCacheSize>
    <es.version>2.4.6</es.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_2.12</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.amazon.deequ</groupId>
      <artifactId>deequ</artifactId>
      <version>1.0.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.13.3</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-java8-compat_2.13</artifactId>
      <version>0.9.1</version>
    </dependency>
  </dependencies>
  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>eclipse-add-source</id>
            <goals>
              <goal>add-source</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile-first</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
          <execution>
            <id>attach-scaladocs</id>
            <phase>verify</phase>
            <goals>
              <goal>doc-jar</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <recompileMode>incremental</recompileMode>
          <useZincServer>true</useZincServer>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <jvmArgs>
            <jvmArg>-Xms1024m</jvmArg>
            <jvmArg>-Xmx1024m</jvmArg>
            <jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
          </jvmArgs>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>${java.version}</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>${java.version}</javacArg>
            <javacArg>-Xlint:all,-serial,-path</javacArg>
          </javacArgs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.xerial.snappy</exclude>
                  <exclude>org.scala-lang.modules</exclude>
                  <exclude>org.scala-lang</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <relocation>
                  <pattern>com.google.common</pattern>
                  <shadedPattern>shaded.com.google.common</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
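After rebuilding, it may help to confirm that the deequ classes really ended up inside the uber jar before submitting it again. The following is a minimal sketch using the JDK's java.util.jar.JarFile; the JarClassCheck class and its containsClass method are hypothetical names introduced here for illustration, and the check only looks for the class file entry, nothing more.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper for illustration; not part of Spark or deequ. */
final class JarClassCheck {

    /** Returns true if the jar contains a .class entry for the given fully qualified class name. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Class files are stored in the jar under their package path, e.g. com/amazon/deequ/checks/Check.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarClassCheck <path-to-jar> <fully.qualified.ClassName>");
            System.exit(2);
        }
        System.out.println(containsClass(args[0], args[1]) ? "found" : "missing");
    }
}
```

For the jar in this question, the arguments would be the path to the rebuilt deeq-trial-1.0-SNAPSHOT.jar and com.amazon.deequ.checks.Check. A "missing" result would suggest the shade step did not run or did not include deequ.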