如何使用Ansible 2.9.2 python API执行任务?
How to use Ansible 2.9.2 python API to execute tasks?
我正在使用 Ansbile 2.9.2 python API 和 python 3.7.2 从主机文件中的服务器获取 crontab 信息。
我查看了 Ansible API document,但示例代码仅适用于 'localhost'。
这是我的代码api.py
# !/usr/bin/python3
# -*- coding:utf-8 -*-
import django
import environ
import os
import re
import sys
from ansible import context
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from configurations import importer
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(project_path) # 将项目路径添加到系统搜寻路径当中
env = environ.Env()
read_env_file = env.bool("DJANGO_READ_DOT_ENV_FILE", default=True)
if read_env_file: # OS environment variables take precedence over variables from .env
env.read_env(os.path.join(project_path, ".env"))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", env("DJANGO_SETTINGS_MODULE"))
os.environ.setdefault("DJANGO_CONFIGURATION", env("DJANGO_CONFIGURATION"))
importer.install()
django.setup()
from django.conf import settings
from godie.imt.models import IMTInfo
from godie.utils.common import DEP_ENV, app_path
class ResultCallback(CallbackBase):
"""A sample callback plugin used for performing an action as results come in
If you want to collect all results into a single object for processing at
the end of the execution, look into utilizing the ``json`` callback plugin
or writing your own custom callback plugin
"""
def v2_runner_on_ok(self, result, **kwargs):
"""Print a json representation of the result
This method could store the result in an instance attribute for retrieval later
"""
print(result)
imt_id = int(variable_manager.get_vars(host=result._host).get("imt_id"))
main_ip, dep_env = str(result._host), None
# 获取字典格式的输出结果
stdout, stderr = result._result.get("stdout_lines", ""), result._result.get("stderr_lines", "")
text = stdout if stdout else stderr # Tasks中的一个dict(即一条Ansible命令),不会同时输出stout和stderr
print(text)
try:
imt = IMTInfo.objects # 定义IMTInfo对象管理器
if re.search("cd /home/imt", str(text)):
print(main_ip + "Crontab:\n" + str(text))
imt.update_or_create(id=imt_id, defaults={'crontab': str(text), 'status': '2'})
if re.search("dep.admin.environmentID=", str(text)): # True表示获取DEP-IMT连接信息成功
dep_env = re.compile(r"=").split(text[0])[1]
print(main_ip + " DEP环境: " + dep_env)
imt.update_or_create(id=imt_id, defaults={'dep_env': DEP_ENV.get(dep_env, '0'), 'status': '3'})
except Exception as e:
print(e)
settings.DB_LOG.exception(f"imtinfo.py- IMTInfo表更新失败,main_ip:{main_ip} 报错内容:\n{e}")
# since the API is constructed for CLI it expects certain options to always be set in the context object
context.CLIARGS = ImmutableDict(connection='smart', module_path=None, forks=4, become=None, become_method=None, become_user=None,
check=False, diff=False)
# 用来加载解析yaml文件或JSON内容,并且支持vault的解密
loader = DataLoader() # Takes care of finding and reading yaml, json and ini files
passwords = dict(vault_pass=None) # become_user的密码
# Instantiate our ResultCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
results_callback = ResultCallback()
# create inventory, use path to host config file as source or hosts in a comma separated string
inventory = InventoryManager(loader=loader, sources=f"{app_path}/ansible/hosts") # 主机清单的路径
# variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
variable_manager = VariableManager(loader=loader, inventory=inventory)
# create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
play_source = dict(
name="Get IMT information",
hosts="all", # 传入hosts分组或main_ip
gather_facts="no",
tasks=[
dict(action=dict(module='shell', args='ls'), register='shell_out'),
# {{ file }}是在Inventory中定义好的变量
# dict(name='crontab', action=dict(module='shell', args='crontab -l'), register='shell_out'), # 获取IMT的Crontab信息
# dict(name='dep_env', action=dict(module='shell', args='cat {{ file }} | grep ^dep.admin.environmentID'), register='shell_out'),
dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) # Print statements during execution
]
)
# Create play object, playbook objects use .load instead of init or new methods,
# this will also automatically create the task objects from the info provided in play_source
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
# Run it - instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
tqm = None
try:
tqm = TaskQueueManager(
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords=passwords,
stdout_callback=results_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
)
result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
finally:
# we always need to cleanup child procs and the structures we use to communicate with them
if tqm is not None:
tqm.cleanup()
和 hosts 文件
[PTPS]
172.17.193.162 ansible_user=imt ansible_ssh_pass=*****
当 python3 api.py
执行时,什么都没发生,没有打印信息。
有谁知道我错在哪里?任何指导将不胜感激。谢谢!
您可能应该为此使用 Ansible Runner,您正在连接的 Ansible Python API 没有向后兼容性的承诺。
遇到了类似的问题。要查看错误,请尝试删除:
stdout_callback=results_callback
同时添加到 hosts 文件:
ansible_connection=ssh
我正在使用 Ansbile 2.9.2 python API 和 python 3.7.2 从主机文件中的服务器获取 crontab 信息。
我查看了 Ansible API document,但示例代码仅适用于 'localhost'。
这是我的代码api.py
# !/usr/bin/python3
# -*- coding:utf-8 -*-
import django
import environ
import os
import re
import sys
from ansible import context
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory.manager import InventoryManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from configurations import importer
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(project_path) # 将项目路径添加到系统搜寻路径当中
env = environ.Env()
read_env_file = env.bool("DJANGO_READ_DOT_ENV_FILE", default=True)
if read_env_file: # OS environment variables take precedence over variables from .env
env.read_env(os.path.join(project_path, ".env"))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", env("DJANGO_SETTINGS_MODULE"))
os.environ.setdefault("DJANGO_CONFIGURATION", env("DJANGO_CONFIGURATION"))
importer.install()
django.setup()
from django.conf import settings
from godie.imt.models import IMTInfo
from godie.utils.common import DEP_ENV, app_path
class ResultCallback(CallbackBase):
"""A sample callback plugin used for performing an action as results come in
If you want to collect all results into a single object for processing at
the end of the execution, look into utilizing the ``json`` callback plugin
or writing your own custom callback plugin
"""
def v2_runner_on_ok(self, result, **kwargs):
"""Print a json representation of the result
This method could store the result in an instance attribute for retrieval later
"""
print(result)
imt_id = int(variable_manager.get_vars(host=result._host).get("imt_id"))
main_ip, dep_env = str(result._host), None
# 获取字典格式的输出结果
stdout, stderr = result._result.get("stdout_lines", ""), result._result.get("stderr_lines", "")
text = stdout if stdout else stderr # Tasks中的一个dict(即一条Ansible命令),不会同时输出stout和stderr
print(text)
try:
imt = IMTInfo.objects # 定义IMTInfo对象管理器
if re.search("cd /home/imt", str(text)):
print(main_ip + "Crontab:\n" + str(text))
imt.update_or_create(id=imt_id, defaults={'crontab': str(text), 'status': '2'})
if re.search("dep.admin.environmentID=", str(text)): # True表示获取DEP-IMT连接信息成功
dep_env = re.compile(r"=").split(text[0])[1]
print(main_ip + " DEP环境: " + dep_env)
imt.update_or_create(id=imt_id, defaults={'dep_env': DEP_ENV.get(dep_env, '0'), 'status': '3'})
except Exception as e:
print(e)
settings.DB_LOG.exception(f"imtinfo.py- IMTInfo表更新失败,main_ip:{main_ip} 报错内容:\n{e}")
# since the API is constructed for CLI it expects certain options to always be set in the context object
context.CLIARGS = ImmutableDict(connection='smart', module_path=None, forks=4, become=None, become_method=None, become_user=None,
check=False, diff=False)
# 用来加载解析yaml文件或JSON内容,并且支持vault的解密
loader = DataLoader() # Takes care of finding and reading yaml, json and ini files
passwords = dict(vault_pass=None) # become_user的密码
# Instantiate our ResultCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
results_callback = ResultCallback()
# create inventory, use path to host config file as source or hosts in a comma separated string
inventory = InventoryManager(loader=loader, sources=f"{app_path}/ansible/hosts") # 主机清单的路径
# variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
variable_manager = VariableManager(loader=loader, inventory=inventory)
# create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
play_source = dict(
name="Get IMT information",
hosts="all", # 传入hosts分组或main_ip
gather_facts="no",
tasks=[
dict(action=dict(module='shell', args='ls'), register='shell_out'),
# {{ file }}是在Inventory中定义好的变量
# dict(name='crontab', action=dict(module='shell', args='crontab -l'), register='shell_out'), # 获取IMT的Crontab信息
# dict(name='dep_env', action=dict(module='shell', args='cat {{ file }} | grep ^dep.admin.environmentID'), register='shell_out'),
dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))) # Print statements during execution
]
)
# Create play object, playbook objects use .load instead of init or new methods,
# this will also automatically create the task objects from the info provided in play_source
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
# Run it - instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
tqm = None
try:
tqm = TaskQueueManager(
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords=passwords,
stdout_callback=results_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
)
result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
finally:
# we always need to cleanup child procs and the structures we use to communicate with them
if tqm is not None:
tqm.cleanup()
和 hosts 文件
[PTPS]
172.17.193.162 ansible_user=imt ansible_ssh_pass=*****
当 python3 api.py
执行时,什么都没发生,没有打印信息。
有谁知道我错在哪里?任何指导将不胜感激。谢谢!
您可能应该为此使用 Ansible Runner,您正在连接的 Ansible Python API 没有向后兼容性的承诺。
遇到了类似的问题。要查看错误,请尝试删除:
stdout_callback=results_callback
同时添加到 hosts 文件:
ansible_connection=ssh