envoy
envoy 库的介绍上说它是一个可以被人类理解的 python 子进程。可以看出这个库的主要做用是使得 python 子进程的使用更加便利高效,并且符合人类的使用习惯。
前置知识点
看源码前需要对下面几个 python 原生库的使用有了解:
shlex
subprocess.Popen
threading.Thread
可以参考本文最后的参考资料。我也是边看源码边看查阅资料 🙂 ,感觉这样比较高效
使用

源码
envoy 首先使用 shlex
对输入的命令行字符串进行解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| def expand_args(command): """Parses command strings and returns a Popen-ready list."""
if isinstance(command, (str, unicode)): splitter = shlex.shlex(command.encode('utf-8')) splitter.whitespace = '|' splitter.whitespace_split = True command = []
while True: token = splitter.get_token() if token: command.append(token) else: break
command = list(map(shlex.split, command))
return command
|
如果对 shlex
原生库的使用比较熟悉的(我也不熟悉),这个函数就很容易理解。
举个例子:当函数接受的参数是 ls -l | grep py
时, 函数的 while
循环首先会将输入解析为数组 ['ls -l', 'grep py']
,然后再使用 shlex.split
进一步解析为二维数组 [['ls', '-l'], ['grep', 'py']]
。
这里解析为二维数组是因为后面的 subprocess.Popen
方法的第一个参数是字符串列表。subprocess.Popen
的使用参考本文最后的参考文章。
然后是构建子进程和传递 stdin
和 stdout
的过程, 具体的解析我在代码里做了注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| def run(self, data, timeout, kill_timeout, env, cwd): self.data = data environ = dict(os.environ) environ.update(env or {})
def target(): try: ''' 执行子进程 这里的 self.cmd 就是上面 shlex 处理后返回的字符串列表 注意接受字符串列表时 shell=False 要给子进程的stdin发送数据,则Popen的时候,stdin要为PIPE; 同理,要可以发送数据的话,stdout或者stderr也要为PIPE ''' self.process = subprocess.Popen(self.cmd, universal_newlines=True, shell=False, env=environ, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, cwd=cwd, )
if sys.version_info[0] >= 3: ''' 和子进程交互: 将上一个子进程的 stdout 数据 PIPE 到这个子进程的 stdin 并从stdout和stderr读数据,直到收到EOF ''' self.out, self.err = self.process.communicate( input = bytes(self.data, "UTF-8") if self.data else None ) else: self.out, self.err = self.process.communicate(self.data) except Exception as exc: self.exc = exc
thread = threading.Thread(target=target) thread.start() thread.join(timeout) if self.exc: raise self.exc if _is_alive(thread) : _terminate_process(self.process) thread.join(kill_timeout) if _is_alive(thread): _kill_process(self.process) thread.join() self.returncode = self.process.returncode return self.out, self.err
|
另外,对于 Popen
的使用,下面是一个简单的例子:

参考资料
python 中的 subprocess.Popen()使用
Python 多线程编程(一):threading 模块 Thread 类的用法详解