跟踪 python 函数

strace a python function

是否可以用 strace 跟踪某个 python 函数所打开的文件,并区分这些文件是由 python 本身还是由子进程打开的?

# Desired usage (hypothetical): strace_read is the decorator being asked
# for; it is not defined in this snippet. It would fill the two lists with
# the files opened by python itself and by subprocesses respectively.
read_python, read_external = [], []

@strace_read(read_python, read_external)
def test():
    file = open("foo.txt", "r")
    subprocess.call(["cat", "bar.txt"])

# The decorated function has to run before the lists can contain anything.
test()

for file in read_python:
    print("python:      ", file)
for file in read_external:
    print("external:    ", file)

所以输出为:

>>> python:      foo.txt
>>> external:    bar.txt

我最感兴趣的是使用装饰器的方式。区分 python 与外部进程不是优先事项。

从概念上讲,我最好的猜测是用包装器替换 load_function(open) 的实例......实际上,我不知道,有太多方法可以访问 open

我会用一种更简单、但效果类似的方法来解决,而不是去弄清楚如何只对单个函数启用 strace:

  1. 像这样创建装饰器:(未测试)

-

def strace_mark(f):
    """Decorate *f* so each call is bracketed by two marker ``open`` calls.

    Before and after running *f*, the wrapper tries to open
    ``function-<name>-start`` and ``function-<name>-end`` for reading. The
    files are not expected to exist; the attempts only serve to leave two
    recognisable ``open`` entries in an strace log, so the part of the log
    that belongs to *f* can be cut out afterwards.

    Returns the wrapping function, which forwards all arguments to *f* and
    returns (or raises) whatever *f* does.
    """
    import functools

    name = getattr(f, "__name__", type(f).__name__)

    def emit_marker(suffix):
        # Only the syscall matters; a missing file is the normal case.
        try:
            handle = open('function-%s-%s' % (name, suffix), 'r')
        except OSError:
            return
        # If the marker file happens to exist, do not leak the handle.
        handle.close()

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        emit_marker('start')
        try:
            return f(*args, **kwargs)
        finally:
            # Emit the end marker even when f raises, so the traced region
            # is always delimited.
            emit_marker('end')

    return wrapper
  2. 在 strace -e file 下运行整个应用。
  3. 仅获取 open(function-something-start) 与 open(function-something-end) 这两次调用之间的部分。

如果您使用 strace -f,还可以顺带区分 python 与外部进程:只需查看每个调用是由哪个 pid 发起的即可。

这是我使用的解决方案:

#!/usr/bin/env python3
import multiprocessing
import selectors
import os
import array
import fcntl
import termios
import subprocess
import decorator
import locale
import io
import codecs
import re
import collections

