Python 的多进程编程(二)

释放双眼,带上耳机,听听看~!

原文出处

这里顺带说下刷新和关闭文件吧。

POSIX 系统提供了非常多的函数用于刷新:

  • fflush() 函数:用于刷新 stdio 流(stream)。对于输出流,它强制将用户空间的缓冲区(buffer),通过这个流底层的 write()函数写入到该流。对于关联到可定位(seekable)文件的输入流,它丢弃掉已经被底层文件读取(fetch),但还没被应用消费(consume)的buffer(因为对于 seekable 的文件而言,只需要重新定位再读一次即可)。如果它的 FILE *stream 参数为NULL,则刷新所有打开的流。简单来说,它的作用主要是将用户空间的缓冲区写入内核空间。但是用户空间的缓冲区是和所用的语言和库相关的,fflush() 刷新的只是 C 语言的 stdio 库创建的 FILE 类型的 buffer。对于 Python 而言,你应该使用 file.flush() 方法;如果没有输出缓冲区的话,直接使用os.write()和 socket.send() 就能写入内核空间了,不需要刷新。
  • fsync() 和 fdatasync()系统调用:用于将内核中缓存的对文件的修改写入磁盘。两者的区别在于后者不写入元数据(metadata)。它们都是 async-signal-safe 的函数。很显然,这两个函数都只对磁盘文件有效,不能用于刷新socket。此外,磁盘也有自己的缓冲区,内核也只是将数据写入磁盘缓冲区而已,并不一定真正被写入磁盘了。macOS 的fcntl()系统调用提供了 F_FULLFSYNC 选项,可以要求将数据写入磁盘的永久存储设备中(只对 HFS、FAT 和 UDF文件系统有效)。

    大部分的程序都不需要用到它们,因为只要不是断电等意外原因导致非正常关机的话,内核和磁盘驱动最终都会将文件写入磁盘的(尽管那时你可能已经关闭文件或者退出进程了)。要考虑到断电时的数据安全的话,主要也就是数据库之类的应用了。
    Python 提供了 os.fsync() 函数来封装 fsync() 系统调用。

  • aio_fsync() 函数:用于刷新异步 IO 操作到磁盘。功能类似于 fsync() 和 fdatasync(2),但是只发出请求,不等待实际的操作完成就返回。
  • sync() 和 syncfs() 系统调用:将文件系统的缓存写入磁盘。二者区别在于后者只刷新该 fd 所对应的文件的文件系统。
    很显然,这是两个很重的系统调用,一般不需要使用。

而 POSIX 的 close() 和 _exit() 系统调用并不会进行任何刷新操作,fclose()、exit() 和 abort() 函数可以看成隐式地调用了 fflush() 和 close()。

对应到 Python 中,文件对象的 close() 方法会刷新用户空间的缓冲区;os.close()、os._exit() 和 os.abort() 与对应的 C 函数行为一致,并且因为 abort() 函数并不知道 Python 是怎样使用用户空间的缓冲区的,所以并不会刷新 Python 的文件对象;其他正常的退出方式会刷新用户空间的缓冲区,并进行其他的清理工作。

最后,我们来利用上述知识来实现一个需求:编写一个长期运行的程序,它每隔一段时间(如 5 秒)访问一次远程服务器,查询一次是否有新的命令。有的话就执行该命令,并且上报命令的执行结果和输出。不会有并发的任务需要执行,但对于执行时间较长的命令,需要每隔一段时间上报它当前的输出。

很显然,这个程序应该是一个多进程程序,使用子进程来执行该命令;如果是多线程程序的话,执行时很可能导致主线程挂掉。

于是先来看看这个主进程的架构:

class Daemon(object):
    def __init__(self):
        self._action_id = None
        self._done_action = False

    def run(self):
        while True:
            start_time = time.time()

            status = self.collect_status()
            if status and self.report_status(status) and self._done_action:
                self.clean_status()

            if not self._action_id:
                action = self.fetch_action()
                if action and self.accept_action(action):
                    self.handle_action(action)

            wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
            if wait_time > 0:
                self.wait(wait_time)

    def fetch_action(self):
        raise NotImplementedError

    def handle_action(self, action):
        raise NotImplementedError

    def accept_action(self, action):
        raise NotImplementedError

    def collect_status(self):
        raise NotImplementedError

    def report_status(self, status):
        raise NotImplementedError

    def clean_status(self):
        raise NotImplementedError

    @staticmethod
    def wait(wait_time):
        time.sleep(wait_time)

