How do I ensure fast and reliable dataset migration to the FIWARE backend?
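I am transferring a very large dataset to the FIWARE backend: SourceDB -> Orion -> Cygnus -> Postgres. To do this, I wrote a Python script that fetches rows and sends one payload to Orion for each row fetched.
The script fires at 150 ms (0.15 s) intervals, but to my surprise, after about 10 iterations only two values (the first and the last payload sent) were persisted to the Postgres sink. This means 80% of the dataset is not persisted to the sink.
The script: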
import json
from time import sleep

import psycopg2
import requests
from tqdm import tqdm

from config import config


def val_json():
    # One JSON document per noise reading, joined with its deployment location.
    db = """select to_json(d) from ( select
        n.noise_data as measurand,
        n.factor as "sonometerClass",
        to_timestamp(n.seconds) as "dateObserved",
        l.description as name,
        json_build_object(
            'coordinates',
            json_build_array(l.node_lon, l.node_lat)
        ) as location
        from noise as n
        inner join deployment as d on
            d.deployment_id = n.deployment_id
        inner join location as l on
            l.location_id = d.location_id
        order by n.seconds asc
    ) as d"""
    return db


def main():
    url = 'http://localhost:1026/v2/entities/002/attrs?options=keyValues'
    headers = {"Content-Type": "application/json",
               "fiware-service": "urbansense",
               "fiware-servicepath": "/basic"}
    conn = None
    try:
        params = config()
        with psycopg2.connect(**params) as conn:
            # A named (server-side) cursor streams rows in batches of itersize.
            with conn.cursor(name='my_cursor') as cur:
                cur.itersize = 5000
                cur.execute(val_json())
                for row in tqdm(cur):
                    # Each row is a 1-tuple holding the JSON object for one reading.
                    jsonData = json.dumps(row[0])
                    print()
                    print(jsonData)
                    requests.post(url, data=jsonData, headers=headers)
                    sleep(0.15)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        # The connection's "with" block ends the transaction but does not close it.
        if conn is not None:
            conn.close()


if __name__ == '__main__':
    main()
The first ten iterations (payloads):
$ python3 noiselevelObserved.py
0it [00:00, ?it/s]
{"measurand": 64.8, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:15+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
1it [00:00, 1.75it/s]
{"measurand": 58.8, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:16+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
2it [00:00, 2.23it/s]
{"measurand": 56.5, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:17+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
3it [00:00, 2.76it/s]
{"measurand": 61.1, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}
4it [00:01, 3.31it/s]
{"measurand": 108.5, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Pr. Liberdade Cardosas", "location": {"coordinates": [-8.611119, 41.146023]}}
5it [00:01, 3.86it/s]
{"measurand": 56.5, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:18+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
6it [00:01, 4.35it/s]
{"measurand": 59.9, "sonometerClass": 1, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}
7it [00:01, 4.78it/s]
{"measurand": 97.2, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "D. Manuel II", "location": {"coordinates": [-8.625192, 41.148558]}}
8it [00:01, 5.12it/s]
{"measurand": 108.6, "sonometerClass": 2, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Pr. Liberdade Cardosas", "location": {"coordinates": [-8.611119, 41.146023]}}
9it [00:01, 5.41it/s]
{"measurand": 57.1, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:19+01:00", "name": "Trindade", "location": {"coordinates": [-8.609973, 41.151943]}}
10it [00:02, 5.63it/s]
{"measurand": 53.9, "sonometerClass": 0, "dateObserved": "1970-01-01T01:00:20+01:00", "name": "Casa da Musica", "location": {"coordinates": [-8.63041, 41.158091]}}
Reading the attribute values persisted in Postgres:
postgres=# select * from urbansense.basic_002_noiselevelobserved ;
recvtimets | recvtime | fiwareservicepath | entityid | entitytype | attrname | attrtype | attrvalue | attrmd
---------------+--------------------------+-------------------+----------+--------------------+----------------+-----------------+---------------------------------------+--------
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | dateObserved | Text | 1970-01-01T01:00:15+01:00 | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | latitude | Number | 41.1591 | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | location | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | longitude | Number | -8.65915 | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | measurand | Number | 64.8 | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | name | Text | Trindade | []
1559045918129 | 2019-05-28T12:18:38.129Z | /basic | 002 | NoiseLevelObserved | sonometerClass | Number | 1 | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | dateObserved | Text | 1970-01-01T01:00:20+01:00 | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | latitude | Number | 41.1591 | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | location | StructuredValue | {"coordinates":[-8.63041,41.158091]} | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | longitude | Number | -8.65915 | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | measurand | Number | 53.9 | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | name | Text | Casa da Musica | []
1559045919723 | 2019-05-28T12:18:39.723Z | /basic | 002 | NoiseLevelObserved | sonometerClass | Number | 0 | []
(14 rows)
Changing the emission rate to a 1 second interval did not improve things much; only 3 result sets (payloads) were persisted (70% lost):
postgres=# select * from urbansense.basic_002_noiselevelobserved ;
recvtimets | recvtime | fiwareservicepath | entityid | entitytype | attrname | attrtype | attrvalue | attrmd
---------------+--------------------------+-------------------+----------+--------------------+----------------+-----------------+---------------------------------------+--------
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | dateObserved | Text | 1970-01-01T01:00:15+01:00 | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | latitude | Number | 41.1591 | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | location | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | longitude | Number | -8.65915 | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | measurand | Number | 64.8 | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | name | Text | Trindade | []
1559046840569 | 2019-05-28T12:34:00.569Z | /basic | 002 | NoiseLevelObserved | sonometerClass | Number | 1 | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | dateObserved | Text | 1970-01-01T01:00:18+01:00 | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | latitude | Number | 41.1591 | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | location | StructuredValue | {"coordinates":[-8.609973,41.151943]} | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | longitude | Number | -8.65915 | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | measurand | Number | 56.5 | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | name | Text | Trindade | []
1559046845620 | 2019-05-28T12:34:05.620Z | /basic | 002 | NoiseLevelObserved | sonometerClass | Number | 0 | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | dateObserved | Text | 1970-01-01T01:00:20+01:00 | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | latitude | Number | 41.1591 | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | location | StructuredValue | {"coordinates":[-8.63041,41.158091]} | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | longitude | Number | -8.65915 | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | measurand | Number | 53.9 | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | name | Text | Casa da Musica | []
1559046850679 | 2019-05-28T12:34:10.679Z | /basic | 002 | NoiseLevelObserved | sonometerClass | Number | 0 | []
(21 rows)
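Honestly, I don't want the emission interval to be in the range of seconds, because I need to migrate the data to continue my research. When I first tried a 1 second interval, I realised it would take months to complete (maybe 4 months).
Question: Is Orion CB (or perhaps Cygnus) not capable of receiving entity/attribute values at this rate (150 ms / 0.15 s), or is Cygnus not smart enough to receive notifications from Orion at this speed?
I would appreciate any advice on ensuring that all values are persisted in the shortest time possible.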
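Based on the discussion in the question comments, the solution was to remove the throttling parameter from the subscription. This makes sense: throttling causes some notifications not to be sent (in this particular case, 80% of all notifications).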
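To illustrate why throttling drops payloads, here is a rough sketch rather than Orion's actual implementation. It models throttling as "send a notification only if at least `throttling` seconds have passed since the last one sent". The 5 second value is an assumption, guessed from the roughly 5 s spacing of `recvtime` in the 1 second run above; the subscription itself is not shown in the question. The second function only builds (does not send) a `PATCH /v2/subscriptions/{id}` request; the subscription id is hypothetical, and treating `"throttling": 0` as "no throttling" is an assumption you should check against your Orion version.

```python
import json
import urllib.request


def notified_indices(update_times, throttling):
    """Return indices of updates that would trigger a notification.

    Simplified model: an update is notified only if at least `throttling`
    seconds have elapsed since the previously sent notification.
    """
    sent = []
    last_sent = None
    for i, t in enumerate(update_times):
        if last_sent is None or t - last_sent >= throttling:
            sent.append(i)
            last_sent = t
    return sent


def build_throttling_patch(orion_url, subscription_id, service, service_path):
    """Build (but do not send) a request that sets throttling to 0.

    Assumption: Orion treats a throttling of 0 as "no throttling".
    """
    body = json.dumps({"throttling": 0}).encode("utf-8")
    return urllib.request.Request(
        url=f"{orion_url}/v2/subscriptions/{subscription_id}",
        data=body,
        method="PATCH",
        headers={
            "Content-Type": "application/json",
            "fiware-service": service,
            "fiware-servicepath": service_path,
        },
    )


# Eleven updates, one per second, with an assumed throttling of 5 seconds.
print(notified_indices(range(11), 5))  # [0, 5, 10]

# "abc123" is a hypothetical subscription id; list yours with GET /v2/subscriptions.
req = build_throttling_patch("http://localhost:1026", "abc123",
                             "urbansense", "/basic")
# urllib.request.urlopen(req)  # uncomment to actually send the PATCH
```

Under that model, only updates 0, 5 and 10 are notified, which lines up with the three payloads that reached Postgres in the 1 second run. Once the subscription no longer throttles, every update should produce a notification, so the `sleep` in the script is no longer what decides how much data arrives.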