Databricks Apache Spark AttributeError: 'dict' object has no attribute 'write'
The following function produces a JSON result:
import json

from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no
        # incoming messages before stopping receipt. Default is None (receive forever).
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that it is removed from the queue
                receiver.complete_message(msg)
                return themsg

result = myfunc()
Here is a snippet of the JSON output:
Out[65]: {'name': 'dmMapping_DIM_WORK_ORDER',
'description': 'DIM_WORK_ORDER Azure Foundation to Azure Data Mart Mapping',
'version': '2.4',
'updateDttm': '01/02/2022 14:46PM',
'SCDType': 4,
'mappings': [{'ELLIPSE': {'method': 'ellipseItem',
'tables': [{'database': 'foundation',
'schema': 'AZ_FH_ELLIPSE',
'table': 'ADS_FND_MSF620',
'primaryKey': [{'column': 'WORK_ORDER'}]}],
'columns': [{'column': 'D_WORK_ORDER_KEY',
'type': 'int',
'allowNulls': 'No',
'mapType': 'autoGenerate'},
{'column': 'SYSTEM_OF_RECORD',
'type': 'varchar',
'length': 24,
'allowNulls': 'No',
'mapType': 'staticValue',
'value': 'ELLIPSE'},
{'column': 'ACTUAL_FINISH_DATE',
When I try to save the output with the following:
result.write.save().json('/mnt/lake/RAW/FormulaClassification/F1Area/')
I get the error:
AttributeError: 'dict' object has no attribute 'write'
Can anyone tell me how to get past this error?
`result` is a plain Python dict, not a Spark DataFrame, so it has no `.write` attribute. The simplest fix is to write the data out as JSON directly, without Spark:
with open("/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/<file-name>", "w") as file:
    file.write(json.dumps(result))
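As a minimal, self-contained sketch of this direct-write approach (using a temporary file and a small hypothetical sample dict instead of the DBFS mount path and the real message):

```python
import json
import os
import tempfile

# Hypothetical stand-in for the dict returned by myfunc()
result = {"name": "dmMapping_DIM_WORK_ORDER", "version": "2.4", "SCDType": 4}

# Serialize the dict to a JSON file
path = os.path.join(tempfile.gettempdir(), "sample_message.json")
with open(path, "w") as file:
    file.write(json.dumps(result))

# Read it back to confirm the round trip preserves the data
with open(path) as file:
    loaded = json.load(file)

print(loaded == result)  # prints True
```

On Databricks, paths under `/dbfs/...` behave like this local path, since the DBFS mount is exposed through the local file API.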
You could still use the Spark APIs, but for a single message that would be overkill:
rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
    .write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')