EMR install Python package from Git branch
I usually install packages in EMR via Spark's install_pypi_package
method. This limits where I can install packages from. How can I install a package from a specific GitHub branch? Is there any way to do this via the install_pypi_package
method?
If you have access to the cluster creation step, you can pip install the package from GitHub in a bootstrap action. (install_pypi_package
is only needed because at that point the cluster is already running, and the package might not resolve on all nodes otherwise.)
Before the cluster is running:
A simple bootstrap example (e.g. a download.sh bootstrap file) that installs from GitHub with pip is
#!/bin/bash
sudo pip install git+https://github.com/<your-user>/<your-repo>.git@<your-branch>
You can then use this bash script as a bootstrap action:
aws emr create-cluster --name "Test cluster" --bootstrap-actions
Path="s3://elasticmapreduce/bootstrap-actions/download.sh"
Or you can use pip3 in the bootstrap script:
sudo pip3 install git+https://github.com/<your-user>/<your-repo>.git@<your-branch>
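For reference, pip's VCS support expects requirements of the form git+<url>@<ref>, where the ref can be a branch, tag, or commit. A small sketch that builds such a requirement string (the repository URL and branch name below are made-up examples, not from the question):

```python
def git_requirement(repo_url: str, ref: str) -> str:
    # Build a pip VCS requirement: pip install "git+<url>@<branch-or-tag>"
    return f"git+{repo_url}@{ref}"

# Hypothetical repository and branch, for illustration only.
print(git_requirement("https://github.com/example/mypkg.git", "feature-x"))
# git+https://github.com/example/mypkg.git@feature-x
```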
Or just clone the branch and build it locally on EMR with its setup.py file:
#!/bin/bash
git clone -b <your-branch> <your-repo>.git
cd <your-repo>
sudo python setup.py install
After the cluster is running (complex, not recommended):
If you still want to install or build a custom package while the cluster is already running, AWS has an explanation here that uses AWS-RunShellScript
to install packages on all core nodes. It says to
(I) install the package on the master node (e.g. a pip install from a shell or a Jupyter notebook on the running cluster), and
(II) run the following script locally on EMR, passing the cluster-id
and the bootstrap script path (e.g. download.sh
above) as arguments.
import argparse
import time

import boto3


def install_libraries_on_core_nodes(
        cluster_id, script_path, emr_client, ssm_client):
    """
    Copies and runs a shell script on the core nodes in the cluster.

    :param cluster_id: The ID of the cluster.
    :param script_path: The path to the script, typically an Amazon S3 object URL.
    :param emr_client: The Boto3 Amazon EMR client.
    :param ssm_client: The Boto3 AWS Systems Manager client.
    """
    core_nodes = emr_client.list_instances(
        ClusterId=cluster_id, InstanceGroupTypes=['CORE'])['Instances']
    core_instance_ids = [node['Ec2InstanceId'] for node in core_nodes]
    print(f"Found core instances: {core_instance_ids}.")

    commands = [
        # Copy the shell script from Amazon S3 to each node instance.
        f"aws s3 cp {script_path} /home/hadoop",
        # Run the shell script to install libraries on each node instance.
        "bash /home/hadoop/install_libraries.sh"]
    for command in commands:
        print(f"Sending '{command}' to core instances...")
        command_id = ssm_client.send_command(
            InstanceIds=core_instance_ids,
            DocumentName='AWS-RunShellScript',
            Parameters={"commands": [command]},
            TimeoutSeconds=3600)['Command']['CommandId']
        while True:
            # Verify the previous step succeeded before running the next step.
            cmd_result = ssm_client.list_commands(
                CommandId=command_id)['Commands'][0]
            if cmd_result['StatusDetails'] == 'Success':
                print("Command succeeded.")
                break
            elif cmd_result['StatusDetails'] in ['Pending', 'InProgress']:
                print(f"Command status is {cmd_result['StatusDetails']}, waiting...")
                time.sleep(10)
            else:
                print(f"Command status is {cmd_result['StatusDetails']}, quitting.")
                raise RuntimeError(
                    f"Command {command} failed to run. "
                    f"Details: {cmd_result['StatusDetails']}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('cluster_id', help="The ID of the cluster.")
    parser.add_argument('script_path', help="The path to the script in Amazon S3.")
    args = parser.parse_args()
    emr_client = boto3.client('emr')
    ssm_client = boto3.client('ssm')
    install_libraries_on_core_nodes(
        args.cluster_id, args.script_path, emr_client, ssm_client)


if __name__ == '__main__':
    main()
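The polling loop in the script reduces to a three-way decision on the SSM StatusDetails value. As a standalone sketch (the helper name is mine, not part of the AWS sample):

```python
def next_action(status_details: str) -> str:
    # 'Success' -> stop polling; transient states -> keep waiting;
    # anything else ('Failed', 'TimedOut', ...) -> raise an error.
    if status_details == 'Success':
        return 'done'
    if status_details in ('Pending', 'InProgress'):
        return 'wait'
    return 'fail'

print(next_action('InProgress'))  # wait
print(next_action('Failed'))     # fail
```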