How to query RethinkDB based on the current time
I am trying to write a 'controller' program for a RethinkDB database that uses RethinkDB's changefeed functionality to continuously dump data older than 3 days to JSON and delete it from the database.
The problem is that the query 'hangs' at the current time: whether the cutoff is computed with datetime.utcnow() or with rethinkdb.now(), it is evaluated when the query is defined and remains fixed thereafter. So as the changefeed progresses, the query becomes 'outdated'.
How can I make the query continuously 'update' to reflect the current time?
To illustrate the problem, here is the current script:
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta

# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"
table = r.db(database_name).table(table_name)

port_offset = 1  # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)

current_time = datetime.utcnow().replace(tzinfo=pytz.utc)  # Current time including timezone (assumed to be UTC)
retention_period = timedelta(days=3)  # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period  # Age at which data is removed from the main database

if "timestamp" in table.index_list().run(conn):  # If the table has "timestamp" as a secondary index, use "between" (for improved speed)
    beginning_of_time = r.time(1400, 1, 1, 'Z')  # The minimum time of a ReQL time object (the year 1400)
    data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
else:  # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
    data_to_archive = table.filter(r.row['timestamp'] < expiry_time)

output_file = "archived_sensor_data.json"
with open(output_file, 'a') as f:
    for change in data_to_archive.changes().run(conn, time_format="raw"):  # time_format="raw" prevents a "RqlTzinfo object is not JSON serializable" error when dumping
        if change['new_val'] is not None:  # If the change is not a deletion
            print(change)
            json.dump(change['new_val'], f)  # The main database is append-only, so the 'old_val' of a change is always None and only the 'new_val' is of interest
            f.write("\n")  # Separate entries by a new line
            ID_to_delete = change['new_val']['id']  # Get the ID of the data to be deleted from the database
            r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)
The query is stored in the variable data_to_archive. However, the time interval in the between statement is based on the value of utcnow() at the moment the current_time variable was defined, and it is not continuously updated as the changefeed runs. How can I achieve this?
I eventually solved this by performing the dump in 'batch' mode rather than continuously with changes(); that is, I run the archiving job periodically using the schedule module. The crucial point is that r.now() is a ReQL term evaluated on the server each time the query is run, so the cutoff refreshes on every scheduled pass, as sketched below.
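The distinction can be seen in isolation. In the sketch below (my own illustration, reusing the database and table names from the script above), the utcnow() cutoff is computed once in Python and baked into the query as a constant, whereas r.now() is re-evaluated by the server on every run(). Note, however, that inside a single changes() feed even r.now() is only evaluated once, when the feed is established:

import rethinkdb as r
import pytz
from datetime import datetime, timedelta

conn = r.connect("localhost", 28015)
table = r.db("sensor_db").table("sensor_data")

# Fixed cutoff: utcnow() runs here, in Python, exactly once, and the
# resulting constant datetime is embedded in the query.
fixed_cutoff = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(days=3)
stale_query = table.filter(r.row['timestamp'] < fixed_cutoff)

# Deferred cutoff: r.now() is a ReQL term, resolved server-side at each run().
fresh_query = table.filter(r.row['timestamp'] < r.now() - timedelta(days=3).total_seconds())

stale_query.run(conn)  # compares against the moment the query was defined
fresh_query.run(conn)  # compares against the moment run() is called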
Here is the full script:
import json
import rethinkdb as r
from datetime import timedelta
import schedule
import time
import functools

def generate_archiving_query(retention_period=timedelta(days=3), database_name="ipercron", table_name="sensor_data", conn=None):
    if conn is None:
        conn = r.connect("localhost", 28015)
    table = r.db(database_name).table(table_name)  # RethinkDB query object for the table of interest
    current_time = r.now()  # A ReQL term, resolved on the server at each run()
    expiry_time = current_time - retention_period.total_seconds()
    if "timestamp" in table.index_list().run(conn):  # If the table has "timestamp" as a secondary index, use "between" (for improved speed)
        beginning_of_time = r.time(1400, 1, 1, 'Z')  # The minimum time of a ReQL time object (the year 1400)
        data_to_archive = table.between(beginning_of_time, expiry_time, index="timestamp")
    else:  # Else, use "filter" (requires more memory, but does not require "timestamp" to be a secondary index)
        data_to_archive = table.filter(r.row['timestamp'] < expiry_time)
    return data_to_archive

def archiving_job(data_to_archive=None, output_file="archived_sensor_data.json", database_name="ipercron", table_name="sensor_data", conn=None):
    if data_to_archive is None:
        data_to_archive = generate_archiving_query()
    if conn is None:
        conn = r.connect("localhost", 28015)
    table = r.db(database_name).table(table_name)
    old_data = data_to_archive.run(conn, time_format="raw")  # Without time_format="raw" the output does not dump to JSON
    ids_to_delete = []
    with open(output_file, 'a') as f:
        for item in old_data:
            print(item)
            json.dump(item, f)
            f.write('\n')  # Separate each document by a new line
            ids_to_delete.append(item['id'])
    if ids_to_delete:  # Delete all archived documents in a single query
        table.get_all(r.args(ids_to_delete)).delete().run(conn)

if __name__ == "__main__":
    # The database and table are assumed to have been previously created
    database_name = "ipercron"
    table_name = "sensor_data"
    port_offset = 1  # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
    conn = r.connect("localhost", 28015 + port_offset)

    clean_slate = True
    if clean_slate:
        r.db(database_name).table(table_name).delete().run(conn)  # For testing, start with an empty table and add a fixed amount of data
        import rethinkdb_add_data

    data_to_archive = generate_archiving_query(conn=conn, database_name=database_name, table_name=table_name)  # Because r.now() is evaluated upon run(), the query needs only to be generated once
    archiving_job_fixed_query = functools.partial(archiving_job, data_to_archive=data_to_archive, conn=conn)
    schedule.every(0.1).minutes.do(archiving_job_fixed_query)
    while True:
        schedule.run_pending()
        time.sleep(1)  # Sleep between checks so the loop does not busy-wait
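As a closing note, the comment in the __main__ block is the crux of the fix: because r.now() is resolved on the server at each run(), the query returned by generate_archiving_query() can be built once and reused for every scheduled job. A quick (hypothetical) way to convince yourself of this, assuming the setup above:

import time

query = generate_archiving_query(conn=conn)
print(query.count().run(conn))  # documents older than 3 days at this moment
time.sleep(60)                  # let more documents age past the cutoff
print(query.count().run(conn))  # recomputed against the new current time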