Direct Traffic to a Kubernetes Pod Based on Initial Connection
I would like to direct traffic from a load balancer within Kubernetes to a deployment. However, rather than trying to achieve a uniform load across all pods of the deployment, I'd like each connection to connect to, and stay connected to, a specific pod. I'll be sending gRPC requests to a stateful instance on the pod, and it is critical that the client's gRPC requests are not sent to other pods.
My current implementation is probably needlessly complex. Here it is in pseudocode:
- Initialize the cluster with a custom python scheduler.
- Create several pods of the stateful application, each with a NodePort service and a unique node port.
- A client communicates with the python scheduler over a socket interface and is assigned a port.
- The client talks to its pod using the assigned node port.
- The client (or the scheduler) terminates the pod.
I'm limited by the number of available ports, and because of AKS's node port limits I can't steer traffic this way. Also, while the scheduler has the advantage that clients can request different resources for their pods, it is too much to test and maintain.
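For illustration, each per-pod NodePort Service is along these lines (a minimal sketch written as a Python dict; the names, selector, and port numbers are placeholders, not my real values):

NODE_PORT_SERVICE = {
    'apiVersion': 'v1',
    'kind': 'Service',
    'metadata': {'name': 'name_masked-svc-00001', 'namespace': 'name_masked'},
    'spec': {'type': 'NodePort',
             'selector': {'app': 'name_masked-00001'},  # one service per pod
             'ports': [{'port': 49999,                  # gRPC port exposed by the pod
                        'targetPort': 49999,
                        'nodePort': 30001}]}}           # unique node port per pod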
Is there a better solution for directing external traffic to individual stateful pods?
The default iptables service proxy implementation uses a very simple randomized round-robin algorithm to select which pod to use. If you switch to the IPVS implementation, that does offer a lot more options, though it is unlikely to be available on a managed provider like AKS. That leaves you with a userspace proxy that supports gRPC, such as Traefik or the Istio Ingress. Picking one is out of scope for SO, but most of those proxies support some form of connection stickiness.
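For instance, with Istio this kind of stickiness is expressed as a consistent-hash load-balancer policy on a DestinationRule. A rough sketch, where the host, resource name, and hash header are all placeholder assumptions:

STICKY_DESTINATION_RULE = {
    'apiVersion': 'networking.istio.io/v1beta1',
    'kind': 'DestinationRule',
    'metadata': {'name': 'name_masked-sticky', 'namespace': 'name_masked'},
    'spec': {'host': 'name_masked-svc.name_masked.svc.cluster.local',
             'trafficPolicy': {'loadBalancer': {
                 # requests carrying the same header value are routed
                 # to the same backend pod
                 'consistentHash': {'httpHeaderName': 'x-client-id'}}}}}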
This may be considered an anti-pattern, but it ended up being the Occam's razor we needed to get our cluster off the ground.
The python scheduler was greatly simplified: it now just creates a new pod whenever a client requests an instance of our application. The scheduler waits until the application is running (by polling the pod) and then hands the client the IP address of the pod itself. Since our application is accessed from a jupyterhub cluster on the same Azure network, and we use Azure CNI networking, the pods are directly reachable.
No, we are not using a load balancer, but this solution lets us spawn pods of different sizes (vCPU and RAM) based on the user's request. Additionally, if a pod cannot be created, the python scheduler can send the user a helpful message (e.g. insufficient available RAM/CPU).
While this doesn't take advantage of kubernetes load balancing, we don't need load balancing; we need n-scalable, isolated containers with variable resource allocation. Kubernetes is very good at spawning pods, and with a custom python scheduler it is easy to grab those pods' IP addresses and terminate them when necessary (by spawning a thread in the scheduler whenever a pod is created).
I still think coderanger's answer is the most "kubernetes" one, but for those who don't want to look into additional kubernetes frameworks, maybe this solution is an option as well.
In case anyone is interested, here is a sanitized version of the scheduler:
#!/usr/bin/python3.7
"""
"""
import signal
import socket
import urllib3
from threading import Timer
import tarfile
from tempfile import TemporaryFile
import logging
import re
import yaml
import time
import threading
import os
import random
import string
from threading import Thread
from urllib3.exceptions import ProtocolError
import argparse
from kubernetes import client, config, watch, utils
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream
import copy
NAME_MASKED_PORT_NAME = 'name_masked-port'
NAMESPACE = 'name_masked'
NAME_MASKED_GRPC_PORT = 49999
SERVER_PORT = 29999
DEFAULT_TIMEOUT = 3600 # kill jobs that run longer than this
DEFAULT_KEEP_RESERVE = False
STORAGE_SECRET_NAME = 'storage-secret'
# in NCPU
DEFAULT_CPU_REQUEST = 0.5
DEFAULT_CPU_LIMIT = 1
# in GB
DEFAULT_RAM_REQUEST = 1
DEFAULT_RAM_LIMIT = 2
# InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
LOG = logging.getLogger(__name__)
LOG.setLevel('DEBUG')
def threaded(fn):
""" calls a function using a thread """
def wrapper(*args, **kwargs):
thread = Thread(target=fn, args=args, kwargs=kwargs)
thread.daemon = kwargs.pop('daemon', True)
thread.start()
return thread
return wrapper
def decode_request(request):
# expect request to be formatted as
# '{n_cpu_request}, {ram_request}, {n_cpu_limit}, {ram_limit}, {command}, {instance_timeout} {assign_port}'
    request = request.decode()
LOG.debug('Received client request %s', request)
n_cpu_request, ram_request, n_cpu_limit, ram_limit, command, pod_timeout, _ = eval(request)
    # command may be formatted as the string "None"
if command == "None":
command = None
return n_cpu_request, ram_request, n_cpu_limit, ram_limit, command, pod_timeout
def random_string(stringLength=10):
"""Generate a random string of fixed length """
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(stringLength))
NAME_MASKED_IMAGE = 'name_maskedhelm.azurecr.io/name_masked_lite:v0.1'
IMAGE_SECRET_NAME = 'containersecret'
# using MPI
LAUNCH_NAME_MASKED = '/company_inc/bin'
NFS_NAME_MASKED_VOLUME = {'name': 'nfs-name_masked-volume',
'nfs': {'server': '10.0.0.12', # gobetween
'path': '/mnt/company_inc',
'readOnly': True}}
NAME_MASKED_NFS_CONTAINER = {
'name': 'name_masked-ctr',
'image': NAME_MASKED_IMAGE,
'command': ['/bin/sh',
'-ec',
LAUNCH_NAME_MASKED],
'volumeMounts': [{'name': 'nfs-name_masked-volume', 'mountPath': '/company_inc'}],
'resources': {'requests': {'cpu': '500m', 'memory': '512Mi'},
'limits': {'cpu': '1000m', 'memory': '1024Mi'}}}
SOMENAME_JOB_NFS = {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {'name': 'name_masked-UNNAMED',
'namespace': NAMESPACE},
'spec': {'backoffLimit': 1,
'template': {'spec': {'restartPolicy': 'Never',
'backoffLimit': 1,
'containers': [NAME_MASKED_NFS_CONTAINER],
'imagePullSecrets': [{'name': IMAGE_SECRET_NAME}],
'volumes': [NFS_NAME_MASKED_VOLUME]}}}
}
BASE_SOMENAME_POD = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': 'name_masked',
'namespace': NAMESPACE},
'spec': {'restartPolicy': 'Never',
'containers': [NAME_MASKED_NFS_CONTAINER],
'imagePullSecrets': [{'name': IMAGE_SECRET_NAME}],
'volumes': [NFS_NAME_MASKED_VOLUME]}}
IMAGE_SECRET = {'apiVersion': 'v1',
'type': 'kubernetes.io/dockerconfigjson',
'kind': 'Secret',
'metadata' : {'name': IMAGE_SECRET_NAME,
'namespace': NAMESPACE},
'data': {'.dockerconfigjson': 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXx'}
}
# above secret is created with:
# kubectl create secret docker-registry containersecret --docker-server=name_maskedhelm.azurecr.io --docker-username=name_maskedhelm --docker-password=IZUV0FI/XXXzove9KLa7FOvikO6eKFLt --docker-email=name@company.com --namespace name_masked
def open_logger(loglevel='DEBUG'):
""" Opens a log """
# don't add another handler if log has already been initialized
if hasattr(open_logger, 'log'):
open_logger.log.handlers[0].setLevel(loglevel.upper())
else:
log = logging.getLogger()
ch = logging.StreamHandler()
ch.setLevel(loglevel.upper())
formatstr = '%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s'
ch.setFormatter(logging.Formatter(formatstr))
log.addHandler(ch)
open_logger.log = log
return open_logger.log
class PodScheduler():
"""Pod scheduler"""
def __init__(self, log_level='DEBUG',
reserve_target=4,
clear_on_init=False,
server_port=SERVER_PORT,
block=True,
instance_timeout=DEFAULT_TIMEOUT,
image=None,
default_cpu_request=DEFAULT_CPU_REQUEST,
default_cpu_limit=DEFAULT_CPU_LIMIT,
default_ram_request=DEFAULT_RAM_REQUEST,
default_ram_limit=DEFAULT_RAM_LIMIT,
keep_reserve=DEFAULT_KEEP_RESERVE,
clear_on_exit=True):
"""Initialize job controller"""
open_logger(log_level)
self.deployment_name = None
self._reserve_target = reserve_target
self.enable_watch = True
self._active = True
self._events = []
self._server_port = server_port
self._server = None
self._clear_on_exit = clear_on_exit
# self._instance_state_locked = True
self._assigned = {}
self._pods = None
self._terminate = False
self._image = image
self._purge_dangling_resources = False
# self._job_count = 0
# self._lock_count = False
self._requested_pods = []
self._timeouts = {}
self._name_lock = False
self._default_cpu_request = default_cpu_request
self._default_cpu_limit = default_cpu_limit
self._default_ram_request = default_ram_request
self._default_ram_limit = default_ram_limit
LOG.info('Default culling SOMENAME instances at %d seconds', instance_timeout)
self._instance_timeout = instance_timeout
# load credentials based on if running within a kubernetes pod
if 'KUBERNETES_SERVICE_HOST' in os.environ:
config.load_incluster_config()
else:
# config file must contain the correct IP address, cert, and admin password
# copy from microk8s server with:
# microk8s.kubectl config view --raw > $HOME/.kube/config
path = os.path.dirname(os.path.realpath(__file__))
config.load_kube_config(os.path.join(path, 'config'))
configuration = client.Configuration()
configuration.verify_ssl = False # necessary for microk8s
client.Configuration.set_default(configuration)
# connect to kubernetes api
self.core_api = client.CoreV1Api()
# create name_masked namespace if it doesn't exist
self.create_namespace(NAMESPACE)
# add secrets
self._add_image_secret()
if clear_on_init:
LOG.info('Clearing on initialization')
self.clear()
# start port assignment thread
self.start_port_server()
if block:
self.block_execution()
def _add_image_secret(self):
"""Initialize Azure container resource secret"""
if IMAGE_SECRET_NAME not in self.secret_names:
self.core_api.create_namespaced_secret(NAMESPACE, IMAGE_SECRET)
LOG.info(f'Added image secret "{IMAGE_SECRET_NAME}"')
else:
LOG.info(f'Secret "{IMAGE_SECRET_NAME}" already exists')
def _remove_image_secret(self):
"""Remove Azure container resource secret"""
if IMAGE_SECRET_NAME in self.secret_names:
response = self.core_api.delete_namespaced_secret(IMAGE_SECRET_NAME,
NAMESPACE)
LOG.info(f'Removed image secret "{IMAGE_SECRET_NAME}"')
@property
def secret_names(self):
secret_names = []
for secret in self.secrets:
secret_names.append(secret.metadata.name)
return secret_names
@property
def secrets(self):
"""NAME_MASKED namespace secrets"""
return self.core_api.list_namespaced_secret(NAMESPACE).items
def clear(self):
"""Delete all services, jobs, and pods in the 'name_masked' namespace"""
# self.delete_all_services()
# self.delete_all_jobs()
self.delete_all_pods()
# self._instances = {}
@property
def pods(self):
"""Name_Masked pods"""
self._pods = self.core_api.list_namespaced_pod(NAMESPACE).items
return self._pods
@property
def active_pods(self):
"""running pods"""
active_pods = []
for pod in self.pods:
status = pod.status
metadata = pod.metadata
if status.container_statuses:
container_status = status.container_statuses[0]
if hasattr(container_status, 'ready'):
if container_status.ready:
active_pods.append(metadata.name)
return active_pods
def delete_all_pods(self):
"""Delete all jobs"""
for pod in self.pods:
self.delete_pod(pod.metadata.name)
def delete_pod(self, pod_name):
"""Delete a NAME_MASKED namespaced pod"""
try:
self.core_api.delete_namespaced_pod(pod_name, NAMESPACE)
LOG.info(f'Deleted pod {pod_name}')
except ApiException:
LOG.error(f'Unable to delete pod {pod_name}')
def _wait_for_pod(self, pod_name, timeout=20):
"""Returns pod yaml when pod is ready. Otherwise, raises an exception"""
tstart = time.time()
while (time.time() - tstart) < timeout:
time.sleep(0.5)
pod = self.core_api.read_namespaced_pod(pod_name, NAMESPACE)
status = pod.status
metadata = pod.metadata
if status.conditions:
if status.conditions[0].reason == 'Unschedulable':
reason = status.conditions[0].message
raise RuntimeError(f'Unable to create pod: {reason}')
if status.container_statuses:
container_status = status.container_statuses[0]
if hasattr(container_status, 'ready'):
if container_status.ready:
return pod
if hasattr(container_status, 'state'):
if container_status.state.terminated:
if hasattr(container_status.state.terminated, 'exit_code'):
if container_status.state.terminated.exit_code:
reason = container_status.state.terminated.reason
raise RuntimeError(f'Unable to create pod: {reason}')
if hasattr(container_status.state, 'waiting'):
if container_status.state.waiting is not None:
reason = container_status.state.waiting.message
if reason is not None:
raise RuntimeError(f'Unable to create pod: {reason}')
raise RuntimeError(f'TIMEOUT at {timeout} seconds:\nUnable to create pod: Logs:\n {pod}')
def _build_pod_yaml(self, cpu_request=None, ram_request=None,
cpu_limit=None, ram_limit=None, custom_command=None):
"""Build a job body"""
# copy job yaml and modify it
pod_name = self._assign_pod_name()
pod_yaml = dict(BASE_SOMENAME_POD)
pod_yaml['metadata']['name'] = pod_name
if cpu_request is None:
cpu_request = self._default_cpu_request
if cpu_limit is None:
cpu_limit = self._default_cpu_limit
if ram_request is None:
ram_request = self._default_ram_request
if ram_limit is None:
ram_limit = self._default_ram_limit
container = pod_yaml['spec']['containers'][0]
limits = container['resources']['limits']
requests = container['resources']['requests']
limits['cpu'] = '%.2f' % float(cpu_limit)
limits['memory'] = '%.2fGi' % float(ram_limit)
requests['cpu'] = '%.2f' % float(cpu_request)
requests['memory'] = '%.2fGi' % float(ram_request)
LOG.info(f'Pod "{pod_name}" requested with:')
LOG.info(f'Requests: {requests}')
LOG.info(f'Limits: {limits}')
# configure name_masked number of CPUs
if custom_command is not None:
command = custom_command
else:
command = LAUNCH_NAME_MASKED.replace('-grpc', '-np %d -grpc' % int(cpu_limit))
LOG.info(f'Launching NAME_MASKED with "{command}"')
container['command'][-1] = command
return pod_yaml
def _spawn_pod(self, pod_yaml):
"""Initialize pod and return the pod name"""
        # multiple attempts to create the pod due to potential naming conflicts
resp = None
pod_name = pod_yaml['metadata']['name']
while resp is None:
try:
resp = self.core_api.create_namespaced_pod(NAMESPACE, pod_yaml)
except Exception as exception:
                # sometimes there's a conflict with the pod name
                # due to multiple simultaneous requests
if 'AlreadyExists' in str(exception):
pod_name = self._assign_pod_name()
pod_yaml['metadata']['name'] = pod_name
else:
raise exception
# might have changed
return pod_name
def create_name_masked_pod(self, cpu_request=None, ram_request=None,
cpu_limit=None, ram_limit=None, timeout=120,
custom_command=None, pod_timeout=None):
"""Create a name_masked pod"""
pod_yaml = self._build_pod_yaml(cpu_request, ram_request,
cpu_limit, ram_limit, custom_command)
pod_name = self._spawn_pod(pod_yaml)
# set pod timeout countdown thread
self._kill_pod_timeout(pod_name, pod_timeout)
# wait until a pod is created
try:
pod = self._wait_for_pod(pod_name)
        except Exception:
            time.sleep(1)
            self.delete_pod(pod_name)
            raise
return pod
def _assign_pod_name(self):
"""Generates a unique pod name"""
while self._name_lock:
time.sleep(0.001)
self._name_lock = True
pod_name = 'name_masked-%s' % random_string(10)
while pod_name in self._requested_pods:
pod_name = 'name_masked-%s' % random_string(10)
self._requested_pods.append(pod_name)
self._name_lock = False
return pod_name
@threaded
def _kill_pod_timeout(self, pod_name, pod_timeout=None):
"""Kill a pod once a timeout has been exceeded"""
if pod_timeout is None:
pod_timeout = self._instance_timeout
elif pod_timeout > 86400: # ensure pod timeouts cannot exceed 1 day
pod_timeout = 86400
elif pod_timeout < 20: # minimum 20 seconds
pod_timeout = 20
LOG.info(f'Configured timeout for {pod_name} to {pod_timeout} seconds')
time.sleep(pod_timeout)
        LOG.info(f'Pod {pod_name} exceeded its instance timeout of '
                 f'{pod_timeout} seconds')
self.delete_pod(pod_name)
def create_namespace(self, namespace):
"""Creates a namespace"""
namespace_yaml = {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": namespace,
"labels": {
"name": namespace
}
}
}
namespace_exists = False
for _namespace in self.namespaces:
if namespace == _namespace.metadata.name:
namespace_exists = True
break
if not namespace_exists:
response = self.core_api.create_namespace(namespace_yaml)
LOG.debug(f'Created namespace "{namespace}"')
else:
LOG.debug(f'Namespace "{namespace}" exists')
@property
def namespaces(self):
"""cluster namespaces"""
return self.core_api.list_namespace().items
@threaded
def _assign_client(self, client):
"""Receives a request from a client and returns an ip address
"""
while True:
# data received from client
request = client.recv(1024)
if not request:
break
cpu_request, ram_request, cpu_limit, ram_limit, command, pod_timeout = decode_request(request)
try:
                # create a pod for this request and reply with its ip:port
pod = self.create_name_masked_pod(cpu_request=cpu_request,
ram_request=ram_request,
cpu_limit=cpu_limit,
ram_limit=ram_limit,
custom_command=command,
pod_timeout=pod_timeout)
# ip is the pod ip
ip = pod.status.pod_ip
port = 49999
message = f'{ip}:{port}'
client.send(message.encode())
except Exception as e:
message = 'EXCEPTION: %s' % str(e)
client.send(message.encode())
client.close()
@threaded
def start_port_server(self):
"""accepts incoming connections on _server_port and returns a port"""
self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server.bind(("", self._server_port))
# put the socket into listening mode
self._server.listen(5)
LOG.info(f'Listening on port {self._server_port}')
# listen indefinitely for new clients
while True:
# establish connection with client
client, addr = self._server.accept()
LOG.info('Opening connection from %s:%s', str(addr[0]), str(addr[1]))
self._assign_client(client)
def block_execution(self):
"""Main thread to stop python from exiting.
This instance can be exited gracefully in two ways:
- With Ctrl-c
- With SIGTERM
"""
def received_sigterm(*args):
"""Handles SIGTERM"""
LOG.info('Received SIGTERM')
self._terminate = True
# listen for sigterm
LOG.info('PID: %d' % os.getpid())
signal.signal(signal.SIGTERM, received_sigterm)
LOG.debug('Blocking execution. Press Ctrl-c to open breakpoint')
# user_break = False
while not self._terminate:
try:
time.sleep(0.1)
except KeyboardInterrupt:
import pdb; pdb.set_trace()
# user_break = True
resp = input('Break? [Y/n]').lower()
if resp == '' or resp == 'y':
break
        if self._clear_on_exit:
            LOG.info('Deleting all NAME_MASKED jobs and pods due to clear_on_exit=True')
            self.clear()
self._close_server()
def __del__(self):
self._close_server()
def exit(self, clear_jobs=True):
"""Clear jobs and close server"""
if clear_jobs:
self.clear()
self._close_server()
def _close_server(self):
"""Close connection to server"""
if self._server is not None:
self._server.close()
LOG.info(f'Closed server on port {self._server_port}')
if __name__ == '__main__':
""" Supports calling gui directly with command line arguments """
parser = argparse.ArgumentParser(description='name_masked job controller')
# parser.add_argument('-loglevel', metavar='DEBUG', type=str,
# required=False, default='DEBUG',
# help='Log level to use (DEBUG, INFO, WARNING, ERROR)')
parser.add_argument('-timeout', type=str, metavar='',
required=False,
help='timeout in seconds for interactive name_masked sessions')
parser.add_argument('-default_cpu_request', type=str, metavar='',
required=False,
help='Number of cpus to request')
parser.add_argument('-default_cpu_limit', type=str, metavar='',
required=False,
help='Number of cpus to limit to')
parser.add_argument('-default_ram_request', type=str, metavar='',
required=False,
help='RAM request in GB')
parser.add_argument('-default_ram_limit', type=str, metavar='',
required=False,
help='RAM limit in GB')
parser.add_argument('--keep_reserve',
help='Keep a minimum number of name_masked instances on reserve',
action="store_true")
args = parser.parse_args()
script = None
if args.default_cpu_request:
default_cpu_request = args.default_cpu_request
else:
default_cpu_request = DEFAULT_CPU_REQUEST
if args.default_cpu_limit:
default_cpu_limit = args.default_cpu_limit
else:
default_cpu_limit = DEFAULT_CPU_LIMIT
if args.default_ram_request:
default_ram_request = args.default_ram_request
else:
default_ram_request = DEFAULT_RAM_REQUEST
if args.default_ram_limit:
default_ram_limit = args.default_ram_limit
else:
default_ram_limit = DEFAULT_RAM_LIMIT
if args.keep_reserve:
keep_reserve = args.keep_reserve
else:
keep_reserve = DEFAULT_KEEP_RESERVE
# default timeout of one hour
if args.timeout:
timeout = int(args.timeout)
else:
timeout = DEFAULT_TIMEOUT
PodScheduler(clear_on_init=False,
log_level='INFO',
instance_timeout=timeout,
keep_reserve=keep_reserve,
default_cpu_request=default_cpu_request,
default_cpu_limit=default_cpu_limit,
default_ram_request=default_ram_request,
default_ram_limit=default_ram_limit)
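Not shown above is the client side of the socket hand-off. A minimal sketch, assuming the scheduler's address, the resource numbers, and the follow-up gRPC call (all of these are placeholders, not values from the real system):

import socket

SCHEDULER_ADDR = ('10.0.0.5', 29999)   # placeholder; wherever the scheduler listens

with socket.create_connection(SCHEDULER_ADDR) as sock:
    # (cpu_request, ram_request, cpu_limit, ram_limit, command, timeout, unused)
    sock.sendall(b"(0.5, 1, 1, 2, 'None', 3600, None)")
    reply = sock.recv(1024).decode()

if reply.startswith('EXCEPTION'):
    # the scheduler forwards pod-creation failures, e.g. insufficient RAM/CPU
    raise RuntimeError(reply)

ip, port = reply.split(':')
# open a gRPC channel to f'{ip}:{port}' and talk to the stateful instance;
# the scheduler's timeout thread (or the client) cleans up the pod afterwards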