How to recover streaming data even if the connection to Elasticsearch is lost, using the Python client?
I am streaming RESTful API data from https://www.n2yo.com/api/ to track satellite positions. I use the Python client with Elasticsearch: every 10 seconds I save the streamed data to ES and visualize it with Kibana. My ES version is 6.4.3.
My code is:
import sys
import time
from datetime import datetime

import requests
import schedule
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
es = Elasticsearch('http://ip:port', timeout=600)

settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "document": {
            "properties": {
                "geo": {
                    "type": "geo_point"
                }
            }
        }
    }
}

try:
    es.indices.create(index="spacestation", body=settings)
except RequestError:
    print('Index already exists!!')
    sys.exit(1)

def collect_data():
    data = requests.get(url=URL).json()
    del data['positions'][1]
    new_data = {'geo': {'lat': data['positions'][0]['satlatitude'],
                        'lon': data['positions'][0]['satlongitude']},
                'satname': data['info']['satname'],
                'satid': data['info']['satid'],
                'timestamp': datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()}
    es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
    schedule.run_pending()
    time.sleep(1)
My problem is: yesterday I lost the connection. The error was:
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): Max retries exceeded with url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= (Caused by NewConnectionError(': Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))
When I re-ran my code, I couldn't, because the index already exists. If I delete the index, I lose the data already stored in ES. What can I do? I need to keep the data I have saved, and I need the job to keep running from now on. Is there any solution?
Create the index only when you have received data from n2yo.com. You should use the function es.indices.exists. Then make your function collect_data recursive, so it retries in case of failure. Try:
import time
from datetime import datetime

import requests
import schedule
from elasticsearch import Elasticsearch

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
es = Elasticsearch('http://ip:port', timeout=600)

def create_index():
    if not es.indices.exists(index="spacestation"):
        settings = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "mappings": {
                "document": {
                    "properties": {
                        "geo": {
                            "type": "geo_point"
                        }
                    }
                }
            }
        }
        es.indices.create(index="spacestation", body=settings)
    else:
        print('Index already exists!!')

def collect_data():
    try:
        data = requests.get(url=URL).json()
        create_index()
        del data['positions'][1]
        new_data = {'geo': {'lat': data['positions'][0]['satlatitude'],
                            'lon': data['positions'][0]['satlongitude']},
                    'satname': data['info']['satname'],
                    'satid': data['info']['satid'],
                    'timestamp': datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()}
        es.index(index='spacestation', doc_type='document', body=new_data)
    except Exception:
        # on any failure (network, ES), retry the whole collection step
        collect_data()

schedule.every(10).seconds.do(collect_data)

while True:
    schedule.run_pending()
    time.sleep(1)
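One caveat with the recursive retry above: during a long outage it retries immediately and without bound, which can exhaust the recursion limit. A plain loop with exponential backoff avoids that. Below is a minimal, hedged sketch of such a helper; the name `retry_with_backoff`, the attempt count, and the delays are illustrative choices, not part of the original post. You would wrap the body of `collect_data` in it instead of calling `collect_data()` from its own except block.

```python
import time

def retry_with_backoff(func, max_attempts=5, base_delay=1.0):
    """Call func(); on exception, sleep with exponential backoff and retry.

    Re-raises the last exception if every attempt fails, so the caller
    can decide whether to skip this cycle or stop the job.
    """
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# Self-contained demonstration with a function that fails twice,
# then succeeds on the third attempt.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure in name resolution")
    return "ok"

print(retry_with_backoff(flaky, max_attempts=5, base_delay=0))  # prints "ok"
```

With `schedule`, a raised exception after the final attempt would still abort the loop, so in practice you would catch it around the `retry_with_backoff` call and log it, letting the next 10-second cycle try again.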