从一个 Hadoop 集群读取并写入另一个 Hadoop 集群

Reading from one Hadoop cluster and writing to another Hadoop custer

我是 运行 一个 spark 作业,我需要从一个 HDFS table 中读取数据,它位于 HadoopCluster-1 中。 现在我希望将聚合数据帧放入另一个 HadoopCluster-2 中的 table。 最好的方法是什么?

  1. 我正在考虑以下方法: 在将数据写入目标 table 之前,使用 addResource 读取 hdfs-site.xml 和 core-site.xml。 然后将所有配置值复制到 Map 然后将这些 conf 值设置到我的 dataset.sparkSession.SparkContext.hadoopConfiguration().

这是实现我的目标的好方法吗?

如果你想从cluster1中读取hive table作为dataframe并在转换dataframe后将其作为hive table写入cluster2,你可以尝试以下方法。

  1. 确保 hiveserver2 在两个集群上都处于 运行ning 状态。 运行 服务器的命令是

hive --service hiveserever2

hive --service metastore

  1. 确保配置单元已正确配置username/password。 您可以将 username/password 都标记为空,但您会得到一个错误,您可以通过引用此 .

    来解决该问题
  2. 现在从cluster1读取hivetable作为spark dataframe,转换后写入cluster2的hivetable

    // spark-scala code
    
    val sourceJdbcMap = Map(
     "url"->"jdbc:hive2://<source_host>:<port>", //default port is 10000
     "driver"->"org.apache.hive.jdbc.HiveDriver",
     "user"->"<username>",
     "password"->"<password>",
     "dbtable"->"<source_table>")
    
    val targetJdbcMap = Map(
     "url"->"jdbc:hive2://<target_host>:<port>", //default port is 10000
     "driver"->"org.apache.hive.jdbc.HiveDriver",
     "user"->"<username>",
     "password"->"<password>",
     "dbtable"->"<target_table>")
    
    val sourceDF = spark.read.format("jdbc").options(sourceJdbcMap).load()
    
    val transformedDF = //transformation goes here...
    
    transformedDF.write.options(targetJdbcMap).format("jdbc").save()
    

我能够通过以下步骤使用 Spark 从一个启用 HA 的 Hadoop 集群 hdfs 位置读取并写入另一个启用 HA 的 hadoop 集群 hdfs 位置:

1) 检查两个服务器中的 KDC 是相同还是不同的领域。如果相同则跳过此步骤,否则在 2 个 KDC 之间设置跨领域身份验证。 可能有人会跟随:https://community.cloudera.com/t5/Community-Articles/Setup-cross-realm-trust-between-two-MIT-KDC/ta-p/247026

场景一:这是读写的循环操作

2)按照以下步骤编辑源集群的hdfs-site.xml: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.4/bk_administration/content/distcp_between_ha_clusters.html

3) 在应用程序启动时在 spark conf 中添加以下内容 属性: spark.kerberos.access.hadoopFileSystems=hdfs://targetCluster-01.xyz.com:8020 基本上,该值应该是 active namenode.

的 InetSocketAddress

4) 在您的代码中,给出目标 hdfs 位置的绝对路径。 例如:df.write.mode(SaveMode.Append).save("hdfs://targetCluster-01.xyz.com/usr/tmp/targetFolder")

注意:在第4步中,您还可以提供逻辑路径,如hdfs://targetCluster/usr/tmp/targetFolder 因为我们在我们的 hdfs-site.xml.

中添加了目标 namservice

场景2:这是一个临时请求,只需要执行一次读写操作

跳过上面提到的第 2 步。

按原样执行步骤#3 和步骤#4。

PS:作业的用户应该有权访问这两个集群才能正常工作。