def strace(function):
    """Decorate *function* so that calling it also reports the files it opens.

    A call to the decorated function forks a helper process that attaches
    ``strace -f -e open`` to the calling process, runs *function* with the
    caller's arguments, stops the tracer, and returns a
    ``StraceReturn(return_data, pid, strace_data)`` named tuple:

    * ``return_data`` -- whatever *function* returned,
    * ``pid`` -- the pid of the calling process,
    * ``strace_data`` -- list of ``(pid, path)`` tuples, one per ``open``
      call found in the strace output; ``pid`` is always an ``int``.

    If *function* raises, the tracer is still shut down before the exception
    propagates.
    """
    # Local import: `signal` is not among this file's module-level imports.
    import signal

    StraceReturn = collections.namedtuple(
        "StraceReturn", ["return_data", "pid", "strace_data"]
    )

    def strace_filter(stracefile, pid, exclude_system=False):
        """Yield ``(pid, path)`` for every ``open`` line in *stracefile*.

        *stracefile* is an iterable of strace output lines, *pid* is reported
        for lines that carry no ``[pid N]`` prefix, and *exclude_system*
        drops paths that start with one of the listed system directories.
        """
        system = (
            "/bin",
            "/boot",
            "/dev",
            "/etc",
            "/lib",
            "/proc",
            "/root",
            "/run",
            "/sbin",
            "/srv",
            "/sys",
            "/tmp",
            "/usr",
            "/var",
        )
        encoding = locale.getpreferredencoding(False)
        # strace is run with -xx, so the path appears as a run of literal
        # "\xNN" escapes. The backslash must itself be escaped in the
        # pattern; a bare "\x[" is an incomplete escape and raises re.error.
        # NOTE(review): only open( is matched; confirm whether openat(
        # should be traced and matched as well.
        open_call = re.compile(
            r'^(?:\[pid\s+(\d+)\]\s+)?open\(\"((?:\\x[0-9a-f]{2})+)\",',
            re.IGNORECASE,
        )
        for line in stracefile:
            match = open_call.search(line)
            if not match:
                continue
            traced_pid, escaped_path = match.groups()
            # Normalise to int so prefixed and unprefixed lines compare equal.
            traced_pid = pid if traced_pid is None else int(traced_pid)
            raw_path = codecs.escape_decode(escaped_path.encode("ascii"))[0]
            path = raw_path.decode(encoding)
            if exclude_system and path.startswith(system):
                continue
            yield (traced_pid, path)

    def strace_reader(conn_parent, conn_child, barrier, pid):
        """Helper-process body: run strace on *pid* and report the results.

        Collects strace's stderr until the parent signals completion over
        *conn_child*, then sends each filtered ``(pid, path)`` tuple back and
        closes the connection.
        """
        conn_parent.close()
        encoding = locale.getpreferredencoding(False)
        strace_args = [
            "strace", "-e", "open", "-f", "-s", "512", "-xx", "-p", str(pid),
        ]
        process_data = io.StringIO()
        process = subprocess.Popen(
            strace_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        selector = selectors.DefaultSelector()
        selector.register(process.stderr, selectors.EVENT_READ)
        # Block until strace has written something to stderr (presumably its
        # "attached" banner) before letting the parent run the function.
        selector.select()
        barrier.wait()
        selector.register(conn_child, selectors.EVENT_READ)
        while selector.get_map():
            events = selector.select()
            for key, mask in events:
                if key.fd == conn_child.fileno():
                    # Parent finished the traced call: stop strace.
                    conn_child.recv()
                    selector.unregister(key.fd)
                    process.terminate()
                    try:
                        process.wait(5)
                    except subprocess.TimeoutExpired:
                        # Popen.wait raises TimeoutExpired, not TimeoutError.
                        process.kill()
                    process.wait()
                else:
                    ioctl_buffer = array.array("i", [0])
                    try:
                        fcntl.ioctl(key.fd, termios.FIONREAD, ioctl_buffer)
                    except OSError:
                        read_bytes = 1024
                    else:
                        read_bytes = max(1024, ioctl_buffer[0])
                    data = os.read(key.fd, read_bytes)
                    if data:
                        # Store everything and filter once at the end; this
                        # is simpler than filtering chunk by chunk with a
                        # carried-over partial line, at the cost of memory.
                        process_data.write(data.decode(encoding))
                    else:
                        # EOF on strace's stderr.
                        selector.unregister(key.fd)
        selector.close()
        process_data.seek(0, io.SEEK_SET)
        for pidfile in strace_filter(process_data, pid):
            conn_child.send(pidfile)
        conn_child.close()

    def strace_wrapper(function, *args, **kw):
        """Run *function* under trace and return a ``StraceReturn``."""
        strace_data = list()
        # The target is a nested function, which only the fork start method
        # can run (it cannot be pickled for spawn/forkserver).
        context = multiprocessing.get_context("fork")
        barrier = context.Barrier(2)
        conn_parent, conn_child = context.Pipe(duplex=True)
        process = context.Process(
            target=strace_reader,
            args=(conn_parent, conn_child, barrier, os.getpid()),
        )
        process.start()
        conn_child.close()
        barrier.wait()
        try:
            function_return = function(*args, **kw)
        finally:
            # Always stop the tracer, even if the traced function raised.
            conn_parent.send(None)
            while True:
                try:
                    strace_data.append(conn_parent.recv())
                except EOFError:
                    break
            process.join(5)
            if process.is_alive():
                process.terminate()
            process.join(5)
            if process.is_alive():
                os.kill(process.pid, signal.SIGKILL)
            process.join()
            conn_parent.close()
        return StraceReturn(function_return, os.getpid(), strace_data)

    return decorator.decorator(strace_wrapper, function)

@strace
def test():
    """Demo workload: open one file in a shell child and one in python."""
    print("Entering test()")
    child = subprocess.Popen("cat +μυρτιὲς.txt", shell=True)
    # Open and immediately close the file; only the open call is of interest.
    with open("test\"test", "r"):
        pass
    child.wait()
    print("Exiting test()")
    return 5

print(test())

请注意,strace 在终止事件之后生成的任何信息也会被收集。为避免这种情况,请使用 while not signaled 循环,并在循环结束后终止子进程(FIONREAD ioctl 是这种做法遗留下来的;我没有看到任何删除它的理由)。

事后看来,如果我使用临时文件而不是 multiprocessing/pipe,装饰器可能会大大简化。

这里会先 fork 出一个 child 进程,再由它 fork 出 strace——换句话说,strace 跟踪的是它的 grandparent。一些 linux 发行版只允许 strace 跟踪其 children。我不确定如何绕过此限制——让主程序在 child 分支中继续执行(而由 parent 去 exec strace)可能不是个好主意——如果经常调用被装饰的函数,程序的 PID 就会像烫手山芋一样不停地换手。