使用 python 将 Spark 2 与 HBase 集成连接起来的 jars
jars to connect Spark 2 with HBase integration using python
我在 Spark 2 中使用 pyspark
,是否有可用的 jar 连接 HBase
和 pyspark
。
请帮我看看示例代码。
您可以访问此 link 以查看 Spark 中可用的连接器。
有关此 link 的更多信息。阅读 Readme.md 了解详情。
这是您必须添加的依赖项。
你可以在sbt中添加如下依赖:
libraryDependencies += "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.3"
依赖的Maven风格版本为:
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version>
</dependency>
示例代码:
val sparkConf = new SparkConf()
...
sparkConf.set("spark.hbase.host", "thehost")
...
val sc = new SparkContext(sparkConf)
写入 HBase(基本)
写入HBase非常简单。请记住导入隐式转换:
import it.nerdammer.spark.hbase._
您只需创建一个示例 RDD,如下所示:
val rdd = sc.parallelize(1 to 100)
.map(i => (i.toString, i+1, "Hello"))
这个 rdd 由像 ("1", 2, "Hello") 或 ("27", 28, "Hello") 这样的元组组成。每个元组的第一个元素被认为是行 ID,其他元素将分配给列。
rdd.toHBaseTable("mytable")
.toColumns("column1", "column2")
.inColumnFamily("mycf")
.save()
大功告成。 HBase 现在在 table mytable 中包含 100 行,每行包含列 mycf:column1 和 mycf:column2.
的两个值
从 HBase 读取(基本)
从 HBase 读取更容易。请记住导入隐式转换:
import it.nerdammer.spark.hbase._
如果要读取前面例子中写入的数据,只需要这样写:
val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
.select("column1", "column2")
.inColumnFamily("mycf")
希望,这有帮助。
作为之前答案的替代:
我正在使用 hortonworks spark hbase 连接器。您可以在 github. They introduced the connector on a spark summit 上找到它。在演讲结束时,还有一个带有一些示例代码的现场演示。希望对你有帮助。
--- 编辑 ---
示例中的代码是用 scala 编写的,但连接器也适用于 pyspark。这里有一个 write/read 例子:
使用连接器启动 PySpark-Shell(也许您必须使用其他版本的软件包进行设置 - 查看 github 上的介绍和对话)。
pyspark --master yarn --packages com.hortonworks.shc:shc-core:1.1.0.2.6.5.2-8 --repositories http://nexus-private.hortonworks.com/nexus/content/groups/public/
创建一个sql-context并定义数据源
sqlc = SQLContext(sc)
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
接下来我们要定义一个目录,它是连接器可读的table结构。当您可以访问 hbase 的默认命名空间时,将 YourNameSpace 替换为 default。如果没有,请插入您的可访问命名空间。
catalog = ''.join("""{
"table":{"namespace":"YourNameSpace", "name":"TestTable", "tableCoder":"PrimitiveType"},
"rowkey":"key",
"columns":{
"ID":{"cf":"rowkey", "col":"key", "type":"string"},
"DATA":{"cf":"data", "col":"", "type":"string"}
}
}""".split())
要创建一个 hbase table 并在其中写入一些内容,我们创建一个带有一些数据的 suitable 数据框 ...
df = sc.parallelize([('1', 'Moin'), ('2', 'Hello'), ('3', 'Hallo')]).toDF(schema=['ID', 'DATA'])
...并将其保存到 hbase。
df.write.options(catalog=catalog, newtable = 5).format(data_source_format).save()
现在我们可以从 hbase table 中读取内容并将其保存到变量中:
df_read = sqlc.read.options(catalog=catalog).format(data_source_format).load()
检查:
>>> df_read.show()
+---+-----+
| ID| DATA|
+---+-----+
| 1| Moin|
| 2|Hello|
| 3|Hallo|
+---+-----+
- 在 HDP 2.5 上使用 PySpark 2 测试
我在 Spark 2 中使用 pyspark
,是否有可用的 jar 连接 HBase
和 pyspark
。
请帮我看看示例代码。
您可以访问此 link 以查看 Spark 中可用的连接器。
有关此 link 的更多信息。阅读 Readme.md 了解详情。
这是您必须添加的依赖项。
你可以在sbt中添加如下依赖:
libraryDependencies += "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.3"
依赖的Maven风格版本为:
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version>
</dependency>
示例代码:
val sparkConf = new SparkConf()
...
sparkConf.set("spark.hbase.host", "thehost")
...
val sc = new SparkContext(sparkConf)
写入 HBase(基本)
写入HBase非常简单。请记住导入隐式转换:
import it.nerdammer.spark.hbase._
您只需创建一个示例 RDD,如下所示:
val rdd = sc.parallelize(1 to 100)
.map(i => (i.toString, i+1, "Hello"))
这个 rdd 由像 ("1", 2, "Hello") 或 ("27", 28, "Hello") 这样的元组组成。每个元组的第一个元素被认为是行 ID,其他元素将分配给列。
rdd.toHBaseTable("mytable")
.toColumns("column1", "column2")
.inColumnFamily("mycf")
.save()
大功告成。 HBase 现在在 table mytable 中包含 100 行,每行包含列 mycf:column1 和 mycf:column2.
的两个值从 HBase 读取(基本)
从 HBase 读取更容易。请记住导入隐式转换:
import it.nerdammer.spark.hbase._
如果要读取前面例子中写入的数据,只需要这样写:
val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
.select("column1", "column2")
.inColumnFamily("mycf")
希望,这有帮助。
作为之前答案的替代:
我正在使用 hortonworks spark hbase 连接器。您可以在 github. They introduced the connector on a spark summit 上找到它。在演讲结束时,还有一个带有一些示例代码的现场演示。希望对你有帮助。
--- 编辑 ---
示例中的代码是用 scala 编写的,但连接器也适用于 pyspark。这里有一个 write/read 例子:
使用连接器启动 PySpark-Shell(也许您必须使用其他版本的软件包进行设置 - 查看 github 上的介绍和对话)。
pyspark --master yarn --packages com.hortonworks.shc:shc-core:1.1.0.2.6.5.2-8 --repositories http://nexus-private.hortonworks.com/nexus/content/groups/public/
创建一个sql-context并定义数据源
sqlc = SQLContext(sc)
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
接下来我们要定义一个目录,它是连接器可读的table结构。当您可以访问 hbase 的默认命名空间时,将 YourNameSpace 替换为 default。如果没有,请插入您的可访问命名空间。
catalog = ''.join("""{
"table":{"namespace":"YourNameSpace", "name":"TestTable", "tableCoder":"PrimitiveType"},
"rowkey":"key",
"columns":{
"ID":{"cf":"rowkey", "col":"key", "type":"string"},
"DATA":{"cf":"data", "col":"", "type":"string"}
}
}""".split())
要创建一个 hbase table 并在其中写入一些内容,我们创建一个带有一些数据的 suitable 数据框 ...
df = sc.parallelize([('1', 'Moin'), ('2', 'Hello'), ('3', 'Hallo')]).toDF(schema=['ID', 'DATA'])
...并将其保存到 hbase。
df.write.options(catalog=catalog, newtable = 5).format(data_source_format).save()
现在我们可以从 hbase table 中读取内容并将其保存到变量中:
df_read = sqlc.read.options(catalog=catalog).format(data_source_format).load()
检查:
>>> df_read.show()
+---+-----+
| ID| DATA|
+---+-----+
| 1| Moin|
| 2|Hello|
| 3|Hallo|
+---+-----+
- 在 HDP 2.5 上使用 PySpark 2 测试