How to set up an SSH tunnel in Google Cloud Dataflow to an external database server?

I am having trouble getting my Apache Beam pipeline to run on Cloud Dataflow with the DataflowRunner.

The first step of the pipeline is to connect to an external PostgreSQL server hosted on a VM that is only externally accessible via SSH on port 22, and to extract some data. I cannot change these firewall rules, so my only option is to reach the database server through an SSH tunnel, i.e. port forwarding.

In my code I use the Python library sshtunnel. It works perfectly when the pipeline is launched from my development computer using the DirectRunner:

import apache_beam as beam
from apache_beam.io import WriteToText
from sshtunnel import open_tunnel
# ReadFromSQL and PostgresWrapper come from the pysql_beam package;
# DictToCSV, user_options, pipeline_options, headers and select_query
# are defined elsewhere (not shown).

# Open the tunnel, then run the whole pipeline inside it.
with open_tunnel(
    (user_options.ssh_tunnel_host, user_options.ssh_tunnel_port),
    ssh_username=user_options.ssh_tunnel_user,
    ssh_password=user_options.ssh_tunnel_password,
    remote_bind_address=(user_options.dbhost, user_options.dbport)
) as tunnel:
    with beam.Pipeline(options=pipeline_options) as p:
        (p | "Read data" >> ReadFromSQL(
                host=tunnel.local_bind_host,
                port=tunnel.local_bind_port,
                username=user_options.dbusername,
                password=user_options.dbpassword,
                database=user_options.dbname,
                wrapper=PostgresWrapper,
                query=select_query)
            | "Format CSV" >> DictToCSV(headers)
            | "Write CSV" >> WriteToText(user_options.export_location)
        )

The same code, launched with the DataflowRunner in a non-default VPC where all ingress is denied but there are no egress restrictions, and with Cloud NAT configured, fails with this message:

psycopg2.OperationalError: could not connect to server: Connection refused Is the server running on host "0.0.0.0" and accepting TCP/IP connections on port 41697? [while running 'Read data/Read']

So, obviously, something is wrong with my tunnel, but I cannot pinpoint what exactly. I was starting to wonder whether it is even possible to set up an SSH tunnel through Cloud NAT at all, until I found this blog post: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 stating:

A core strength of Cloud Dataflow is that you can call external services for data enrichment. For example, you can call a micro service to get additional data for an element. Within a DoFn, call-out to the service (usually done via HTTP). You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it.
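
To make the quoted pattern concrete, here is a minimal sketch of an enrichment call made from inside a DoFn, so that the connection happens on the workers rather than on the machine that submits the job. The class name, endpoint URL and field names are hypothetical and not part of my pipeline.

import json
import urllib.request

import apache_beam as beam


class EnrichViaHttpDoFn(beam.DoFn):
    """Calls an external HTTP service for each element; process() runs on the workers."""

    def process(self, element):
        # Hypothetical enrichment endpoint; any service reachable from the
        # worker network (firewall rules permitting) would work the same way.
        url = "https://enrichment.example.com/lookup?id={}".format(element["id"])
        with urllib.request.urlopen(url, timeout=10) as response:
            element["enrichment"] = json.loads(response.read())
        yield element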

So it should be possible to set up this tunnel! I don't want to give up, but I don't know what to try next. Any ideas?

Thanks for reading.

Problem solved! I can't believe I spent two whole days on this... I was looking in completely the wrong direction.

The problem was not with some Dataflow or GCP networking configuration, and as far as I can tell...

You have full control to make any type of connection that you choose, so long as the firewall rules you set up within your project/network allow it

is true.

The problem was, of course, in my code: a problem that only shows up in a distributed environment. I had made the mistake of opening the tunnel from the main pipeline processor (the driver) instead of from the workers. So the SSH tunnel was up, but not between the workers and the target server, only between the main pipeline (the driver) and the target server!

To fix this, I had to change my requesting DoFn to wrap the query execution with the tunnel:

import logging

from sshtunnel import open_tunnel
# sql is the pysql_beam module providing SQLSourceDoFn and SQLSource,
# e.g. from pysql_beam.sql_io import sql


class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
    """Wraps SQLSourceDoFn in an SSH tunnel"""

    def __init__(self, *args, **kwargs):
        self.dbport = kwargs["port"]
        self.dbhost = kwargs["host"]
        self.args = args
        self.kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process(self, query, *args, **kwargs):
        # Remote side of the SSH tunnel: the actual database host and port
        remote_address = (self.dbhost, self.dbport)
        ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
        # The tunnel is opened on the worker, for the duration of this query
        with open_tunnel(
            ssh_tunnel,
            ssh_username=self.kwargs["ssh_user"],
            ssh_password=self.kwargs["ssh_password"],
            remote_bind_address=remote_address,
            set_keepalive=10.0
        ) as tunnel:
            # Point the SQL source at the local end of the tunnel
            forwarded_port = tunnel.local_bind_port
            self.kwargs["port"] = forwarded_port
            source = sql.SQLSource(*self.args, **self.kwargs)
            sql.SQLSouceInput._build_value(source, source.runtime_params)
            logging.info("Processing - {}".format(query))
            for records, schema in source.client.read(query):
                for row in records:
                    yield source.client.row_as_dict(row, schema)

As you can see, I had to override a few bits of the pysql_beam library.

In the end, each worker opens its own tunnel for each request. It is probably possible to optimize this behaviour, but it is sufficient for my needs.
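
One way this could be optimized, sketched below: open the tunnel once in the DoFn's setup method and close it in teardown (Beam's standard DoFn lifecycle hooks), so each worker reuses a single tunnel across bundles instead of reopening it for every query. This sketch is not part of my solution: it connects directly with psycopg2 instead of going through pysql_beam, and the class and parameter names are placeholders.

import apache_beam as beam
import psycopg2
from sshtunnel import SSHTunnelForwarder


class TunnelPerWorkerDoFn(beam.DoFn):
    """Illustrative sketch: keep one SSH tunnel open per DoFn instance."""

    def __init__(self, ssh_host, ssh_port, ssh_user, ssh_password,
                 dbhost, dbport, dbuser, dbpassword, dbname):
        self.ssh_host = ssh_host
        self.ssh_port = ssh_port
        self.ssh_user = ssh_user
        self.ssh_password = ssh_password
        self.dbhost = dbhost
        self.dbport = dbport
        self.dbuser = dbuser
        self.dbpassword = dbpassword
        self.dbname = dbname
        self.tunnel = None

    def setup(self):
        # Runs once per DoFn instance on the worker, before any bundle.
        self.tunnel = SSHTunnelForwarder(
            (self.ssh_host, self.ssh_port),
            ssh_username=self.ssh_user,
            ssh_password=self.ssh_password,
            remote_bind_address=(self.dbhost, self.dbport),
            set_keepalive=10.0,
        )
        self.tunnel.start()

    def process(self, query):
        # Connect through the local end of the tunnel and run the query.
        conn = psycopg2.connect(
            host="127.0.0.1",
            port=self.tunnel.local_bind_port,
            user=self.dbuser,
            password=self.dbpassword,
            dbname=self.dbname,
        )
        try:
            with conn.cursor() as cursor:
                cursor.execute(query)
                for row in cursor:
                    yield row
        finally:
            conn.close()

    def teardown(self):
        # Runs when the DoFn instance is discarded; close the tunnel.
        if self.tunnel is not None:
            self.tunnel.stop()

With this shape, setup and teardown bracket the lifetime of each DoFn instance on the worker, so the tunnel is shared across all queries processed by that instance.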