如何将结构化数据映射到 Spark 中的 schemaRDD?
How to map structured data to schemaRDD in Spark?
我之前以不同的方式问过这个问题,但有一些变化,所以我想把它作为一个新问题再问一次。
我有一个结构化数据,其中只有一部分是 json 格式,但我需要将整个数据映射到 schemaRDD。数据如下所示:
03052015 04:13:20
{"recordType":"NEW","data":{"keycol":"val1","col2":"val2","col3":"val3"}
每行以日期开头,后跟时间和 json 格式的文本。
我不仅需要将 json 中的文本映射到同一结构中,还需要将日期和时间映射到相同的结构中。
我在 Python 中尝试过,但显然它不起作用,因为 Row 不采用 RDD(在本例中为 jsonRDD)。
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
orderFile = sc.textFile(myfile)
orderLine = orderFile.map(lambda line: line.split(" ", 2))
anotherOrderLine = orderLine.map(lambda p: Row(date=p[0], time=p[1], content=sqlContext.jsonRDD(p[3])))
schemaOrder = sqlContext.inferSchema(anotherOrderLine)
schemaOrder.printSchema()
for x in schemaOrder.collect():
print x
目标是能够运行 对 schemaRDD 进行这样的查询:
select date, time, data.keycol, data.val1, data.val2, data.val3 from myOrder
如何将整行映射到 schemaRDD?
感谢任何帮助?
sqlContext.jsonRDD 从包含字符串的 RDD 创建模式 rdd,其中每个字符串包含一个 JSON 表示。此代码示例来自 SparkSQL 文档 (https://spark.apache.org/docs/1.2.0/sql-programming-guide.html):
val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
关于 jsonRDD 的一个很酷的事情是您可以提供和附加参数来说明 JSONs 模式,这应该会提高性能。这可以通过创建一个 schemaRDD(只需加载一个示例文档)然后调用 schemaRDD.schema 方法来获取模式来实现。
最简单的选择是将此字段添加到 JSON 并使用 jsonRDD
我的数据:
03052015 04:13:20 {"recordType":"NEW","data":{"keycol":"val1","col1":"val5","col2":"val3"}}
03062015 04:13:20 {"recordType":"NEW1","data":{"keycol":"val2","col1":"val6","col2":"val3"}}
03072015 04:13:20 {"recordType":"NEW2","data":{"keycol":"val3","col1":"val7","col2":"val3"}}
03082015 04:13:20 {"recordType":"NEW3","data":{"keycol":"val4","col1":"val8","col2":"val3"}}
代码:
import json
def transform(data):
ts = data[:18].strip()
jss = data[18:].strip()
jsj = json.loads(jss)
jsj['ts'] = ts
return json.dumps(jsj)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.textFile('/sparkdemo/sample.data')
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.registerTempTable("myOrder")
sqlContext.sql("select ts, recordType, data.keycol, data.col1, data.col2 data from myOrder").collect()
结果:
[Row(ts=u'03052015 04:13:20', recordType=u'NEW', keycol=u'val1', col1=u'val5', data=u'val3'), Row(ts=u'03062015 04:13:20', recordType=u'NEW1', keycol=u'val2', col1=u'val6', data=u'val3'), Row(ts=u'03072015 04:13:20', recordType=u'NEW2', keycol=u'val3', col1=u'val7', data=u'val3'), Row(ts=u'03082015 04:13:20', recordType=u'NEW3', keycol=u'val4', col1=u'val8', data=u'val3')]
在您的代码中存在一个问题,即您为每一行调用 jsonRDD,这是不正确的 - 它接受 RDD 和 returns SchemaRDD。
我之前以不同的方式问过这个问题,但有一些变化,所以我想把它作为一个新问题再问一次。 我有一个结构化数据,其中只有一部分是 json 格式,但我需要将整个数据映射到 schemaRDD。数据如下所示:
03052015 04:13:20 {"recordType":"NEW","data":{"keycol":"val1","col2":"val2","col3":"val3"}
每行以日期开头,后跟时间和 json 格式的文本。 我不仅需要将 json 中的文本映射到同一结构中,还需要将日期和时间映射到相同的结构中。
我在 Python 中尝试过,但显然它不起作用,因为 Row 不采用 RDD(在本例中为 jsonRDD)。
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
orderFile = sc.textFile(myfile)
orderLine = orderFile.map(lambda line: line.split(" ", 2))
anotherOrderLine = orderLine.map(lambda p: Row(date=p[0], time=p[1], content=sqlContext.jsonRDD(p[3])))
schemaOrder = sqlContext.inferSchema(anotherOrderLine)
schemaOrder.printSchema()
for x in schemaOrder.collect():
print x
目标是能够运行 对 schemaRDD 进行这样的查询:
select date, time, data.keycol, data.val1, data.val2, data.val3 from myOrder
如何将整行映射到 schemaRDD?
感谢任何帮助?
sqlContext.jsonRDD 从包含字符串的 RDD 创建模式 rdd,其中每个字符串包含一个 JSON 表示。此代码示例来自 SparkSQL 文档 (https://spark.apache.org/docs/1.2.0/sql-programming-guide.html):
val anotherPeopleRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
关于 jsonRDD 的一个很酷的事情是您可以提供和附加参数来说明 JSONs 模式,这应该会提高性能。这可以通过创建一个 schemaRDD(只需加载一个示例文档)然后调用 schemaRDD.schema 方法来获取模式来实现。
最简单的选择是将此字段添加到 JSON 并使用 jsonRDD
我的数据:
03052015 04:13:20 {"recordType":"NEW","data":{"keycol":"val1","col1":"val5","col2":"val3"}}
03062015 04:13:20 {"recordType":"NEW1","data":{"keycol":"val2","col1":"val6","col2":"val3"}}
03072015 04:13:20 {"recordType":"NEW2","data":{"keycol":"val3","col1":"val7","col2":"val3"}}
03082015 04:13:20 {"recordType":"NEW3","data":{"keycol":"val4","col1":"val8","col2":"val3"}}
代码:
import json
def transform(data):
ts = data[:18].strip()
jss = data[18:].strip()
jsj = json.loads(jss)
jsj['ts'] = ts
return json.dumps(jsj)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
rdd = sc.textFile('/sparkdemo/sample.data')
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.registerTempTable("myOrder")
sqlContext.sql("select ts, recordType, data.keycol, data.col1, data.col2 data from myOrder").collect()
结果:
[Row(ts=u'03052015 04:13:20', recordType=u'NEW', keycol=u'val1', col1=u'val5', data=u'val3'), Row(ts=u'03062015 04:13:20', recordType=u'NEW1', keycol=u'val2', col1=u'val6', data=u'val3'), Row(ts=u'03072015 04:13:20', recordType=u'NEW2', keycol=u'val3', col1=u'val7', data=u'val3'), Row(ts=u'03082015 04:13:20', recordType=u'NEW3', keycol=u'val4', col1=u'val8', data=u'val3')]
在您的代码中存在一个问题,即您为每一行调用 jsonRDD,这是不正确的 - 它接受 RDD 和 returns SchemaRDD。