将行列表保存到 pyspark 中的 Hive table

saving a list of rows to a Hive table in pyspark

我有一个 pyspark 应用程序。我将配置单元 table 复制到我的 hdfs 目录,并在 python 中我 sqlContext.sql 对此 table 进行了查询。现在这个变量是一个数据框,我称之为 rows。我需要随机打乱 rows,所以我不得不将它们转换为行列表 rows_list = rows.collect()。那么我 shuffle(rows_list) 就地洗牌列表。我取了我需要的随机行数 x:

for r in range(x): allrows2add.append(rows_list[r]) 现在我想将 allrows2add 保存为一个配置单元 table 或附加一个现有的配置单元 table(以更容易的为准)。问题是我不能这样做:

all_df = sc.parallelize(allrows2add).toDF() 不能这样做,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

没有放入整个模式。 rows 的架构有 117 列,因此我不想将它们键入。有没有办法提取 rows 的模式来帮助我制作 allrows2add 数据框或以某种方式保存为配置单元 table? 我可以 rows.printSchema() 但不确定如何将其转化为模式格式作为变量传递 toDF() 而无需解析所有文本

谢谢

添加循环信息

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):

    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)

当无法推断架构时,通常是有原因的。 toDFcreateDataFrame 函数的语法糖,它默认只使用前 100 行(despite the docs 表示它只使用第一行)来确定模式应该是什么。要改变这一点,您可以增加采样率以查看更大比例的数据:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)

也有可能您的随机抽样碰巧只采用某些特定列的空值行。如果是这种情况,您可以 create a schema from scratch 像这样:

from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)

或者,您可以通过访问 schema 值从之前创建的 DataFrame 中获取架构:

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)

请注意,如果您的 RDD 的行不是 StructType (a.k.a. Row) 对象而不是字典或列表,您将无法创建数据框从他们。如果您的 RDD 行是字典,您可以将它们转换为 Row 对象,如下所示:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments