Dependency Issues for Cloud Dataproc + Spark + Cloud Bigtable with Java
I need to create an application that runs on Cloud Dataproc and uses Spark to process large Bigtable writes, scans, and deletes in a massively parallel way. This could be in Java (or Python, if that is workable).
I am trying to write, in Eclipse, the minimal code that gets an RDD from a Bigtable table, either with bulkPut/bulkDelete/bulkGet, with newAPIHadoopRDD(), or something along those lines.
I have seen several posts on SO and elsewhere about how to do this and how to wire up the Bigtable API, the HBase API, and Spark. Some of those posts are quite outdated by now (a few years old, so possibly no longer relevant). So far I have not managed to get anything working, mostly because of various dependency conflicts or inconsistencies. Whatever combination of dependencies and versions I try in my POM.xml, I end up with ClassNotFound or NoSuchMethod exceptions when I try to run things.
Could someone tell me which 'working' combination of Spark, HBase, and Bigtable dependency versions and packages I need to include? My POM.xml currently looks like this:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FFSpark5</groupId>
  <artifactId>FFSpark5</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <bigtable.version>1.0.0</bigtable.version>
    <hbase.version>1.3.1</hbase.version>
    <hbase-shaded.version>2.0.0-alpha2</hbase-shaded.version>
    <hbase-spark.version>2.0.0-alpha4</hbase-spark.version>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <spark.version>1.6.2</spark.version>
    <spark-streaming.version>1.6.2</spark-streaming.version>
    <scala.version>2.11.0</scala.version>
    <scalatest.version>3.0.3</scalatest.version>
    <bigtable.projectID>my_gcp_project_id</bigtable.projectID>
    <bigtable.instanceID>my_bigtable_instance_name</bigtable.instanceID>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark-streaming.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-spark</artifactId>
      <version>${hbase-spark.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigtable</groupId>
      <artifactId>bigtable-hbase-1.x-hadoop</artifactId>
      <version>${bigtable.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-shaded-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-shaded-client</artifactId>
      <version>${hbase-shaded.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-shaded-server</artifactId>
      <version>${hbase-shaded.version}</version>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/classes</outputDirectory>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>FFSpark5</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
I realize there is probably a lot wrong with this version of the POM.xml, but I have tried many combinations of dependencies and versions and could not get any of them to actually work. This latest one seems to get the furthest in terms of log output, but it still breaks. Here is the latest stack trace:
18/03/12 15:37:17 INFO BigtableSession: Bigtable options: BigtableOptions{dataHost=bigtable.googleapis.com, tableAdminHost=bigtableadmin.googleapis.com, instanceAdminHost=bigtableadmin.googleapis.com .... (lots of other options here)}.
18/03/12 15:37:17 INFO RefreshingOAuth2CredentialsInterceptor: Refreshing the OAuth token
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 210.6 KB, free 210.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.9 KB, free 230.5 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58982 (size: 19.9 KB, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 0 from broadcast at HBaseContext.scala:73
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 160.0 B, free 230.6 KB)
18/03/12 15:37:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 120.0 B, free 230.7 KB)
18/03/12 15:37:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:58982 (size: 120.0 B, free: 457.9 MB)
18/03/12 15:37:19 INFO SparkContext: Created broadcast 1 from broadcast at HBaseContext.scala:74
Direct test done
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.getMvccReadPoint()J
at org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor.getMvccReadPoint(PackagePrivateFieldAccessor.java:39)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1088)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:601)
at FFSpark5.main(FFSpark5.java:64)
Below is my basic code. The idea is to run three tests:
Test 1: simply access Bigtable directly through the plain Bigtable API, just to rule out basic problems such as authentication. This works fine.
Test 2: try newAPIHadoopRDD(). This fails.
Test 3: try bulkPut(). This fails.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Connection;

class PutFunction implements Function<String, Put> {
    private static final long serialVersionUID = 1L;

    public Put call(String v) throws Exception {
        String[] cells = v.split(",");
        Put put = new Put(Bytes.toBytes(cells[0]));
        put.addColumn(Bytes.toBytes(cells[1]), Bytes.toBytes(cells[2]),
                Bytes.toBytes(cells[3]));
        return put;
    }
}

public class FFSpark5
{
    public static void main(String args[]) throws IOException
    {
        SparkConf conf = new SparkConf().setAppName("SparkTest").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Configuration hBaseConf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(hBaseConf);
        JavaHBaseContext hbaseContext = new JavaHBaseContext(sc, hBaseConf);

        // test 1: simple direct Bigtable access
        connection.getTable(TableName.valueOf("FFTransFlat".getBytes()))
                .put(new Put("abc".getBytes())
                        .addColumn("CI".getBytes(), "I".getBytes(), "val".getBytes()));
        System.out.println("Direct test done");

        // Test 2: newAPIHadoopRDD()
        Scan scan1 = new Scan();
        scan1.setCaching(500);
        scan1.setCacheBlocks(false);
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "FFTransFlat");
        hBaseConf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan1));
        JavaPairRDD<ImmutableBytesWritable, Result> source = sc
                .newAPIHadoopRDD(hBaseConf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);
        System.out.println(source.count());

        // Test 3: bulkPut()
        List<String> list = new ArrayList<String>(5);
        list.add("1,CI,a,1");
        list.add("2,CI,a,2");
        list.add("3,CI,a,3");
        JavaRDD<String> rdd = sc.parallelize(list);
        byte tableName[] = "FFTransFlat".getBytes();
        hbaseContext.bulkPut(rdd,
                TableName.valueOf(tableName),
                new PutFunction());
        System.out.println(source.count());

        connection.close();
    }
}
I see on the Dataproc site that Spark 2.2.0 and 1.6.2 are supported. I had problems with 2.2.0, so I am using 1.6.2.
Could I get some advice on the following:
1. What is the correct combination of dependencies and versions to use (specifically for Cloud Bigtable, rather than an HBase cluster)?
2. Is newAPIHadoopRDD the recommended way to get parallelization, or something like bulkRead()/bulkDelete()/etc.? Or is there another preferred, efficient way to do MPP against Bigtable from Dataproc?
Apologies for the lengthy post -- this is our first attempt at Dataproc.
*** Update:
I managed to get something working after updating the Bigtable dependency to bigtable-hbase-2.x-hadoop and the HBase version to 2.0.0-alpha2. At least bulkPut appears to work at this stage. I will now work on stripping the unneeded pieces out of the dependencies.
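For reference, the change amounts to swapping the Bigtable artifact and the HBase version, roughly along these lines (a sketch only: I have not verified which bigtable-hbase-2.x-hadoop release pairs cleanly with HBase 2.0.0-alpha2, so the version value below is a placeholder to check against Maven Central):

  <properties>
    <!-- placeholder: use a bigtable-hbase-2.x release that matches your HBase 2.x client -->
    <bigtable.version>BIGTABLE_2X_CLIENT_VERSION</bigtable.version>
    <hbase.version>2.0.0-alpha2</hbase.version>
  </properties>

  <dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-2.x-hadoop</artifactId>
    <version>${bigtable.version}</version>
  </dependency>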
Here is a full working example of Cloud Bigtable with Hortonworks' SHC (Spark HBase Connector), which is based on HBase 1.*. We will work on creating a similar example with HBase 2.*'s new Spark integration, built on Cloud Bigtable artifacts designed to work with the new HBase 2.* APIs (tracking issue link).
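To give a rough idea of the shape of that approach, below is a minimal Java sketch of how SHC is typically driven: a JSON catalog maps the table to DataFrame columns, and the SHC data source performs the scan. It assumes Spark 2.x (SparkSession) with the shc-core jar and a Bigtable-compatible HBase client on the classpath. The table name and column family/qualifier (FFTransFlat, CI, I) are borrowed from the question; the catalog layout and column names are illustrative assumptions, not the linked example itself.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ShcReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ShcReadSketch")
                .master("local[*]")
                .getOrCreate();

        // SHC maps an HBase/Bigtable table to DataFrame columns through a JSON catalog.
        // The DataFrame column names "rowkey" and "value" are made up for illustration.
        String catalog = "{"
                + "\"table\":{\"namespace\":\"default\",\"name\":\"FFTransFlat\"},"
                + "\"rowkey\":\"key\","
                + "\"columns\":{"
                + "\"rowkey\":{\"cf\":\"rowkey\",\"col\":\"key\",\"type\":\"string\"},"
                + "\"value\":{\"cf\":\"CI\",\"col\":\"I\",\"type\":\"string\"}"
                + "}}";

        // "catalog" is the option key defined by HBaseTableCatalog.tableCatalog.
        Dataset<Row> df = spark.read()
                .option("catalog", catalog)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .load();

        System.out.println(df.count());
        spark.stop();
    }
}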