如何从 python 启动并动态连接到 h2o 集群实例?

How to start and dynamically connect to h2o cluster instance from python?

根据问题和后续答案:在 hadoop 集群上启动 h2o 实例 运行ning 时,(比如 hadoop jar h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDir)回调 IP用于连接到 h2o 实例的地址由 hadoop 运行 时间 select 编辑。所以在大多数情况下,IP 地址和端口是 select 由 Hadoop 运行 时间找到最好的,看起来像

....
H2O node 172.18.4.63:54321 reports H2O cluster size 4
H2O node 172.18.4.67:54321 reports H2O cluster size 4
H2O cluster (4 nodes) is up
(Note: Use the -disown option to exit the driver after cluster formation)
Open H2O Flow in your web browser: http://172.18.4.67:54321
Connection url output line: Open H2O Flow in your web browser: http://172.18.4.67:54321

推荐的h2o使用方式是每次要使用时启动和停止单个实例(抱歉,目前无法找到支持文档)。这里的问题是,如果你想让你的 python 代码启动并自动连接到 h2o 实例,它不会知道要连接到哪个 IP,直到 h2o 实例是已经启动 运行ning。因此,在 Hadoop 上启动 H2O 集群的一种常见方法是让 Hadoop 决定集群,然后解析行

的输出
Open H2O Flow in your web browser: x.x.x.x:54321

到get/extract IP地址。

这里的问题是 h2o 是一个阻塞进程,当实例启动时,它的输出打印为 文本行,而不是批量打印,这使得我很难获得使用基本 python Popen 逻辑捕获输出所需的最终输出行。有没有办法在生成输出时捕获输出以获取连接 IP 的线路?

我最终使用的解决方案是在单独的线程中启动 h2o 进程,然后通过队列将输出传回主线程,然后我们从队列中读取并使用正则表达式搜索连接 IP .请参阅下面的示例。

# startup hadoop h2o cluster
import shlex
import re

from Queue import Queue, Empty
from threading import Thread

def enqueue_output(out, queue):
    """
    Function for communicating streaming text lines from seperate thread.
    see 
    """
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

# series of commands to run in-order for for bringing up the h2o cluster on demand
startup_cmds = [
    # remove any existing tmp log dir. for h2o processes
    'rm -r /some/location/for/h2odriver.jar/output',
    # start h2o on cluster
    '/bin/hadoop jar {}h2odriver.jar -nodes 4 -mapperXmx 6g -output hdfsOutputDir'.format("/local/h2o/start/path")
]

# clear legacy temp. dir.
if os.path.isdir(/some/location/for/h2odriver.jar/output):
    print subprocess.check_output(shlex.split(startup_cmds[0]))

# start h2o service in background thread
startup_p = subprocess.Popen(shlex.split(startup_cmds[1]), 
                             shell=False, 
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# setup message passing queue
q = Queue()
t = Thread(target=enqueue_output, args=(startup_p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# read line without blocking
h2o_url_out = ''
while True:
    try:  line = q.get_nowait() # or q.get(timeout=.1)
    except Empty:
        continue
    else: # got line
        print line
        # check for first instance connection url output
        if re.search("Open H2O Flow in your web browser", line) is not None:
            h2o_url_out = line
            break
        if re.search('Error', line) is not None:
            print 'Error generated: %s' % line
            sys.exit()

# capture connection IP from h2o process output
print 'Connection url output line: %s' % h2o_url_out
h2o_cnxn_ip = re.search("(?<=Open H2O Flow in your web browser: http:\/\/)(.*?)(?=:)", h2o_url_out).group(1)
print 'H2O connection ip: %s' % h2o_cnxn_ip