使用 python 将 Spark 2 与 HBase 集成连接起来的 jars

jars to connect Spark 2 with HBase integration using python

我在 Spark 2 中使用 pyspark,是否有可用的 jar 连接 HBasepyspark

请帮我看看示例代码。

您可以访问此 link 以查看 Spark 中可用的连接器。

有关此 link 的更多信息。阅读 Readme.md 了解详情。

这是您必须添加的依赖项。

你可以在sbt中添加如下依赖:

libraryDependencies += "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.3"

依赖的Maven风格版本为:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version>
</dependency>

示例代码:

val sparkConf = new SparkConf()
...
sparkConf.set("spark.hbase.host", "thehost")
...
val sc = new SparkContext(sparkConf)

写入 HBase(基本)

写入HBase非常简单。请记住导入隐式转换:

import it.nerdammer.spark.hbase._

您只需创建一个示例 RDD,如下所示:

val rdd = sc.parallelize(1 to 100)
            .map(i => (i.toString, i+1, "Hello"))

这个 rdd 由像 ("1", 2, "Hello") 或 ("27", 28, "Hello") 这样的元组组成。每个元组的第一个元素被认为是行 ID,其他元素将分配给列。

rdd.toHBaseTable("mytable")
    .toColumns("column1", "column2")
    .inColumnFamily("mycf")
    .save()

大功告成。 HBase 现在在 table mytable 中包含 100 行,每行包含列 mycf:column1 和 mycf:column2.

的两个值

从 HBase 读取(基本)

从 HBase 读取更容易。请记住导入隐式转换:

import it.nerdammer.spark.hbase._

如果要读取前面例子中写入的数据,只需要这样写:

val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
    .select("column1", "column2")
    .inColumnFamily("mycf")

希望,这有帮助。

作为之前答案的替代:

我正在使用 hortonworks spark hbase 连接器。您可以在 github. They introduced the connector on a spark summit 上找到它。在演讲结束时,还有一个带有一些示例代码的现场演示。希望对你有帮助。

--- 编辑 ---

示例中的代码是用 scala 编写的,但连接器也适用于 pyspark。这里有一个 write/read 例子:

使用连接器启动 PySpark-Shell(也许您必须使用其他版本的软件包进行设置 - 查看 github 上的介绍和对话)。

pyspark --master yarn --packages com.hortonworks.shc:shc-core:1.1.0.2.6.5.2-8 --repositories http://nexus-private.hortonworks.com/nexus/content/groups/public/

创建一个sql-context并定义数据源

sqlc = SQLContext(sc)
data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'

接下来我们要定义一个目录,它是连接器可读的table结构。当您可以访问 hbase 的默认命名空间时,将 YourNameSpace 替换为 default。如果没有,请插入您的可访问命名空间。

catalog = ''.join("""{
    "table":{"namespace":"YourNameSpace", "name":"TestTable", "tableCoder":"PrimitiveType"},
    "rowkey":"key",
    "columns":{
        "ID":{"cf":"rowkey", "col":"key", "type":"string"},
        "DATA":{"cf":"data", "col":"", "type":"string"}
        }
    }""".split())

要创建一个 hbase table 并在其中写入一些内容,我们创建一个带有一些数据的 suitable 数据框 ...

df = sc.parallelize([('1', 'Moin'), ('2', 'Hello'), ('3', 'Hallo')]).toDF(schema=['ID', 'DATA'])

...并将其保存到 hbase。

df.write.options(catalog=catalog, newtable = 5).format(data_source_format).save()

现在我们可以从 hbase table 中读取内容并将其保存到变量中:

df_read = sqlc.read.options(catalog=catalog).format(data_source_format).load()

检查:

>>> df_read.show()
+---+-----+                                                                     
| ID| DATA|
+---+-----+
|  1| Moin|
|  2|Hello|
|  3|Hallo|
+---+-----+

- 在 HDP 2.5 上使用 PySpark 2 测试