How to delete data from a RethinkDB database using its changefeed
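I am developing a 'controller' for a database that continuously accumulates data but only uses recent data, defined as less than 3 days old. As soon as data becomes older than 3 days, I want to dump it to a JSON file and remove it from the database.
To simulate this, I have done the following. The 'controller' program, rethinkdb_monitor.py, is: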
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta

# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"

# To avoid interference of this testing program with the main program, all ports are
# initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1"
# at the command line.
port_offset = 1
conn = r.connect("localhost", 28015 + port_offset)

current_time = datetime.utcnow().replace(tzinfo=pytz.utc)  # Current time including timezone (assumed UTC)
retention_period = timedelta(days=3)  # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period  # Data older than this is removed from the main server
data_to_archive = r.db(database_name).table(table_name).filter(r.row['timestamp'] < expiry_time)

output_file = "archived_sensor_data.json"
with open(output_file, 'a') as f:
    # time_format="raw" prevents a "RqlTzinfo object is not JSON serializable" error when dumping
    for change in data_to_archive.changes().run(conn, time_format="raw"):
        print(change)
        # The main database is append-only, so 'old_val' is always None and only 'new_val' is of interest
        json.dump(change['new_val'], f)
        f.write("\n")  # Separate entries by a new line
Before running this program, I started RethinkDB using
rethinkdb --port_offset 1
at the command line, and used the web interface at localhost:8081 to create a database named sensor_db and a table named sensor_data (see below).
Once rethinkdb_monitor.py is running and waiting for changes, I run a script that generates synthetic data, rethinkdb_add_data.py:
import random
import faker
from datetime import datetime, timedelta
import pytz
import rethinkdb as r


class RandomData(object):
    def __init__(self, seed=None):
        self._seed = seed
        self._random = random.Random()
        self._random.seed(seed)
        self.fake = faker.Faker()
        self.fake.random.seed(seed)

    def __getattr__(self, x):
        return getattr(self._random, x)

    def name(self):
        return self.fake.name()

    def datetime(self, start=None, end=None):
        if start is None:
            start = datetime(2000, 1, 1, tzinfo=pytz.utc)  # Jan 1st 2000
        if end is None:
            end = datetime.utcnow().replace(tzinfo=pytz.utc)
        if isinstance(end, datetime):
            dt = end - start
        elif isinstance(end, timedelta):
            dt = end
        assert isinstance(dt, timedelta)
        random_dt = timedelta(microseconds=self._random.randrange(int(dt.total_seconds() * (10 ** 6))))
        return start + random_dt


# Rethinkdb has been started at a port offset of 1 using the "--port_offset 1" argument.
port_offset = 1
conn = r.connect("localhost", 28015 + port_offset).repl()

rd = RandomData(seed=0)  # Instantiate and seed a random data generator

# The database and table have been previously created (e.g. through the web interface at localhost:8081)
database_name = "sensor_db"
table_name = "sensor_data"

# Generate random data with timestamps uniformly distributed over the past 6 days
random_data_time_interval = timedelta(days=6)
start_random_data = datetime.utcnow().replace(tzinfo=pytz.utc) - random_data_time_interval

for _ in range(5):
    entry = {"name": rd.name(), "timestamp": rd.datetime(start=start_random_data)}
    r.db(database_name).table(table_name).insert(entry).run()
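Because the timestamps are spread uniformly over 6 days and the retention period is 3 days, only some of the inserted entries fall before the expiry time and match the monitor's filter. The following is a minimal, database-free sketch of that split, assuming Python 3. The helper names `random_timestamps` and `split_by_expiry` and the fixed "current time" are hypothetical and only serve the illustration.

```python
import random
from datetime import datetime, timedelta, timezone


def random_timestamps(n, now, span, rng):
    """Return n datetimes drawn uniformly from the interval [now - span, now)."""
    span_us = int(span.total_seconds() * 10 ** 6)
    start = now - span
    return [start + timedelta(microseconds=rng.randrange(span_us)) for _ in range(n)]


def split_by_expiry(timestamps, now, retention):
    """Partition timestamps into (expired, retained) relative to now - retention."""
    cutoff = now - retention
    expired = [t for t in timestamps if t < cutoff]
    retained = [t for t in timestamps if t >= cutoff]
    return expired, retained


# Fixed, hypothetical "current time" so the sketch is reproducible
now = datetime(2016, 10, 12, tzinfo=timezone.utc)
stamps = random_timestamps(5, now, timedelta(days=6), random.Random(0))
expired, retained = split_by_expiry(stamps, now, timedelta(days=3))
print(len(expired), "expired;", len(retained), "retained")
```

Only the entries in `expired` correspond to documents that the filtered changefeed would report.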
After interrupting rethinkdb_monitor.py with Ctrl+C, the archived_sensor_data.json file contains the data to be archived:
{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475963599.347}, "id": "be2b5fd7-28df-48ee-b744-99856643265a", "name": "Elizabeth Woods"}
{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475879797.486}, "id": "36d69236-f710-481b-82b6-4a62a1aae36c", "name": "Susan Wagner"}
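Since the archive stores timestamps in the raw format, anything reading the file back has to convert them itself. The following is a minimal sketch of such a reader, assuming Python 3 and assuming the "timezone" field always has the "+HH:MM" or "-HH:MM" form seen above. The function names `parse_reql_time` and `load_archive` are hypothetical, not part of the RethinkDB driver.

```python
import json
from datetime import datetime, timedelta, timezone


def parse_reql_time(raw):
    """Convert a raw TIME object (as dumped above) into a timezone-aware datetime."""
    if raw.get("$reql_type$") != "TIME":
        raise ValueError("not a raw TIME object: %r" % (raw,))
    tz = raw["timezone"]  # assumed to look like "+00:00" or "-07:00"
    sign = -1 if tz.startswith("-") else 1
    hours, minutes = tz.lstrip("+-").split(":")
    offset = timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))
    return datetime.fromtimestamp(raw["epoch_time"], tz=offset)


def load_archive(lines):
    """Parse newline-delimited JSON entries and decode their 'timestamp' fields."""
    records = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between entries
        record = json.loads(line)
        record["timestamp"] = parse_reql_time(record["timestamp"])
        records.append(record)
    return records


# The two archived entries shown above, used as sample input
sample = [
    '{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475963599.347}, "id": "be2b5fd7-28df-48ee-b744-99856643265a", "name": "Elizabeth Woods"}',
    '{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475879797.486}, "id": "36d69236-f710-481b-82b6-4a62a1aae36c", "name": "Susan Wagner"}',
]
for record in load_archive(sample):
    print(record["name"], record["timestamp"].isoformat())
```

With a real archive, `load_archive(open("archived_sensor_data.json"))` would work the same way, since a file object iterates over its lines.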
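However, what I am still struggling with is how to subsequently delete this data from the database. The syntax of the delete command seems to be such that it can be called on a table or a selection, but the change obtained through the changefeed is just a dictionary.
How can I use the changefeed to continuously delete data from the database?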
I made use of the fact that each change contains the ID of the corresponding document in the database, and created a selection with this ID using get:
with open(output_file, 'a') as f:
    # time_format="raw" prevents a "RqlTzinfo object is not JSON serializable" error when dumping
    for change in data_to_archive.changes().run(conn, time_format="raw"):
        print(change)
        if change['new_val'] is not None:  # If the change is not a deletion
            # The main database is append-only, so only 'new_val' is of interest
            json.dump(change['new_val'], f)
            f.write("\n")  # Separate entries by a new line
            ID_to_delete = change['new_val']['id']  # Get the ID of the data to be deleted from the database
            r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)
The deletion itself will be registered as a change, but I filter it out using the if change['new_val'] is not None statement.
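The archive-then-delete step and the deletion filter can be checked without a running server by pulling them into a small function. This is only a sketch, assuming Python 3: `process_change` and its `delete_by_id` callback are hypothetical names, and the feed below is hand-written sample data shaped like the changes printed by the monitor.

```python
import io
import json


def process_change(change, archive_file, delete_by_id):
    """Archive the document behind one change, then request its deletion.

    Returns the ID of the archived document, or None if the change was skipped.
    """
    new_val = change.get("new_val")
    if new_val is None:
        # A deletion (including the ones triggered here): nothing to archive
        return None
    archive_file.write(json.dumps(new_val) + "\n")
    archive_file.flush()  # hand the entry to the OS before deleting the document
    delete_by_id(new_val["id"])
    return new_val["id"]


# Hand-written sample feed: one insert followed by the deletion it triggers
doc = {"id": "be2b5fd7-28df-48ee-b744-99856643265a", "name": "Elizabeth Woods"}
feed = [
    {"old_val": None, "new_val": doc},
    {"old_val": doc, "new_val": None},
]

archive = io.StringIO()  # stands in for the open archive file
deleted_ids = []         # stands in for the database
for change in feed:
    process_change(change, archive, deleted_ids.append)

print(archive.getvalue(), end="")
print(deleted_ids)
```

In the monitor itself, `delete_by_id` could wrap the call from the loop above, i.e. a function that runs `r.db(database_name).table(table_name).get(doc_id).delete().run(conn)` for the given `doc_id`.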