How to delete data from a RethinkDB database using its changefeed

I am developing a 'controller' for a database which continually accumulates data but only uses recent data, defined as less than 3 days old. Once the data is more than 3 days old, I want to dump it to a JSON file and remove it from the database.

To simulate this, I did the following. The 'controller' program, rethinkdb_monitor.py:

import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta

# The database and table are assumed to have been previously created
database_name = "sensor_db"
table_name = "sensor_data"

# To avoid interference between this testing program and the main program, all ports
# are initialized at an offset of 1 from the defaults by starting RethinkDB with
# "rethinkdb --port_offset 1" at the command line.
port_offset = 1
conn = r.connect("localhost", 28015 + port_offset)

current_time = datetime.utcnow().replace(tzinfo=pytz.utc)   # Current time, including timezone (assumed UTC)
retention_period = timedelta(days=3)                        # Period of time during which data is retained on the main server
expiry_time = current_time - retention_period               # Age of data which is removed from the main server

data_to_archive = r.db(database_name).table(table_name).filter(r.row['timestamp'] < expiry_time)
output_file = "archived_sensor_data.json"

with open(output_file, 'a') as f:
    # The time_format="raw" option prevents a "RqlTzinfo object is not JSON serializable" error when dumping
    for change in data_to_archive.changes().run(conn, time_format="raw"):
        print(change)
        # Since the main database we are reading from is append-only, the 'old_val' of the
        # change is always None and we are interested in the 'new_val' only
        json.dump(change['new_val'], f)
        f.write("\n")                               # Separate entries by a newline

Before running this program, I started RethinkDB with

rethinkdb --port_offset 1

at the command line, and created a database named sensor_db and a table named sensor_data using the web interface at localhost:8081 (see below).
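As an aside, the database and table can also be created from Python instead of the web interface; a minimal sketch, assuming the same port offset (the db_list/table_list guards just make it safe to re-run):

import rethinkdb as r

port_offset = 1
conn = r.connect("localhost", 28015 + port_offset)

# Create the database and table only if they do not exist yet
if "sensor_db" not in r.db_list().run(conn):
    r.db_create("sensor_db").run(conn)
if "sensor_data" not in r.db("sensor_db").table_list().run(conn):
    r.db("sensor_db").table_create("sensor_data").run(conn)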

Once rethinkdb_monitor.py was running and waiting for changes, I ran a script, rethinkdb_add_data.py, which generates synthetic data:

import random
import faker
from datetime import datetime, timedelta
import pytz
import rethinkdb as r

class RandomData(object):
    def __init__(self, seed=None):
        self._seed = seed
        self._random = random.Random()
        self._random.seed(seed)
        self.fake = faker.Faker()
        self.fake.random.seed(seed)

    def __getattr__(self, x):
        return getattr(self._random, x)

    def name(self):
        return self.fake.name()

    def datetime(self, start=None, end=None):
        if start is None:
            start = datetime(2000, 1, 1, tzinfo=pytz.utc)  # Jan 1st 2000
        if end is None:
            end = datetime.utcnow().replace(tzinfo=pytz.utc)

        if isinstance(end, datetime):
            dt = end - start
        elif isinstance(end, timedelta):
            dt = end
        assert isinstance(dt, timedelta)

        random_dt = timedelta(microseconds=self._random.randrange(int(dt.total_seconds() * (10 ** 6))))
        return start + random_dt

# Rethinkdb has been started at a port offset of 1 using the "--port_offset 1" argument.
port_offset = 1
conn = r.connect("localhost", 28015 + port_offset).repl()

rd = RandomData(seed=0)         # Instantiate and seed a random data generator

# The database and table have been previously created (e.g. through the web interface at localhost:8081)
database_name = "sensor_db"
table_name = "sensor_data"

# Generate random data with timestamps uniformly distributed over the past 6 days
random_data_time_interval = timedelta(days=6)
start_random_data = datetime.utcnow().replace(tzinfo=pytz.utc) - random_data_time_interval

for _ in range(5):
    entry = {"name": rd.name(), "timestamp": rd.datetime(start=start_random_data)}
    r.db(database_name).table(table_name).insert(entry).run()
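As a quick sanity check (an aside appended to the script above, reusing its connection and names), one can count how many of the inserted rows already fall outside the 3-day retention period:

# Count the rows whose timestamps are already older than the retention period
expiry_time = datetime.utcnow().replace(tzinfo=pytz.utc) - timedelta(days=3)
n_expired = r.db(database_name).table(table_name).filter(
    r.row['timestamp'] < expiry_time
).count().run()
print("Rows older than 3 days:", n_expired)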

After interrupting rethinkdb_monitor.py with Ctrl+C, the archived_sensor_data.json file contains the data to be archived:

{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475963599.347}, "id": "be2b5fd7-28df-48ee-b744-99856643265a", "name": "Elizabeth Woods"}
{"timestamp": {"timezone": "+00:00", "$reql_type$": "TIME", "epoch_time": 1475879797.486}, "id": "36d69236-f710-481b-82b6-4a62a1aae36c", "name": "Susan Wagner"}

However, what I am still struggling with is how to subsequently delete this data from the database. The command syntax of delete seems to be such that it can be called on a table or a selection, but the change obtained through the changefeed is just a dictionary.

How can I use the changefeed to continuously delete data from the database?

I made use of the fact that every change contains the ID of the corresponding document in the database, and created a selection with this ID using get:

with open(output_file, 'a') as f:
    # The time_format="raw" option prevents a "RqlTzinfo object is not JSON serializable" error when dumping
    for change in data_to_archive.changes().run(conn, time_format="raw"):
        print(change)
        if change['new_val'] is not None:           # If the change is not a deletion
            # The main database is append-only, so 'old_val' is always None; only 'new_val' matters
            json.dump(change['new_val'], f)
            f.write("\n")                           # Separate entries by a newline
            ID_to_delete = change['new_val']['id']  # ID of the document to delete from the database
            r.db(database_name).table(table_name).get(ID_to_delete).delete().run(conn)

The deletions themselves are also registered as changes, but I filter them out with the if change['new_val'] is not None statement.
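Alternatively, since delete can be called on any selection, one could skip the per-document get and periodically archive and delete the whole expired selection in one pass; a rough sketch using the same names as in rethinkdb_monitor.py (not what I ended up using):

# Batch variant: archive and then delete everything older than the retention period
expiry_time = datetime.utcnow().replace(tzinfo=pytz.utc) - retention_period
expired = r.db(database_name).table(table_name).filter(r.row['timestamp'] < expiry_time)

with open(output_file, 'a') as f:
    for doc in expired.run(conn, time_format="raw"):
        json.dump(doc, f)
        f.write("\n")

expired.delete().run(conn)      # Delete the same selection that was just archived

Note that rows matching the filter which are inserted between the read and the delete would be removed without being archived, which is why the changefeed approach above is safer for continuous operation.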