如何处理热备副本和服务器端游标问题

How to deal with Hot Standby replicas and server-side cursor issue

我有一个问题,我需要从副本 PostgreSQL 数据库中获取一次 N-tables 并且每天获取在 45 天内编辑的所有行。副本配置为热备用副本,其中对主数据库的更新导致我的连接/事务被终止并抛出 DatabaseError。

我尝试使用迭代大小设置为 100 000 的命名服务器端游标,但问题仍然存在。我还将事务级别更改为可重复读取。

我需要将 SELECT * FROM table 结果写入 Apache Avro 文件并将它们移动到 Cloud Storage。由于缺少 disk-space 这些文件需要在迭代之间移动和删除,因此这会导致打开连接需要一些额外的时间。

任何关于如何避免的建议:

ERROR 2019-02-01 15:51:25,317: DatabaseError occurred: Traceback (most recent call last):
  File "main.py", line 202, in export_data
    rows = cur.fetchmany(itersize)
  File "/home/userA/data-export/lib/python2.7/site-packages/psycopg2/extras.py", line 93, in fetchmany
    res = super(DictCursorBase, self).fetchmany(size)
TransactionRollbackError: terminating connection due to conflict with recovery
DETAIL:  User query might have needed to see row versions that must be removed.
HINT:  In a moment you should be able to reconnect to the database and repeat your command.
server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

我也尝试过使用 Apache Sqoop 完成这项工作,但最终还是遇到了同样的问题。

with connection(connection_params=connection_params) as c:
    c.set_isolation_level(ISOLATION_LEVEL_REPEATABLE_READ)
    with cursor(connection=c, cursorname='{brand}_{database}_{tablename}'.format(brand=brand_name,database=db, tablename=table_name)) as cur:
        try:
            cur.itersize = 100000
            cur.execute(sql)
            results = []
            while True:
                rows = cur.fetchmany(100000)
                if not rows:
                   break
                for row in rows: 
          results.append(return_record(columns=list(row.keys()),row=row, cast_types=True))

            outputpath = './exports/file/test.avro'

            if writer(schemafile=schema_file, outputpath=outputpath, data=results):
              logging.info('Write finished, moving file {} to GCS'.format(outputpath))
              upload_failed += 1 if not upload_file(storage_config=config, filepath=outputpath) else upload_failed
            else:
              write_failed += 1
            counter += 1
            del results[:]
        except ValueError:
          logging.error('ValueError occurred: %s', traceback.format_exc()) 
          cli.report(traceback.format_exc())
        except (Exception, DatabaseError):
          logging.error('DatabaseError occurred: %s', traceback.format_exc())
          cli.report(traceback.format_exc())

此错误与您的查询无关,仅与您的复制配置和查询持续时间有关。

复制冲突发生在

  1. VACUUM 删除主数据库上的旧行版本,备用数据库上的长 运行 查询可能仍然需要。

  2. ACCESS EXCLUSIVE 主数据库上的锁与备用数据库上的查询发生冲突。此类锁由 ALTER TABLEDROP TABLETRUNCATECREATE INDEXCLUSTER 和类似的锁获取,而且当 VACUUM 截断空页时table.

  3. 结束

你遇到的是第一个问题。

有两种补救措施:

  1. 在备用机上设置hot_standby_feedback = on。然后,主数据库不会在 VACUUM 期间删除备用数据库可能仍需要的旧行版本。不利的一面是,如果繁忙的 tables 上的 autovacuum 被阻塞,这可能会导致主数据库 table 膨胀。

  2. max_standby_streaming_delay 设置为比您在备用数据库上的最长查询更长的值(或 -1 表示无穷大)。然后主服务器上的冲突更改会流式传输到备用服务器,但更改的应用会延迟。这意味着备用数据库可能会落后。此技术还有助于解决上述第二种类型的冲突。

你必须做出选择,但不要忘记所有方法都有可能无法接受的缺点table。