FETCH_ACTION_INTERVAL = 5

先来看 wait() 方法,如果子进程提前执行结束了,应该尽快上报结果,而不该傻傻地 sleep。考虑到 signal 模块有个定时器的功能,先用它来实现吧:

class Daemon(object):
    def run(self):
        try:
            self.register_signals()

            while True:
                start_time = time.time()

                status = self.collect_status()
                if status and self.report_status(status) and self._done_action:
                    self.clean_status()

                if not self._action_id:
                    action = self.fetch_action()
                    if action and self.accept_action(action):
                        self.handle_action(action)

                wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
                if wait_time > 0:
                    self.wait(wait_time)
        finally:
            self.deregister_signals()

    @staticmethod
    def ignore_signal(signum, frame):
        return

    def register_signals(self):
        signal.signal(signal.SIGALRM, self.ignore_signal)
        signal.signal(signal.SIGCHLD, self.ignore_signal)

    def deregister_signals(self):
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def wait(self):
        signal.setitimer(signal.ITIMER_REAL, wait_time)
        signal.pause()
        signal.setitimer(signal.ITIMER_REAL, 0)

因为 signal.pause() 会在收到任意信号后继续运行,所以定时器的 SIGALRM 信号和子进程退出的 SIGCHLD 都可以唤醒它,也就实现了子进程退出时立刻唤醒,否则等待最多 5 秒的功能。但是我们并不知道这次唤醒是不是由 SIGALRM 信号造成的,所以安全起见还是清除一下定时器,避免它可能打断之后的系统调用。

然而这种实现仍然有不足之处,如果使用 IDE 来调试的话,暂停在 signal.pause() 这步可能导致错过定时器发出的 SIGALRM 信号,使得程序没法继续执行,有时还可能出现各种意外的退出情况。更可靠的方式是使用唤醒 fd,再用 select 等方式监听 fd 是否可读,并设置超时时间。为了使用比较高效的 select 模型,又不用考虑平台的可移植性(例如我经常在 macOS 和 Linux 之间切换),我采用了 selectors 标准库(Python 3.4 之前的版本可以使用 selectors2 或 selectors34 这两个第三方库来替代):

class Daemon(object):
    def __init__(self):
        self._action_id = None
        self._done_action = False
        self._selector = None
        r, w = os.pipe()
        set_non_blocking(r)
        set_non_blocking(w)  # wakeup fd 需要是非阻塞的
        self._waker = r, w

    def register_signals(self):
        signal.signal(signal.SIGCHLD, self.ignore_signal)
        self._selector = selectors.DefaultSelector()
        self._selector.register(self._waker[0], selectors.EVENT_READ)
        signal.set_wakeup_fd(self._waker[1])

    def deregister_signals(self):
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
        signal.set_wakeup_fd(-1)
        if self._selector:
            self._selector.close()
            self._selector = None

    def wait(self, wait_time):
        ready = self._selector.select(wait_time)
        if ready:
            waker = self._waker[0]
            # 读完并忽略所有的数据,避免 _waker[1] 被阻塞而无法写入
            try:
                while True:
                    data = os.read(waker, BUF_SIZE)
                    if not data or len(data) < BUF_SIZE:  # 没有更多数据了
                        break
            except OSError:  # 忽略 EAGAIN 和 EINTR
                pass

def set_non_blocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

BUF_SIZE = 1024

经过这样改造后,就没有上述缺点了。顺带还获得了一个好处,如果和远程服务器之间是长链接,那就可以监听该 socket 的 fd,一有消息就立刻跳到下个循环,加快 fetch_action() 的调用。

再来看子进程的实现:

