Inserting JSON data into a remote Postgres table with Python psycopg2 is taking too long. Is this the right approach?
I have 2GB of JSON data exported from MongoDB. I am trying to do some analytics on this data and then post the analytics results to a remote Postgres table. This is my code:
import json
from psycopg2 import connect, Error
import dateutil.parser as parser
from datetime import datetime
record_list = []
for line in open('december.json', 'r'):
    record_list.append(json.loads(line))
#print(record_list[0])
devices = []
for key, values in record_list[0].items():
    if key == "deviceSerial":
        devices.append(values)
test_list = list(set(devices))
device_data = []
for x in test_list:
    device_1 = [i for i in record_list if (i['deviceSerial'] == x)]
    datetime_object_first = parser.parse(device_1[0]['createdAt'], fuzzy=True)
    datetime_object_last = parser.parse(device_1[len(device_1)-1]['createdAt'], fuzzy=True)
    devices = device_1[0]['deviceSerial']
    device_model = device_1[0]['device']
    # calculating device usage
    device_Usage = round((datetime.timestamp(datetime_object_last) - datetime.timestamp(datetime_object_first)) / 60 / 1000, 3)
    device_data_elements = {
        'deviceModel': device_model,
        'deviceSerial': devices,
        'deviceUsageInMin': device_Usage
    }
    device_data.append(device_data_elements)
if type(device_data) == list:
    first_record = device_data[0]
    columns = list(first_record.keys())
#print ("\ncolumn names:", columns)
#print(device_data)
table_name = "test_data"
sql_string = 'INSERT INTO {} '.format( table_name )
sql_string += "(" + ', '.join(columns) + ")\nVALUES "
# enumerate over the record
for i, record_dict in enumerate(device_data):
    # iterate over the values of each record dict object
    values = []
    for col_names, val in record_dict.items():
        if type(val) == str:
            val = val.replace("'", "''")
            val = "'" + val + "'"
        values += [str(val)]
    # join the list of values and enclose record in parenthesis
    sql_string += "(" + ', '.join(values) + "),\n"
# remove the last comma and end statement with a semicolon
sql_string = sql_string[:-2] + ";"
print ("\nSQL statement:")
print (sql_string)
try:
    # declare a new PostgreSQL connection object
    conn = connect(
        dbname="events_data",
        user="chao",
        host="localhost",
        # attempt to connect for 10 seconds then raise exception
        connect_timeout=10
    )
    cur = conn.cursor()
    print("\ncreated cursor object:", cur)
    # Post data into postgres table
except (Exception, Error) as err:
    print("\npsycopg2 connect error:", err)
    conn = None
    cur = None
if cur != None:
    try:
        cur.execute(sql_string)
        conn.commit()
        print('\nfinished INSERT INTO execution')
    except (Exception, Error) as error:
        print("\nexecute_sql() error:", error)
        conn.rollback()
    # close the cursor and connection
    cur.close()
    conn.close()
I am doing some calculations (analytics) with this script. When the data is small, the script inserts the analytics into Postgres successfully. When the data is large, it takes too long; I even waited 12 hours without success. Right now the script runs locally and also reads the data locally. What would be the best approach to read and manipulate a large amount of data and post the analytics to a Postgres table? This is my sample JSON data:
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
{
"createdAt": "Tue Sep 01 2020 06:59:18 GMT+0000",
"sessionId": null,
"text": "Approve",
"device": "Android",
"deviceSerial": null
},
{
"createdAt": "Wed Sep 02 2020 08:40:10 GMT+0000",
"pageTitle": "submit option",
"sessionId": null,
"text": "launchComponent",
"device": "Android",
"deviceSerial": "636363636890"
},
{
"createdAt": "Wed Sep 02 2020 08:40:11",
"pageTitle": "quick check",
"sessionId": "88958d89c65f4fcea56e148a5a2838cfhdhdhd",
"text": "",
"device": "Android",
"deviceSerial": "6625839827"
}
]
I suggest dumping the JSON into Postgres and doing the analytics in Postgres. That's what Postgres is good at. The JSON needs no processing; the JSON array can be turned directly into Postgres rows.
One option is to create a table with a single jsonb column and insert each item as a row using jsonb_array_elements.
create table devices_json (
data jsonb
);
insert into devices_json (data)
select * from jsonb_array_elements('
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
...and so on...
]
')
Then do your analytics in Postgres using its JSON functions.
If the fields are well known, you can use json_populate_recordset to insert the fields into individual columns of a traditional SQL table. Then you have a traditional SQL table that may be easier to work with.
-- NOTE Postgres columns are case-sensitive, so they must be quoted to
-- ensure they exactly match the JSON keys
create table devices (
"createdAt" timestamp,
"sessionId" text,
"text" text,
device text,
"deviceSerial" text,
"pageTitle" text
);
insert into devices
select * from json_populate_recordset(NULL::devices, '
[
{
"createdAt": "Fri Nov 27 2020 08:07:39 GMT+0000 ",
"sessionId": null,
"text": null,
"device": null,
"deviceSerial": null
},
...and so on...
]
')
You can also combine the two: dump the JSON array into a jsonb column, then split it out into separate columns inside Postgres.
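For illustration, a minimal sketch (not from the original answer) of what that combination could look like when driven from psycopg2. It assumes the devices_json and devices tables defined above, that devices_json was loaded one JSON object per row as in the first option, and it reuses the connection settings from the question.
import psycopg2

# Hypothetical connection settings taken from the question; adjust as needed.
conn = psycopg2.connect(dbname="events_data", user="chao", host="localhost")
cur = conn.cursor()

# Split each stored jsonb object into the typed columns of the devices table.
# jsonb_populate_record maps matching JSON keys onto the columns of the
# null::devices row type; Postgres casts the "createdAt" string to timestamp.
cur.execute("""
    insert into devices
    select r.*
    from devices_json d,
         jsonb_populate_record(null::devices, d.data) r
""")
conn.commit()
cur.close()
conn.close()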
Thanks @Schwern, that really helps. I want to do my analytics dynamically. For example, I want to sort by the deviceSerial column and calculate each device's usage from the difference between its last and first createdAt. That is my analytics. There will be thousands of rows of data, and I want to post those analytics to another table. I was thinking of fetching the data into Python, doing some calculations, and pushing the results to Postgres. Would that be a good approach, or could I do this inside Postgres instead of pulling the data out with Python commands?
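Not part of the original thread, but as a rough sketch of how that analytics step could be pushed into Postgres itself, following the suggestion above: the query assumes the typed devices table from the answer, and device_usage is a hypothetical results table invented for this example.
import psycopg2

conn = psycopg2.connect(dbname="events_data", user="chao", host="localhost")
cur = conn.cursor()

# Hypothetical results table for the per-device analytics.
cur.execute("""
    create table if not exists device_usage (
        "deviceModel" text,
        "deviceSerial" text,
        "deviceUsageInMin" numeric
    )
""")

# One row per deviceSerial: usage in minutes between the first and last event.
cur.execute("""
    insert into device_usage
    select min(device),
           "deviceSerial",
           round((extract(epoch from max("createdAt") - min("createdAt")) / 60)::numeric, 3)
    from devices
    where "deviceSerial" is not null
    group by "deviceSerial"
    order by "deviceSerial"
""")
conn.commit()
cur.close()
conn.close()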
Bulk inserts in Postgres are best done with cursor.copy_from.
Convert your JSON to the required structure in Python and write it to an in-memory text file (io.StringIO). Push it to the database in a single call to copy_from(). Do a seek(0) on the StringIO object before calling copy_from.
import io
import psycopg2
buf = io.StringIO()
buf.write('1|{"x":0}\n')
buf.write('2|{"y":0}\n')
buf.seek(0)
with psycopg2.connect(service="my_service") as con:
    cur = con.cursor()
    cur.execute("create table if not exists t (a int, b jsonb)")
    cur.copy_from(buf, "t", sep="|")
    print(cur.rowcount)
Inserts into Postgres don't get any faster than that.
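Applied to the question's data, a rough sketch (with assumptions) might look like the following: it assumes the test_data table already exists with the columns deviceModel, deviceSerial and deviceUsageInMin in that order, and it skips escaping of tabs/newlines and NULL handling for brevity.
import io
import psycopg2

def copy_device_data(device_data):
    # Build one tab-separated line per record (tab is copy_from's default separator).
    buf = io.StringIO()
    for rec in device_data:
        buf.write("{}\t{}\t{}\n".format(rec['deviceModel'],
                                        rec['deviceSerial'],
                                        rec['deviceUsageInMin']))
    buf.seek(0)
    with psycopg2.connect(dbname="events_data", user="chao", host="localhost") as con:
        cur = con.cursor()
        # Columns are filled in the table's declared order.
        cur.copy_from(buf, "test_data")
        print(cur.rowcount)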
cursor.copy_expert can do the same thing with more flexibility.
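As an example, a hedged copy_expert variant of the snippet above: copy_expert takes the full COPY statement, so the format, delimiter, NULL string and CSV quoting are entirely under your control.
import io
import psycopg2

buf = io.StringIO('1|{"x":0}\n2|{"y":0}\n')  # read position starts at 0
with psycopg2.connect(service="my_service") as con:
    cur = con.cursor()
    cur.execute("create table if not exists t (a int, b jsonb)")
    cur.copy_expert("COPY t (a, b) FROM STDIN WITH (FORMAT text, DELIMITER '|')", buf)
    print(cur.rowcount)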
Regards, Niels