How to convert a JSON result to Parquet?
I have the following code that fetches some data from the Marketo system:
from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['email@domain.com'],
fields=['BG__c','email','company','createdAt'], batchSize=None)
This returns the following data:
[{'BG__c': 'ABC',
'company': 'MCS',
'createdAt': '2016-10-25T14:04:15Z',
'id': 4,
'email': 'email@domain.com'},
{'BG__c': 'CDE',
'company': 'MSC',
'createdAt': '2018-03-28T16:41:06Z',
'id': 10850879,
'email': 'email@domain.com'}]
What I want to do is save this returned data to a Parquet file. But when I try that with the code below, I get an error message.
from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
data = mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['email@domain.com'],
fields=['BG__c','email','company','createdAt'], batchSize=None)
sqlContext.read.json(data)
data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")
java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1431708582476650> in <module>()
7 fields=['BG__c','email','company','createdAt'], batchSize=None)
8
----> 9 sqlContext.read.json(data)
10 data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")
/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, charset)
261 path = [path]
262 if type(path) == list:
--> 263 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
264 elif isinstance(path, RDD):
265 def func(iterator):
/databricks/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
What am I doing wrong?
You have the following data:
data = [{'BG__c': 'ABC',
'company': 'MCS',
'createdAt': '2016-10-25T14:04:15Z',
'id': 4,
'email': 'email@domain.com'},
{'BG__c': 'CDE',
'company': 'MSC',
'createdAt': '2018-03-28T16:41:06Z',
'id': 10850879,
'email': 'email@domain.com'}]
To save it to a Parquet file, I suggest creating a DataFrame and then saving it as Parquet:
from pyspark.sql.types import *
df = spark.createDataFrame(data,
schema = StructType([
StructField("BC_g", StringType(), True),
StructField("company", StringType(), True),
StructField("createdAt", StringType(), True),
StructField("email", StringType(), True),
StructField("id", IntegerType(), True)]))
This gives the following types:
df.dtypes
[('BG__c', 'string'),
('company', 'string'),
('createdAt', 'string'),
('email', 'string'),
('id', 'int')]
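If you would rather not spell out the schema by hand, a minimal sketch (assuming Spark 2.x) is to wrap each dict in a Row and let Spark infer the types:

from pyspark.sql import Row

# Build Row objects from the API response and let Spark infer the schema;
# note that inference maps Python ints to bigint rather than int.
df = spark.createDataFrame([Row(**record) for record in data])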
You can then save the DataFrame as a Parquet file:
df.show()
+-----+-------+--------------------+----------------+--------+
|BG__c|company| createdAt| email| id|
+-----+-------+--------------------+----------------+--------+
| ABC| MCS|2016-10-25T14:04:15Z|email@domain.com| 4|
| CDE| MSC|2018-03-28T16:41:06Z|email@domain.com|10850879|
+-----+-------+--------------------+----------------+--------+
df.write.format('parquet').save(parquet_path_in_hdfs)
where parquet_path_in_hdfs is the path and file name of the desired Parquet file.
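For example, to write directly to the Azure Data Lake path from the question (assuming the cluster already has access to that store configured):

df.write.format('parquet').save("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")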
Based on the following statement in your code, you are writing the data directly. You must create a DataFrame first; you can convert the JSON to a DataFrame with df = sqlContext.read.json("path/to/json/file") and then call df.write:
data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")
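The ClassCastException happens because read.json expects a path, a list of paths, or an RDD of JSON strings, not a Python list of dicts. As a sketch of the in-memory variant (assuming Spark 2.x, where read.json accepts an RDD of strings), you can serialize each record with json.dumps first:

import json

# read.json can parse an RDD of JSON strings, so the API response can be
# converted without writing an intermediate file.
json_rdd = sc.parallelize([json.dumps(record) for record in data])
df = sqlContext.read.json(json_rdd)
df.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")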