PySpark: Map a SchemaRDD into a SchemaRDD
I am loading a file of JSON objects as a PySpark SchemaRDD. I want to change the "shape" of the objects (basically, I'm flattening them) and then insert them into a Hive table.
The problem I have is that the following returns a PipelinedRDD rather than a SchemaRDD:
log_json.map(flatten_function)
(where log_json is a SchemaRDD).
Is there a way to preserve the type, cast back to the desired type, or efficiently insert from the new type?
It looks like select is not available in Python, so you will have to registerTempTable and write it as a SQL statement, such as
`SELECT flatten(*) FROM TABLE`
after setting up the function for use in SQL:
sqlCtx.registerFunction("flatten", lambda x: flatten_function(x))
As @zero323 brought up, a function against * is probably not supported... so you can just create a function that takes in your data types and pass all of that in.
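A rough sketch of that workaround, with hypothetical column names col_a and col_b (note that registerFunction defaults to a string return type, so an explicit returnType may be needed):

# Hypothetical sketch: register the UDF over explicit columns instead of *,
# since a UDF against * is likely unsupported.
sqlCtx.registerFunction("flatten", lambda a, b: flatten_function(a, b))
flat = sqlCtx.sql("SELECT flatten(col_a, col_b) FROM TABLE")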
More an idea than a real solution. Let's assume your data looks like this:
data = [
    {"foobar":
        {"foo": 1, "bar": 2, "fozbaz": {
            "foz": 0, "baz": {"b": -1, "a": -1, "z": -1}
        }}}]

import json

with open("foobar.json", "w") as fw:
    for record in data:
        fw.write(json.dumps(record))
First let's load it and check the schema:
>>> srdd = sqlContext.jsonFile("foobar.json")
>>> srdd.printSchema()
root
|-- foobar: struct (nullable = true)
| |-- bar: integer (nullable = true)
| |-- foo: integer (nullable = true)
| |-- fozbaz: struct (nullable = true)
| | |-- baz: struct (nullable = true)
| | | |-- a: integer (nullable = true)
| | | |-- b: integer (nullable = true)
| | | |-- z: integer (nullable = true)
| | |-- foz: integer (nullable = true)
Now we register the table as suggested by Justin Pihony and extract the schema:
srdd.registerTempTable("srdd")
schema = srdd.schema().jsonValue()
Instead of flattening the data, we can flatten the schema using a function similar to this one:
def flatten_schema(schema):
    """Take schema as returned from schema().jsonValue()
    and return a list of field names with full paths."""
    def _flatten(schema, path="", accum=None):
        # Extract the name of the current element
        name = schema.get("name")
        # If there is a name, extend the path
        if name is not None:
            path = "{0}.{1}".format(path, name) if path else name
        # It is some kind of struct
        if isinstance(schema.get("fields"), list):
            for field in schema.get("fields"):
                _flatten(field, path, accum)
        elif isinstance(schema.get("type"), dict):
            _flatten(schema.get("type"), path, accum)
        # It is an atomic type
        else:
            accum.append(path)
    accum = []
    _flatten(schema, "", accum)
    return accum
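For the example schema above this should return the fully qualified leaf names:

>>> flatten_schema(schema)
['foobar.bar', 'foobar.foo', 'foobar.fozbaz.baz.a', 'foobar.fozbaz.baz.b', 'foobar.fozbaz.baz.z', 'foobar.fozbaz.foz']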
Add a small helper to format the query string:
def build_query(schema, df):
    select = ", ".join(
        "{0} AS {1}".format(field, field.replace(".", "_"))
        for field in flatten_schema(schema))
    return "SELECT {0} FROM {1}".format(select, df)
And the final result:
>>> sqlContext.sql(build_query(schema, "srdd")).printSchema()
root
|-- foobar_bar: integer (nullable = true)
|-- foobar_foo: integer (nullable = true)
|-- foobar_fozbaz_baz_a: integer (nullable = true)
|-- foobar_fozbaz_baz_b: integer (nullable = true)
|-- foobar_fozbaz_baz_z: integer (nullable = true)
|-- foobar_fozbaz_foz: integer (nullable = true)
Disclaimer: I didn't try to dig very deep into the schema structure, so it is very likely there are some cases not covered by flatten_schema.
The solution is applySchema:
mapped = log_json.map(flatten_function)
hive_context.applySchema(mapped, flat_schema).insertInto(name)
where flat_schema is a StructType representing the schema in the same way as the one you would get from log_json.schema() (but flattened, obviously).
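A sketch of what flat_schema could look like for the example data above (the field names are assumptions; in the Spark 1.x releases that still have SchemaRDD and applySchema the types live in pyspark.sql, from 1.3 on in pyspark.sql.types):

from pyspark.sql import StructType, StructField, IntegerType

# One flat field per leaf of the original nested schema; flatten_function
# must return rows whose values match this field order.
flat_schema = StructType([
    StructField("foobar_bar", IntegerType(), True),
    StructField("foobar_foo", IntegerType(), True),
    StructField("foobar_fozbaz_foz", IntegerType(), True),
])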
You can try this... it's a bit long but it works:
def flat_table(df, table_name):
    def rec(l, in_array, name):
        for i, v in enumerate(l):
            if isinstance(v['type'], dict):
                if 'fields' in v['type'].keys():
                    rec(name=name + [v['name']], l=v['type']['fields'], in_array=False)
                if 'elementType' in v['type'].keys():
                    rec(name=name + [v['name']], l=v['type']['elementType']['fields'], in_array=True)
            else:  # recursion stop rule
                # if this is an array we need to explode every element in it
                if in_array:
                    field_list.append('{node}{subnode}.array'.format(
                        node=".".join(name) + '.' if name else '', subnode=v['name']))
                else:
                    field_list.append('{node}{subnode}'.format(
                        node=".".join(name) + '.' if name else '', subnode=v['name']))

    field_list = []
    l = df.schema.jsonValue()['fields']
    df.registerTempTable(table_name)
    rec(l, in_array=False, name=[table_name])
    # create the select statement
    inner_fields = []
    outer_fields = []
    flag = True
    for x in field_list:
        f = x.split('.')
        if f[-1] != 'array':
            inner_fields.append('{field} as {name}'.format(field=".".join(f), name=f[-1]))
            of = ['a'] + f[-1:]
            outer_fields.append('{field} as {name}'.format(field=".".join(of), name=of[-1]))
        else:
            if flag:  # add the array to the inner query for explosion, only once per array field
                inner_fields.append('explode({field}) as {name}'.format(field=".".join(f[:-2]), name=f[-3]))
                flag = False
            of = ['a'] + f[-3:-1]
            outer_fields.append('{field} as {name}'.format(field=".".join(of), name=of[-1]))
    q = """select {outer_fields}
from (select {inner_fields}
from {table_name}) a""".format(outer_fields=',\n'.join(outer_fields),
                               inner_fields=',\n'.join(inner_fields),
                               table_name=table_name)
    return q
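A quick usage sketch (assuming an existing sqlContext and a nested DataFrame df; the table name "logs" is arbitrary):

q = flat_table(df, "logs")   # registers df as "logs" and builds the query
flat_df = sqlContext.sql(q)  # run it to get the flattened result
flat_df.printSchema()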