class Worker(object):
    def __init__(self, action):
        try:
            os.setsid()
        except OSError:
            worker_logger.warning('failed to setsid, the worker should be created in a forked process')

        user_name = action.get('user_name')
        if not user_name:
            raise InvalidAction('user_name is required')
        if not isinstance(user_name, basestring):
            raise InvalidAction('user_name must be a string object')

        try:
            user = pwd.getpwnam(user_name)
        except KeyError:
            raise UserNotFound

        group_name = action.get('group_name')
        if group_name:
            if not isinstance(group_name, basestring):
                raise InvalidAction('group_name must be a string object')
            try:
                user_group = grp.getgrnam(group_name)
            except KeyError:
                raise GroupNotFound
            if user_name not in user_group.gr_mem:
                raise UserNotInGroup('user %s not in group %s' % (user_name, group_name))
            os.setgid(user_group.gr_gid)
        else:
            os.setgid(user.pw_gid)

        os.setuid(user.pw_uid)
        os.chdir(user.pw_dir)

        redirect_fd(0, os.devnull, os.O_RDONLY)  # 避免运行一些需要 TTY 的命令,如 top
        redirect_fd(1, STDOUT_FILE_PATH), os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        redirect_fd(2, STDERR_FILE_NAME), os.O_WRONLY | os.O_CREAT | os.O_TRUNC)

        self.action = action

        self.save_worker_pid()

    def do_action(self):
        gc.disable()  # https://bugs.python.org/issue1336
        try:
            pid = os.fork()
        except OSError:
            worker_logger.exception('failed to fork executor for action ' + self.action.id)
            gc.enable()
            raise
        else:
            if pid == 0:  # executor
                worker_logger.handlers[0].close()
                os.execve('/bin/bash', ['/bin/bash', '--norc', '-c', self.action.execution], {})  # 使用干净的环境来执行
            else:  # worker
                gc.enable()
                result = wait_pid(pid, 0)
                executor_exit_status = result[1]
                self.save_executor_exit_status(exit_status)

    def save_worker_pid(self):
        raise NotImplementedError

    def save_executor_exit_status(self, exit_status):
        raise NotImplementedError

def redirect_fd(fd, path, mode):
    try:
        os.close(fd)
    except OSError:
        pass
    new_fd = os.open(path, mode)
    if new_fd != fd:
        os.dup2(new_fd, fd)
        os.close(new_fd)

def wait_pid(pid, options):
    while True:
        try:
            return os.waitpid(pid, options)
        except OSError as e:
            if e.errno == errno.EINTR:  # 忽略等待时被中断
                continue
            raise

这里先做的工作是设置 UID 和 GID 等,因为执行的用户和 woker 的运行用户可能不一样(后者一般是用 root 用户)。
而在 do_action() 里,worker 并没有直接调用 os.execve(),而是让它的子进程去执行,为的是执行完毕后有机会调用 save_executor_exit_status() 方法。这个方法和 save_worker_pid() 方法的实现我就不列出了,只需要写入文件即可。

为什么要将退出状态和 PID 写入文件,而不是直接通知 daemon 进程呢?因为 daemon 进程虽然是长期运行的,但是有可能因为意外情况退出,也可能需要重启。在它重启之后,新的 daemon 已经和正在运行的 worker 没有联系了,就只能通过文件读取到 worker 的 PID 和 executor 的退出状态,从而恢复运行。这也是我不用 subprocess 模块的原因,因为进程退出后 PIPE 就丢失了。FIFO 也是不可靠的,因为读出来后如果 daemon 进程崩溃了,数据也没法找回;甚至也有可能两个进程都崩溃了,这个 FIFO 就被销毁了。

最后,daemon 剩下的一些繁琐的功能实现我也不列出了,只把 handle_action() 实现一下:

class Daemon(object):
    def handle_action(self, action):
        gc.disable()
        try:
            pid = os.fork()
        except OSError:
            daemon_logger.exception('failed to create worker for action ' + action.id)
            gc.enable()
            # 保存 action exit status
        else:
            if pid == 0:  # worker
                self.deregister_signals()
                os.close(self._waker[0])
                os.close(self._waker[1])
                daemon_logger.handlers[0].close()
                gc.enable()
                exit_code = 0
                try:
                    worker = Worker(action)
                    worker.do_action()
                except SystemExit as e:
                    exit_code = e.code
                except:
                    exit_code = 1  # 根据实际的设置返回不同的错误码
                finally:
                    try:
                        logging.shutdown()  # 刷新未写入的日志
                    except:
                        pass
                    os._exit(exit_code)
            else:  # daemon
                gc.enable()
                # 保存 worker PID,因为 worker 有可能还没保存 PID 就退出了

【转自慕课】https://www.imooc.com

Python

【计算机控制系统课程设计】基于树莓派的智能家居系统

2022-3-3 5:00:40

Python

Python分布式计算包dispy

2022-3-3 5:00:50

搜索