连接到 docker.sock 以仅使用 python 套接字获取容器统计信息

Connect to docker.sock to get container stats using python socket only

因此,我试图获取服务器上 docker 个容器 运行 的一些统计信息。我尝试了多种方法:

前两个工作得很好,但我无法使用套接字方法。我想比较这些方法,以确定哪种方法更快、更适合我的使用。 基本上,我正在尝试将 curl 命令转换为 python 请求。 这是我使用的 curl 命令: curl -H --unix-socket /var/run/docker.sock http://localhost/containers/CONTAINER_ID/stats?stream=false

有人可以帮我解决这个问题吗?

  1. 确保您想要 运行 Python SDK 的进程可以访问 /var/run/docker.sock。在示例中,我将它作为一个卷安装到容器中
  2. python 代码无论如何都会使用套接字,因此无需生成额外的子进程来执行 curl。您可以完全访问所有状态信息。在需要更详细的地方使用
  htpc:
    container_name: htpc
    build:
      context: .
      dockerfile: flask-Dockerfile
    ports: 
      - "5000:5000"
    restart: unless-stopped
    volumes: 
      - /var/run/docker.sock:/var/run/docker.sock  # map through socket
def docker_api():
    client = docker.from_env()
    if request.args["action"] == "list":
        l = []
        for c in client.containers.list(all=True):
            # string conversion only works for 6 digit ms,  strip back to 26 chars
            if c.status == "running":
                utc_time = datetime.strptime(client.api.inspect_container(c.short_id)["State"]["StartedAt"][:26]+"Z", "%Y-%m-%dT%H:%M:%S.%fZ")
            else:
                utc_time = datetime.utcnow()
            l.append({"id":c.short_id, "name":c.name, "image":c.image.tags[0] if len(c.image.tags)>0 else "unknown", 
                      "status":c.status, "logs":True, "inspect":True,
                      "stats":c.status=="running", "restart":c.status=="running", "stop":c.status=="running", "start":c.status!="running", "delete":True,
                      "uptime":(utc_time - datetime(1970, 1, 1)).total_seconds(),
                      })
        return Response(json.dumps(l), mimetype="text/json")
    elif request.args["action"] == "logs":
        return Response(client.containers.get(request.args["id"]).logs().decode(), mimetype="text/plain")
    elif request.args["action"] == "stats":
        return Response(json.dumps(client.containers.get(request.args["id"]).stats(stream=False)), mimetype="text/json")
    elif request.args["action"] == "inspect":
        js = client.api.inspect_container(request.args["id"])
        m = {"StartedAt":js["State"]["StartedAt"], "Mounts":js["Mounts"]}
        for i, mp in enumerate(m["Mounts"]):
            m["Mounts"][i] = {k: mp[k] for k in ('Source', 'Destination')}
        e = {"Env":js["Config"]["Env"], 'RestartPolicy':js['HostConfig']['RestartPolicy']}
        p = {"Ports":js["NetworkSettings"]["Ports"]}
        js = {**m, **e, **p} #, **js}
        return Response(json.dumps(js), mimetype="text/json")