Allison 是 Dropbox 的工程师,在那里她维护着这个世界上最大的 Python 客户端网络之一。在去 Dropbox 之前,她是 Recurse Center 的协调人, 是这个位于纽约的程序员深造机构的作者。她在北美的 PyCon 做过关于 Python 内部机制的演讲,并且她喜欢研究奇怪的 bug。她的博客地址是 akaptur.com。
介绍
Byterun 是一个用 Python 实现的 Python 解释器。随着我对 Byterun 的开发,我惊喜地的发现,这个 Python 解释器的基础结构用 500 行代码就能实现。在这一章我们会搞清楚这个解释器的结构,给你足够探索下去的背景知识。我们的目标不是向你展示解释器的每个细节 (3) … and the interpreter is here.
… return z
…
def foo():
… a = 1
… b = 2
… return a + bar(b) # < (1) We’re in the middle of a call to foo …
3
![调用栈](/data/attachment/album/201609/08/141633dwztqktnwthtttto.png)
现在,解释器处于`bar`函数的调用中。调用栈中有 3 个帧:一个对应于模块层,一个对应函数`foo`,另一个对应函数`bar`。(见上图)一旦`bar`返回,与它对应的帧就会从调用栈中弹出并丢弃。
字节码指令`RETURN_VALUE`告诉解释器在帧之间传递一个值。首先,它把位于调用栈栈顶的帧中的数据栈的栈顶值弹出。然后把整个帧弹出丢弃。最后把这个值压到下一个帧的数据栈中。
当 Ned Batchelder 和我在写 Byterun 时,很长一段时间我们的实现中一直有个重大的错误。我们整个虚拟机中只有一个数据栈,而不是每个帧都有一个。我们写了很多测试代码,同时在 Byterun 和真正的 Python 上运行,希望得到一致结果。我们几乎通过了所有测试,只有一样东西不能通过,那就是<ruby> 生成器 <rp> ( </rp> <rt> generators </rt> <rp> ) </rp></ruby>。最后,通过仔细的阅读 CPython 的源码,我们发现了错误所在(感谢 Michael Arntzenius 对这个 bug 的洞悉)。把数据栈移到每个帧就解决了这个问题。
回头在看看这个 bug,我惊讶的发现 Python 真的很少依赖于每个帧有一个数据栈这个特性。在 Python 中几乎所有的操作都会清空数据栈,所以所有的帧公用一个数据栈是没问题的。在上面的例子中,当`bar`执行完后,它的数据栈为空。即使`foo`公用这一个栈,它的值也不会受影响。然而,对应生成器,它的一个关键的特点是它能暂停一个帧的执行,返回到其他的帧,一段时间后它能返回到原来的帧,并以它离开时的相同状态继续执行。
### Byterun
现在我们有足够的 Python 解释器的知识背景去考察 Byterun。
Byterun 中有四种对象。
* `VirtualMachine`类,它管理高层结构,尤其是帧调用栈,并包含了指令到操作的映射。这是一个比前面`Inteprter`对象更复杂的版本。
* `Frame`类,每个`Frame`类都有一个代码对象,并且管理着其他一些必要的状态位,尤其是全局和局部命名空间、指向调用它的整的指针和最后执行的字节码指令。
* `Function`类,它被用来代替真正的 Python 函数。回想一下,调用函数时会创建一个新的帧。我们自己实现了`Function`,以便我们控制新的`Frame`的创建。
* `Block`类,它只是包装了块的 3 个属性。(块的细节不是解释器的核心,我们不会花时间在它身上,把它列在这里,是因为 Byterun 需要它。)
#### `VirtualMachine` 类
每次程序运行时只会创建一个`VirtualMachine`实例,因为我们只有一个 Python 解释器。`VirtualMachine` 保存调用栈、异常状态、在帧之间传递的返回值。它的入口点是`run_code`方法,它以编译后的代码对象为参数,以创建一个帧为开始,然后运行这个帧。这个帧可能再创建出新的帧;调用栈随着程序的运行而增长和缩短。当第一个帧返回时,执行结束。
class VirtualMachineError(Exception):
pass
class VirtualMachine(object):
def init(self):
self.frames = [] # The call stack of frames.
self.frame = None # The current frame.
self.return_value = None
self.last_exception = None
def run_code(self, code, global_names=None, local_names=None):
""" An entry point to execute code using the virtual machine."""
frame = self.make_frame(code, global_names=global_names,
local_names=local_names)
self.run_frame(frame)
#### `Frame` 类
接下来,我们来写`Frame`对象。帧是一个属性的集合,它没有任何方法。前面提到过,这些属性包括由编译器生成的代码对象;局部、全局和内置命名空间;前一个帧的引用;一个数据栈;一个块栈;最后执行的指令指针。(对于内置命名空间我们需要多做一点工作,Python 在不同模块中对这个命名空间有不同的处理;但这个细节对我们的虚拟机不重要。)
class Frame(object):
def init(self, code_obj, global_names, local_names, prev_frame):
self.code_obj = code_obj
self.global_names = global_names
self.local_names = local_names
self.prev_frame = prev_frame
self.stack = []
if prev_frame:
self.builtin_names = prev_frame.builtin_names
else:
self.builtin_names = local_names[‘builtins‘]
if hasattr(self.builtin_names, ‘dict‘):
self.builtin_names = self.builtin_names.dict
self.last_instruction = 0
self.block_stack = []
接着,我们在虚拟机中增加对帧的操作。这有 3 个帮助函数:一个创建新的帧的方法(它负责为新的帧找到名字空间),和压栈和出栈的方法。第四个函数,`run_frame`,完成执行帧的主要工作,待会我们再讨论这个方法。
class VirtualMachine(object):
[… 删节 …]
# Frame manipulation
def make_frame(self, code, callargs={}, global_names=None, local_names=None):
if global_names is not None and local_names is not None:
local_names = global_names
elif self.frames:
global_names = self.frame.global_names
local_names = {}
else:
global_names = local_names = {
'__builtins__': __builtins__,
'__name__': '__main__',
'__doc__': None,
'__package__': None,
}
local_names.update(callargs)
frame = Frame(code, global_names, local_names, self.frame)
return frame
def push_frame(self, frame):
self.frames.append(frame)
self.frame = frame
def pop_frame(self):
self.frames.pop()
if self.frames:
self.frame = self.frames[-1]
else:
self.frame = None
def run_frame(self):
pass
# we'll come back to this shortly
#### `Function` 类
`Function`的实现有点曲折,但是大部分的细节对理解解释器不重要。重要的是当调用函数时 —— 即调用 `__call__`方法 —— 它创建一个新的`Frame`并运行它。
class Function(object):
“””
Create a realistic function object, defining the things the interpreter expects.
“””
slots = [
‘func_code’, ‘func_name’, ‘func_defaults’, ‘func_globals’,
‘func_locals’, ‘func_dict’, ‘func_closure’,
‘name‘, ‘dict‘, ‘doc‘,
‘_vm’, ‘_func’,
]
def __init__(self, name, code, globs, defaults, closure, vm):
"""You don't need to follow this closely to understand the interpreter."""
self._vm = vm
self.func_code = code
self.func_name = self.__name__ = name or code.co_name
self.func_defaults = tuple(defaults)
self.func_globals = globs
self.func_locals = self._vm.frame.f_locals
self.__dict__ = {}
self.func_closure = closure
self.__doc__ = code.co_consts[0] if code.co_consts else None
# Sometimes, we need a real Python function. This is for that.
kw = {
'argdefs': self.func_defaults,
}
if closure:
kw['closure'] = tuple(make_cell(0) for _ in closure)
self._func = types.FunctionType(code, globs, **kw)
def __call__(self, *args, **kwargs):
"""When calling a Function, make a new frame and run it."""
callargs = inspect.getcallargs(self._func, *args, **kwargs)
# Use callargs to provide a mapping of arguments: values to pass into the new
# frame.
frame = self._vm.make_frame(
self.func_code, callargs, self.func_globals, {}
)
return self._vm.run_frame(frame)
def make_cell(value):
“””Create a real Python closure and grab a cell.”””
Thanks to Alex Gaynor for help with this bit of twistiness.
fn = (lambda x: lambda: x)(value)
return fn.__closure__[0]
接着,回到`VirtualMachine`对象,我们对数据栈的操作也增加一些帮助方法。字节码操作的栈总是在当前帧的数据栈。这些帮助函数让我们的`POP_TOP`、`LOAD_FAST`以及其他操作栈的指令的实现可读性更高。
class VirtualMachine(object):
[… 删节 …]
# Data stack manipulation
def top(self):
return self.frame.stack[-1]
def pop(self):
return self.frame.stack.pop()
def push(self, *vals):
self.frame.stack.extend(vals)
def popn(self, n):
"""Pop a number of values from the value stack.
A list of `n` values is returned, the deepest value first.
"""
if n:
ret = self.frame.stack[-n:]
self.frame.stack[-n:] = []
return ret
else:
return []
在我们运行帧之前,我们还需两个方法。
第一个方法,`parse_byte_and_args` 以一个字节码为输入,先检查它是否有参数,如果有,就解析它的参数。这个方法同时也更新帧的`last_instruction`属性,它指向最后执行的指令。一条没有参数的指令只有一个字节长度,而有参数的字节有3个字节长。参数的意义依赖于指令是什么。比如,前面说过,指令`POP_JUMP_IF_FALSE`,它的参数指的是跳转目标。`BUILD_LIST`,它的参数是列表的个数。`LOAD_CONST`,它的参数是常量的索引。
一些指令用简单的数字作为参数。对于另一些,虚拟机需要一点努力去发现它含意。标准库中的`dis`模块中有一个备忘单,它解释什么参数有什么意思,这让我们的代码更加简洁。比如,列表`dis.hasname`告诉我们`LOAD_NAME`、 `IMPORT_NAME`、`LOAD_GLOBAL`,以及另外的 9 个指令的参数都有同样的意义:对于这些指令,它们的参数代表了代码对象中的名字列表的索引。
class VirtualMachine(object):
[… 删节 …]
def parse_byte_and_args(self):
f = self.frame
opoffset = f.last_instruction
byteCode = f.code_obj.co_code[opoffset]
f.last_instruction += 1
byte_name = dis.opname[byteCode]
if byteCode >= dis.HAVE_ARGUMENT:
# index into the bytecode
arg = f.code_obj.co_code[f.last_instruction:f.last_instruction+2]
f.last_instruction += 2 # advance the instruction pointer
arg_val = arg[0] + (arg[1] * 256)
if byteCode in dis.hasconst: # Look up a constant
arg = f.code_obj.co_consts[arg_val]
elif byteCode in dis.hasname: # Look up a name
arg = f.code_obj.co_names[arg_val]
elif byteCode in dis.haslocal: # Look up a local name
arg = f.code_obj.co_varnames[arg_val]
elif byteCode in dis.hasjrel: # Calculate a relative jump
arg = f.last_instruction + arg_val
else:
arg = arg_val
argument = [arg]
else:
argument = []
return byte_name, argument
下一个方法是`dispatch`,它查找给定的指令并执行相应的操作。在 CPython 中,这个分派函数用一个巨大的 switch 语句实现,有超过 1500 行的代码。幸运的是,我们用的是 Python,我们的代码会简洁的多。我们会为每一个字节码名字定义一个方法,然后用`getattr`来查找。就像我们前面的小解释器一样,如果一条指令叫做`FOO_BAR`,那么它对应的方法就是`byte_FOO_BAR`。现在,我们先把这些方法当做一个黑盒子。每个指令方法都会返回`None`或者一个字符串`why`,有些情况下虚拟机需要这个额外`why`信息。这些指令方法的返回值,仅作为解释器状态的内部指示,千万不要和执行帧的返回值相混淆。
class VirtualMachine(object):
[… 删节 …]
def dispatch(self, byte_name, argument):
""" Dispatch by bytename to the corresponding methods.
Exceptions are caught and set on the virtual machine."""
# When later unwinding the block stack,
# we need to keep track of why we are doing it.
why = None
try:
bytecode_fn = getattr(self, 'byte_%s' % byte_name, None)
if bytecode_fn is None:
if byte_name.startswith('UNARY_'):
self.unaryOperator(byte_name[6:])
elif byte_name.startswith('BINARY_'):
self.binaryOperator(byte_name[7:])
else:
raise VirtualMachineError(
"unsupported bytecode type: %s" % byte_name
)
else:
why = bytecode_fn(*argument)
except:
# deal with exceptions encountered while executing the op.
self.last_exception = sys.exc_info()[:2] + (None,)
why = 'exception'
return why
def run_frame(self, frame):
"""Run a frame until it returns (somehow).
Exceptions are raised, the return value is returned.
"""
self.push_frame(frame)
while True:
byte_name, arguments = self.parse_byte_and_args()
why = self.dispatch(byte_name, arguments)
# Deal with any block management we need to do
while why and frame.block_stack:
why = self.manage_block_stack(why)
if why:
break
self.pop_frame()
if why == 'exception':
exc, val, tb = self.last_exception
e = exc(val)
e.__traceback__ = tb
raise e
return self.return_value
#### `Block` 类
在我们完成每个字节码方法前,我们简单的讨论一下块。一个块被用于某种控制流,特别是异常处理和循环。它负责保证当操作完成后数据栈处于正确的状态。比如,在一个循环中,一个特殊的迭代器会存在栈中,当循环完成时它从栈中弹出。解释器需要检查循环仍在继续还是已经停止。
为了跟踪这些额外的信息,解释器设置了一个标志来指示它的状态。我们用一个变量`why`实现这个标志,它可以是`None`或者是下面几个字符串之一:`"continue"`、`"break"`、`"excption"`、`return`。它们指示对块栈和数据栈进行什么操作。回到我们迭代器的例子,如果块栈的栈顶是一个`loop`块,`why`的代码是`continue`,迭代器就应该保存在数据栈上,而如果`why`是`break`,迭代器就会被弹出。
块操作的细节比这个还要繁琐,我们不会花时间在这上面,但是有兴趣的读者值得仔细的看看。
Block = collections.namedtuple(“Block”, “type, handler, stack_height”)
class VirtualMachine(object):
[… 删节 …]
# Block stack manipulation
def push_block(self, b_type, handler=None):
level = len(self.frame.stack)
self.frame.block_stack.append(Block(b_type, handler, stack_height))
def pop_block(self):
return self.frame.block_stack.pop()
def unwind_block(self, block):
"""Unwind the values on the data stack corresponding to a given block."""
if block.type == 'except-handler':
# The exception itself is on the stack as type, value, and traceback.
offset = 3
else:
offset = 0
while len(self.frame.stack) > block.level + offset:
self.pop()
if block.type == 'except-handler':
traceback, value, exctype = self.popn(3)
self.last_exception = exctype, value, traceback
def manage_block_stack(self, why):
""" """
frame = self.frame
block = frame.block_stack[-1]
if block.type == 'loop' and why == 'continue':
self.jump(self.return_value)
why = None
return why
self.pop_block()
self.unwind_block(block)
if block.type == 'loop' and why == 'break':
why = None
self.jump(block.handler)
return why
if (block.type in ['setup-except', 'finally'] and why == 'exception'):
self.push_block('except-handler')
exctype, value, tb = self.last_exception
self.push(tb, value, exctype)
self.push(tb, value, exctype) # yes, twice
why = None
self.jump(block.handler)
return why
elif block.type == 'finally':
if why in ('return', 'continue'):
self.push(self.return_value)
self.push(why)
why = None
self.jump(block.handler)
return why
return why
### 指令
剩下了的就是完成那些指令方法了:`byte_LOAD_FAST`、`byte_BINARY_MODULO`等等。而这些指令的实现并不是很有趣,这里我们只展示了一小部分,完整的实现[在 GitHub 上](https://github.com/nedbat/byterun)。(这里包括的指令足够执行我们前面所述的所有代码了。)
class VirtualMachine(object):
[… 删节 …]
## Stack manipulation
def byte_LOAD_CONST(self, const):
self.push(const)
def byte_POP_TOP(self):
self.pop()
## Names
def byte_LOAD_NAME(self, name):
frame = self.frame
if name in frame.f_locals:
val = frame.f_locals[name]
elif name in frame.f_globals:
val = frame.f_globals[name]
elif name in frame.f_builtins:
val = frame.f_builtins[name]
else:
raise NameError("name '%s' is not defined" % name)
self.push(val)
def byte_STORE_NAME(self, name):
self.frame.f_locals[name] = self.pop()
def byte_LOAD_FAST(self, name):
if name in self.frame.f_locals:
val = self.frame.f_locals[name]
else:
raise UnboundLocalError(
"local variable '%s' referenced before assignment" % name
)
self.push(val)
def byte_STORE_FAST(self, name):
self.frame.f_locals[name] = self.pop()
def byte_LOAD_GLOBAL(self, name):
f = self.frame
if name in f.f_globals:
val = f.f_globals[name]
elif name in f.f_builtins:
val = f.f_builtins[name]
else:
raise NameError("global name '%s' is not defined" % name)
self.push(val)
## Operators
BINARY_OPERATORS = {
'POWER': pow,
'MULTIPLY': operator.mul,
'FLOOR_DIVIDE': operator.floordiv,
'TRUE_DIVIDE': operator.truediv,
'MODULO': operator.mod,
'ADD': operator.add,
'SUBTRACT': operator.sub,
'SUBSCR': operator.getitem,
'LSHIFT': operator.lshift,
'RSHIFT': operator.rshift,
'AND': operator.and_,
'XOR': operator.xor,
'OR': operator.or_,
}
def binaryOperator(self, op):
x, y = self.popn(2)
self.push(self.BINARY_OPERATORS[op](x, y))
COMPARE_OPERATORS = [
operator.lt,
operator.le,
operator.eq,
operator.ne,
operator.gt,
operator.ge,
lambda x, y: x in y,
lambda x, y: x not in y,
lambda x, y: x is y,
lambda x, y: x is not y,
lambda x, y: issubclass(x, Exception) and issubclass(x, y),
]
def byte_COMPARE_OP(self, opnum):
x, y = self.popn(2)
self.push(self.COMPARE_OPERATORS[opnum](x, y))
## Attributes and indexing
def byte_LOAD_ATTR(self, attr):
obj = self.pop()
val = getattr(obj, attr)
self.push(val)
def byte_STORE_ATTR(self, name):
val, obj = self.popn(2)
setattr(obj, name, val)
## Building
def byte_BUILD_LIST(self, count):
elts = self.popn(count)
self.push(elts)
def byte_BUILD_MAP(self, size):
self.push({})
def byte_STORE_MAP(self):
the_map, val, key = self.popn(3)
the_map[key] = val
self.push(the_map)
def byte_LIST_APPEND(self, count):
val = self.pop()
the_list = self.frame.stack[-count] # peek
the_list.append(val)
## Jumps
def byte_JUMP_FORWARD(self, jump):
self.jump(jump)
def byte_JUMP_ABSOLUTE(self, jump):
self.jump(jump)
def byte_POP_JUMP_IF_TRUE(self, jump):
val = self.pop()
if val:
self.jump(jump)
def byte_POP_JUMP_IF_FALSE(self, jump):
val = self.pop()
if not val:
self.jump(jump)
## Blocks
def byte_SETUP_LOOP(self, dest):
self.push_block('loop', dest)
def byte_GET_ITER(self):
self.push(iter(self.pop()))
def byte_FOR_ITER(self, jump):
iterobj = self.top()
try:
v = next(iterobj)
self.push(v)
except StopIteration:
self.pop()
self.jump(jump)
def byte_BREAK_LOOP(self):
return 'break'
def byte_POP_BLOCK(self):
self.pop_block()
## Functions
def byte_MAKE_FUNCTION(self, argc):
name = self.pop()
code = self.pop()
defaults = self.popn(argc)
globs = self.frame.f_globals
fn = Function(name, code, globs, defaults, None, self)
self.push(fn)
def byte_CALL_FUNCTION(self, arg):
lenKw, lenPos = divmod(arg, 256) # KWargs not supported here
posargs = self.popn(lenPos)
func = self.pop()
frame = self.frame
retval = func(*posargs)
self.push(retval)
def byte_RETURN_VALUE(self):
self.return_value = self.pop()
return "return"
### 动态类型:编译器不知道它是什么
你可能听过 Python 是一种动态语言 —— 它是动态类型的。在我们建造解释器的过程中,已经透露出这样的信息。
动态的一个意思是很多工作是在运行时完成的。前面我们看到 Python 的编译器没有很多关于代码真正做什么的信息。举个例子,考虑下面这个简单的函数`mod`。它取两个参数,返回它们的模运算值。从它的字节码中,我们看到变量`a`和`b`首先被加载,然后字节码`BINAY_MODULO`完成这个模运算。
def mod(a, b):
… return a % b
dis.dis(mod)
2 0 LOAD_FAST 0 (a)
3 LOAD_FAST 1 (b)
6 BINARY_MODULO
7 RETURN_VALUE
mod(19, 5)
4
计算 19 % 5 得4,—— 一点也不奇怪。如果我们用不同类的参数呢?
mod(“by%sde”, “teco”)
‘bytecode’
刚才发生了什么?你可能在其它地方见过这样的语法,格式化字符串。
print(“by%sde” % “teco”)
bytecode
用符号`%`去格式化字符串会调用字节码`BUNARY_MODULO`。它取栈顶的两个值求模,不管这两个值是字符串、数字或是你自己定义的类的实例。字节码在函数编译时生成(或者说,函数定义时)相同的字节码会用于不同类的参数。
Python 的编译器关于字节码的功能知道的很少,而取决于解释器来决定`BINAYR_MODULO`应用于什么类型的对象并完成正确的操作。这就是为什么 Python 被描述为<ruby> 动态类型 <rp> ( </rp> <rt> dynamically typed </rt> <rp> ) </rp></ruby>:直到运行前你不必知道这个函数参数的类型。相反,在一个静态类型语言中,程序员需要告诉编译器参数的类型是什么(或者编译器自己推断出参数的类型。)
编译器的无知是优化 Python 的一个挑战 —— 只看字节码,而不真正运行它,你就不知道每条字节码在干什么!你可以定义一个类,实现`__mod__`方法,当你对这个类的实例使用`%`时,Python 就会自动调用这个方法。所以,`BINARY_MODULO`其实可以运行任何代码。
看看下面的代码,第一个`a % b`看起来没有用。
def mod(a,b):
a % b
return a %b
不幸的是,对这段代码进行静态分析 —— 不运行它 —— 不能确定第一个`a % b`没有做任何事。用 `%`调用`__mod__`可能会写一个文件,或是和程序的其他部分交互,或者其他任何可以在 Python 中完成的事。很难优化一个你不知道它会做什么的函数。在 Russell Power 和 Alex Rubinsteyn 的优秀论文中写道,“我们可以用多快的速度解释 Python?”,他们说,“在普遍缺乏类型信息下,每条指令必须被看作一个`INVOKE_ARBITRARY_METHOD`。”
### 总结
Byterun 是一个比 CPython 容易理解的简洁的 Python 解释器。Byterun 复制了 CPython 的主要结构:一个基于栈的解释器对称之为字节码的指令集进行操作,它们顺序执行或在指令间跳转,向栈中压入和从中弹出数据。解释器随着函数和生成器的调用和返回,动态的创建、销毁帧,并在帧之间跳转。Byterun 也有着和真正解释器一样的限制:因为 Python 使用动态类型,解释器必须在运行时决定指令的正确行为。
我鼓励你去反汇编你的程序,然后用 Byterun 来运行。你很快会发现这个缩短版的 Byterun 所没有实现的指令。完整的实现在 <https://github.com/nedbat/byterun>,或者你可以仔细阅读真正的 CPython 解释器`ceval.c`,你也可以实现自己的解释器!
### 致谢
感谢 Ned Batchelder 发起这个项目并引导我的贡献,感谢 Michael Arntzenius 帮助调试代码和这篇文章的修订,感谢 Leta Montopoli 的修订,以及感谢整个 Recurse Center 社区的支持和鼓励。所有的不足全是我自己没搞好。
---
via: <http://aosabook.org/en/500L/a-python-interpreter-written-in-python.html>
作者: Allison Kaptur 译者:[qingyunha](https://github.com/qingyunha) 校对:[wxy](https://github.com/wxy)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创翻译,[Linux中国](http://linux.cn/) 荣誉推出
发表回复