如何在 Cloudera 5.x 上使用 pyspark 从 DataTap 读取数据?
How to read data from DataTap using pyspark on Cloudera 5.x?
我创建了一个 Cloudera 5.x 集群并设置了 Spark 选项:
我想 运行 一个简单的测试,使用 PySpark 从一个 Datatap 读取数据并将其写入另一个 Datatap。
使用 PySpark 执行此操作的步骤是什么?
对于这个例子,我将使用默认为我的租户创建的 TenantStorage DTAP。
我已经从 https://raw.githubusercontent.com/fivethirtyeight/data/master/airline-safety/airline-safety.csv
上传了一个数据集
接下来,找到控制器节点并通过 ssh 连接到它:
因为租户设置为默认值 Cluster Superuser Privileges(站点管理员和租户管理员),我可以从集群页面下载租户 ssh 密钥并使用它通过 ssh 进入控制器节点:
ssh bluedata@x.x.x.x -p 10007 -i ~/Downloads/BD_Demo\ Tenant.pem
x.x.x.x
对我来说是我的 BlueData 网关的 public IP 地址。
请注意,我们正在连接到控制器的端口 10007。
运行 pyspark:
$ pyspark --master yarn --deploy-mode client --packages com.databricks:spark-csv_2.10:1.4.0
访问数据文件并检索第一条记录:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dtap://TenantStorage/airline-safety.csv')
>>> df.take(1)
结果是:
[Row(airline=u'Aer Lingus', avail_seat_km_per_week=320906734, incidents_85_99=2, fatal_accidents_85_99=0, fatalities_85_99=0, incidents_00_14=0, fatal_accidents_00_14=0, fatalities_00_14=0)]
如果你想从一个 Datatap 读取数据,对其进行处理并将其保存到另一个 Datatap,它看起来像这样:
>>> df_filtered = df.filter(df.incidents_85_99 == 0)
>>> df_filtered.write.parquet('dtap://OtherDataTap/airline-safety_zero_incidents.parquet')
我创建了一个 Cloudera 5.x 集群并设置了 Spark 选项:
我想 运行 一个简单的测试,使用 PySpark 从一个 Datatap 读取数据并将其写入另一个 Datatap。
使用 PySpark 执行此操作的步骤是什么?
对于这个例子,我将使用默认为我的租户创建的 TenantStorage DTAP。
我已经从 https://raw.githubusercontent.com/fivethirtyeight/data/master/airline-safety/airline-safety.csv
上传了一个数据集接下来,找到控制器节点并通过 ssh 连接到它:
因为租户设置为默认值 Cluster Superuser Privileges(站点管理员和租户管理员),我可以从集群页面下载租户 ssh 密钥并使用它通过 ssh 进入控制器节点:
ssh bluedata@x.x.x.x -p 10007 -i ~/Downloads/BD_Demo\ Tenant.pem
x.x.x.x
对我来说是我的 BlueData 网关的 public IP 地址。
请注意,我们正在连接到控制器的端口 10007。
运行 pyspark:
$ pyspark --master yarn --deploy-mode client --packages com.databricks:spark-csv_2.10:1.4.0
访问数据文件并检索第一条记录:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dtap://TenantStorage/airline-safety.csv')
>>> df.take(1)
结果是:
[Row(airline=u'Aer Lingus', avail_seat_km_per_week=320906734, incidents_85_99=2, fatal_accidents_85_99=0, fatalities_85_99=0, incidents_00_14=0, fatal_accidents_00_14=0, fatalities_00_14=0)]
如果你想从一个 Datatap 读取数据,对其进行处理并将其保存到另一个 Datatap,它看起来像这样:
>>> df_filtered = df.filter(df.incidents_85_99 == 0)
>>> df_filtered.write.parquet('dtap://OtherDataTap/airline-safety_zero_incidents.parquet')