先来看一段程序:
import sys
import traceback
def foo():
print 'calling bar'
bar()
try:
raise IOError
except IOError:
print 'calling bar in exception handler'
bar()
# All exceptions cleared, how to tell if there is exception or not
print 'foo', sys.exc_type, traceback.format_exc()
def bar():
print 'bar1', sys.exc_type, traceback.format_exc()
try:
zoo()
except KeyError:
pass
print 'bar2', sys.exc_type, traceback.format_exc()
def zoo():
print 'happy in zoo'
raise KeyError
print 'bye'
foo()
输出:
20:56 jaime@oldtown ajourneyintopythonsource (master)$ python a.py
calling bar
bar1 None None
happy in zoo
bar2 <type 'exceptions.KeyError'> Traceback (most recent call last):
File "a.py", line 18, in bar
zoo()
File "a.py", line 25, in zoo
raise KeyError
KeyError
calling bar in exception handler
bar1 <type 'exceptions.IOError'> Traceback (most recent call last):
File "a.py", line 8, in foo
raise IOError
IOError
happy in zoo
bar2 <type 'exceptions.KeyError'> Traceback (most recent call last):
File "a.py", line 18, in bar
zoo()
File "a.py", line 25, in zoo
raise KeyError
KeyError
foo <type 'exceptions.IOError'> Traceback (most recent call last):
File "a.py", line 8, in foo
raise IOError
IOError
20:56 jaime@oldtown ajourneyintopythonsource (master)$
本节主要关注几个问题:
从输出看来,sys.exc_type保存的是当前frame的上一个异常。bar中的KeyError异常,并不会覆盖调用者foo 看到的sys.exc_type IOError。sys.exc_type并不是全局的,而是和frame相关的,当bar执行完毕之后返回foo之后, sys.exc_type被重新恢复成foo原来的IOError了。
而从bar1的输出可以知道,如果当前frame还没有抛出过异常,此时sys.exc_type和上一个frame即foo中的值一样。
带着这几个问题,让我们开始深入代码。
为了理解exception的工作原理,要分清以下三个变量,Python/ceval.c中解释的比较详细:
tstate->exc_type, sys.exc_type 线程运行过程中的异常状态,在每次发生异常时都被设置。但是为了便于 编写代码,它们被设计成和frame相关的。如果一个frame中改变了这些值,在该frame退出时,VM需要恢复它们的 状态。
frame->f_exc_type 恢复机制的关键所在,在frame发生第一个异常时,保存当时的状态,即tstate->exc_type值。 在frame退出时,如果保存过,则用该值进行恢复。在set_exc_info中仅被设置一次,如果后续该frame再多次调用 set_exc_info,已经不为null了,所以不会被新的值冲掉。
当frame被释放时,如果frame->f_exc_type不为null,则说明发生过异常,这时会调用reset_exc_info,将此前 保存的frame->f_exc_type恢复为tstate->exc_type和sys.exc_type,相当于这两个值都恢复到了该frame发生 第一次异常前的状态。这可以解释sys.exc_type在bar退出后,为什么又变回IOError了。
如果frame中没有发生过异常,没有调用过set_exc_info,frame->f_exc_type会一直是null, tstate->exc_type, sys.exc_type保持不变,当frame释放时,也就不需要恢复,不需要调用reset_exc_info, 所以后者有frame->f_exc_type!=null的assert。如果连着多层frames都没有发生过异常,则sys.exc_type会一直 保留为上溯到发生过异常那层frame的值,如果所有的上层frame栈都没有异常,则为None。
当frame中有多次异常时,每次调用set_exc_info都将tstate->exc_type, sys.exc_type设为最新值,这就是 为什么说它们是反映运行时异常状态的。
在python代码中,可以使用raise语句来显式抛出异常,使用说明 http://docs.python.org/2/reference/simple_stmts.html#the-raise-statement
python vm在执行指令时,或者python代码调用的c函数,也会抛出异常,通过调用C/API实现。
raise语句被翻译为 RAISE_VARARGS 指令,其解析代码如下 Python/ceval.c:
case RAISE_VARARGS:
u = v = w = NULL;
switch (oparg) {
case 3:
u = POP(); /* traceback */
/* Fallthrough */
case 2:
v = POP(); /* value */
/* Fallthrough */
case 1:
w = POP(); /* exc */
case 0: /* Fallthrough */
why = do_raise(w, v, u);
break;
default:
PyErr_SetString(PyExc_SystemError,
"bad RAISE_VARARGS oparg");
why = WHY_EXCEPTION;
break;
}
break;
利用case的默认fallthrough,来处理不同的参数情况,相当精巧。如果参数错误,则抛出异常,这里的PyErr_SetString 是python vm自身抛出异常的一个例子。
do_raise:
/* Logic for the raise statement (too complicated for inlining).
This *consumes* a reference count to each of its arguments. */
static enum why_code
do_raise(PyObject *type, PyObject *value, PyObject *tb)
{
if (type == NULL) {
/* Reraise */
PyThreadState *tstate = PyThreadState_GET();
type = tstate->exc_type == NULL ? Py_None : tstate->exc_type;
value = tstate->exc_value;
tb = tstate->exc_traceback;
Py_XINCREF(type);
Py_XINCREF(value);
Py_XINCREF(tb);
}
/* We support the following forms of raise:
raise <class>, <classinstance>
raise <class>, <argument tuple>
raise <class>, None
raise <class>, <argument>
raise <classinstance>, None
raise <string>, <object>
raise <string>, None
/* 一大堆raise的复杂逻辑,将各种参数类型最后统一成type, value, tb*/
...
PyErr_Restore(type, value, tb);
if (tb == NULL)
return WHY_EXCEPTION;
else
return WHY_RERAISE;
raise_error:
Py_XDECREF(value);
Py_XDECREF(type);
Py_XDECREF(tb);
return WHY_EXCEPTION;
}
do_raise的返回值只有WHY_EXCEPTION, WHY_RERAISE,vm通过这个why,就知道有异常发生了。
具体设置异常信息在 PyErr_Restore Python/errors.c:
void
PyErr_Restore(PyObject *type, PyObject *value, PyObject *traceback)
{
PyThreadState *tstate = PyThreadState_GET();
PyObject *oldtype, *oldvalue, *oldtraceback;
if (traceback != NULL && !PyTraceBack_Check(traceback)) {
/* XXX Should never happen -- fatal error instead? */
/* Well, it could be None. */
Py_DECREF(traceback);
traceback = NULL;
}
/* Save these in locals to safeguard against recursive
invocation through Py_XDECREF */
oldtype = tstate->curexc_type;
oldvalue = tstate->curexc_value;
oldtraceback = tstate->curexc_traceback;
tstate->curexc_type = type;
tstate->curexc_value = value;
tstate->curexc_traceback = traceback;
Py_XDECREF(oldtype);
Py_XDECREF(oldvalue);
Py_XDECREF(oldtraceback);
}
设置了tstate->curexc_type,而不是tstate->exc_type,才明白,原来当前线程的异常信息都是存在 前者里的,后者tstate->exc_type, 和sys.exc_type,只是为了方便在except的handler中拿到本frame的 异常信息之用。不然如果sys.exc_type不是frame相关的,如果handler自身也有异常,就会冲掉原来的异常。 当然除非你用except Exception, e的方法先保存下这个e。
由于任何时候都只有一个异常在被处理: 1. 查找handler中(只能有一个查找),2. 执行handler中(相当于已被处理, 如果再有异常则goto 1),所以用一个变量tstate->curexc_type来存储当前异常信息,是合理的。
相应读取当前异常信息的PyErr_Occurred就简单多了:
PyObject *
PyErr_Occurred(void)
{
PyThreadState *tstate = PyThreadState_GET();
return tstate->curexc_type;
}
判断是否有异常发生,简言之,如果当前线程的tstate->curexc_type 不是NULL,则python就认为有什么地方抛出异常了。
查看当前异常是否匹配某个异常exc PyErr_ExceptionMatches
清空异常信息 PyErr_Clear,将tstate->curexc_type设为null, 让系统认为没有异常发生。
NoMemory的异常比较有意思:
PyObject *
PyErr_NoMemory(void)
{
if (PyErr_ExceptionMatches(PyExc_MemoryError))
/* already current */
return NULL;
/* raise the pre-allocated instance if it still exists */
if (PyExc_MemoryErrorInst)
PyErr_SetObject(PyExc_MemoryError, PyExc_MemoryErrorInst);
else
/* this will probably fail since there's no memory and hee,
hee, we have to instantiate this class
*/
//已经没有内存了,所以只有抛出一个class了事
PyErr_SetNone(PyExc_MemoryError);
return NULL;
}
现在的情况是,执行到了raise语句,或者其他指令,函数执行过程中,发生了异常,异常信息 已经被记录到tstate->curexc_type,但是vm的主循环怎么感知到这个异常并启动处理机制呢?
RAISE_VARARGS 设置了why之后,就break掉指令解析switch结构,跳转到on_error label了。
why:
/* Status code for main loop (reason for stack unwind) */
enum why_code {
WHY_NOT = 0x0001, /* No error */
WHY_EXCEPTION = 0x0002, /* Exception occurred */
WHY_RERAISE = 0x0004, /* Exception re-raised by 'finally' */
WHY_RETURN = 0x0008, /* 'return' statement */
WHY_BREAK = 0x0010, /* 'break' statement */
WHY_CONTINUE = 0x0020, /* 'continue' statement */
WHY_YIELD = 0x0040 /* 'yield' operator */
};
指令善后处理 on_error:
on_error:
READ_TIMESTAMP(inst1);
/* Quickly continue if no error occurred */
if (why == WHY_NOT) {
if (err == 0 && x != NULL) {
// 调用PyErr_Occurred再检测一次,看是否真的没有错误
// CHECKEXC 主要是为了防止vm自身实现错误导致的意外情况
// Q: 本次exception处理完之后,是什么时候调用PyErr_Clear的?
#ifdef CHECKEXC
/* This check is expensive! */
if (PyErr_Occurred())
fprintf(stderr,
"XXX undetected error\n");
else {
#endif
READ_TIMESTAMP(loop1);
continue; /* Normal, fast path */
#ifdef CHECKEXC
}
#endif
}
// err 不为0,或者指令运算结果x为NULL
// 这应该是很多c扩展模块里面返回NULL就会抛出异常的原理
why = WHY_EXCEPTION;
x = Py_None;
err = 0;
}
/* Double-check exception status */
if (why == WHY_EXCEPTION || why == WHY_RERAISE) {
if (!PyErr_Occurred()) {
// 设置了why,但是却没有异常信息
PyErr_SetString(PyExc_SystemError,
"error return without exception set");
why = WHY_EXCEPTION;
}
}
#ifdef CHECKEXC
else {
// 没有设置why,却有异常信息
...
}
#endif
/* Log traceback info if this is a real exception */
...
/* For the rest, treat WHY_RERAISE as WHY_EXCEPTION */
if (why == WHY_RERAISE)
why = WHY_EXCEPTION;
好的,VM对why的处理到此结束,下面就开始寻找except handler了,对堆栈进行复杂的操作, 不停的回溯block, frame,直到遇到一个except语句为止。
异常!异常!Don’t panic,我们有except。
try..except..finally对应的vm指令:
10:07 jaime@oldtown ajourneyintopythonsource (master)$ cat e.py
def foo():
raise IOError
try:
a = 1
foo()
except IOError:
print 'IOError'
finally:
print 'Finally'
a = 2
10:26 jaime@oldtown ajourneyintopythonsource (master)$
10:07 jaime@oldtown ajourneyintopythonsource (master)$ python -m dis e.py
3 0 LOAD_CONST 0 (<code object foo at 0x1006f56b0, file "e.py", line 3>)
3 MAKE_FUNCTION 0
6 STORE_NAME 0 (foo)
6 9 SETUP_FINALLY 46 (to 58)
12 SETUP_EXCEPT 17 (to 32)
7 15 LOAD_CONST 1 (1)
18 STORE_NAME 1 (a)
8 21 LOAD_NAME 0 (foo)
24 CALL_FUNCTION 0
27 POP_TOP
28 POP_BLOCK
29 JUMP_FORWARD 22 (to 54)
9 >> 32 DUP_TOP
33 LOAD_NAME 2 (IOError)
36 COMPARE_OP 10 (exception match)
39 POP_JUMP_IF_FALSE 53
42 POP_TOP
43 POP_TOP
44 POP_TOP
10 45 LOAD_CONST 2 ('IOError')
48 PRINT_ITEM
49 PRINT_NEWLINE
50 JUMP_FORWARD 1 (to 54)
>> 53 END_FINALLY
>> 54 POP_BLOCK
55 LOAD_CONST 3 (None)
12 >> 58 LOAD_CONST 4 ('Finally')
61 PRINT_ITEM
62 PRINT_NEWLINE
63 END_FINALLY
14 64 LOAD_CONST 5 (2)
67 STORE_NAME 1 (a)
70 LOAD_CONST 3 (None)
73 RETURN_VALUE
SETUP_EXCEPT,SETUP_FINALLY:
case SETUP_EXCEPT:
case SETUP_FINALLY:
/* NOTE: If you add any new block-setup opcodes that
are not try/except/finally handlers, you may need
to update the PyGen_NeedsFinalizing() function.
*/
PyFrame_BlockSetup(f, opcode, INSTR_OFFSET() + oparg,
STACK_LEVEL());
continue;
可见一个try..except..finally,实际上生成了两级block,finally block套着except block。最内层block的handler由SETUP_EXCEPT指令设置为32,外层block的handler由SETUP_FINALLY 指令设置为58。
指令24 CALL_FUNCTION调用函数foo,如果没有异常,则指令28 pop掉内层block,指令29跳转到54, pop掉外层的finally block,开始执行finally部分。
如果有异常,则指令24返回时,确切的说是foo函数的raise语句执行完之后,python vm就会启动异常处理机制, 找到内层block的handler的except地址32,调整堆栈,跳转到这里执行。注意该跳转是vm自动完成的, 并没有对应的python指令,这一部分的介绍见下节。
开始执行内层block的handler 32时,由三种情况需要处理:
不管怎样,都要保证finally部分被执行,我们来看下python是怎么做到的。
对于第一种情况,异常匹配到了,而且执行的时候很顺利,没有再发生任何异常,则指令50又跳转到了54, 和上面没有发生异常的情形一样,去执行finally了。
第二种情况,处理异常的时候又发生了异常,则handler的执行会被强行终止,python vm会重新启动异常查找机制,此时回溯到的外层block恰好是finally的block,其handler为指令58,同样保证了finally的执行。
第三种情况,异常不匹配,但是注意我们事实上已经进入内层block的handler 指令32了, COMPARE_OP比较是否是我们要匹配的异常,不匹配则POP_JUMP_IF_FALSE指令39跳转到指令53,执行END_FINALLY:
case END_FINALLY:
v = POP();
if (PyInt_Check(v)) {
...
}
else if (PyExceptionClass_Check(v) ||
PyString_Check(v)) {
w = POP();
u = POP();
PyErr_Restore(v, w, u);
why = WHY_RERAISE;
break;
}
...
Py_DECREF(v);
break;
END_FINALLY做了什么呢?END_FINALLY 又恢复了异常信息,然后就像RAISE_VARARGS一样, break掉大switch语句,异常的进一步处理又交还给了python vm。至此,第三种情况归约为了第二种情况。
Note
要注意区分大switch case的break和continue语句,continue继续执行下一条指令,是在同一个block内。 而break则会跳转到fast_block_end,表示一个block要结束了,这通常表示控制流的变换,如RAISE_VARARGS指令 结束一个block,往上回溯找handler,END_FINALLY指令结束except块或finally块
进一步分析见 Python VM Loop
如果try有多个except语句,也和此类似,实际上内层block的handler只有一个,多个except只是 对应于该handler里的多个条件判断。SETUP_EXCEPT指令只记录第一个except的跳转地址。
如果try没有except语句,那么就不会有SETUP_EXCEPT指令,而只有一个SETUP_FINALLY指令,也就是说只有一层block。
只有在一个block结束的时候,不管是正常结束,还是发生了异常,python vm才会进行异常处理, 查找是否有可用handler:
/* Unwind stacks if a (pseudo) exception occurred */
/* block结束处理。Q: 有那些情况会生成一个新的block?
exception是一种特殊的block,先在本frame的所有blocks里面寻找handler,
如果没有找到,则退出本frame,到上一层frame里面继续寻找。frame的退出通常意味着函数的返回,
即CALL_FUNCTION指令的返回,在上一层frame里面,又会继续走到这段代码。
*/
fast_block_end:
// 逐层向上回溯block。Q: Block, Frame有什么区别?
while (why != WHY_NOT && f->f_iblock > 0) {
// 获得上一层block,循环迭代器
PyTryBlock *b = PyFrame_BlockPop(f);
assert(why != WHY_YIELD);
// continue语句
if (b->b_type == SETUP_LOOP && why == WHY_CONTINUE) {
/* For a continue inside a try block,
don't pop the block for the loop. */
PyFrame_BlockSetup(f, b->b_type, b->b_handler,
b->b_level);
why = WHY_NOT;
JUMPTO(PyInt_AS_LONG(retval));
Py_DECREF(retval);
break;
}
// 清除堆栈
while (STACK_LEVEL() > b->b_level) {
v = POP();
Py_XDECREF(v);
}
// break语句
if (b->b_type == SETUP_LOOP && why == WHY_BREAK) {
why = WHY_NOT;
JUMPTO(b->b_handler);
break;
}
// Good, 找到最近一层try..except..block块了
if (b->b_type == SETUP_FINALLY || // 对应于没有except只有finally的情况,finally必须得到执行
(b->b_type == SETUP_EXCEPT &&
why == WHY_EXCEPTION)) {
if (why == WHY_EXCEPTION) {
PyObject *exc, *val, *tb;
PyErr_Fetch(&exc, &val, &tb);
if (val == NULL) {
val = Py_None;
Py_INCREF(val);
}
/* Make the raw exception data
available to the handler,
so a program can emulate the
Python main loop. Don't do
this for 'finally'. */
// 设置 tstate->exc_type, sys.exc_type
if (b->b_type == SETUP_EXCEPT) {
PyErr_NormalizeException(
&exc, &val, &tb);
set_exc_info(tstate,
exc, val, tb);
}
// 把异常信息压入堆栈,为执行except handler做准备
// 对应于上面反汇编后 42, 43, 44 三个POP指令
if (tb == NULL) {
Py_INCREF(Py_None);
PUSH(Py_None);
} else
PUSH(tb);
PUSH(val);
PUSH(exc);
}
else {
// 函数返回的情况
if (why & (WHY_RETURN | WHY_CONTINUE))
PUSH(retval);
v = PyInt_FromLong((long)why);
PUSH(v);
}
/* 跳转到block的handler,即except handler
注意why已经变为WHY_NOT了,意味着异常处理handler已经找到了
至于是否匹配则交由python层去解决。还记得上面吗?如果没匹配,会执行
END_FINALLY重新raise异常*/
why = WHY_NOT;
JUMPTO(b->b_handler);
break;
}
} /* unwind stack */
/* End the loop if we still have an error (or return) */
// 在本frame里面没有找到handler,结束loop,退出本frame执行
if (why != WHY_NOT)
break;
READ_TIMESTAMP(loop1);
} /* main loop */ //指令解析for loop
...
// frame结束的时候,恢复tstate->exc_type, sys.exc_type
if (tstate->frame->f_exc_type != NULL)
reset_exc_info(tstate);
else {
assert(tstate->frame->f_exc_value == NULL);
assert(tstate->frame->f_exc_traceback == NULL);
}
/* pop frame */
exit_eval_frame:
Py_LeaveRecursiveCall();
tstate->frame = f->f_back;
return retval;
}
至此,你也许还有一个疑问,tstate->curexc_type是不是没有清掉?进入handler之后,已经恢复了正常的 指令执行流程,下一条指令执行的时候,PyErr_Occurred肯定不能返回true,这个问题不解决,就像有个东西 一直在hunting你,让你坐立不安。
看看 PyErr_Fetch 吧,Python/errors.c:
void
PyErr_Fetch(PyObject **p_type, PyObject **p_value, PyObject **p_traceback)
{
PyThreadState *tstate = PyThreadState_GET();
*p_type = tstate->curexc_type;
*p_value = tstate->curexc_value;
*p_traceback = tstate->curexc_traceback;
tstate->curexc_type = NULL;
tstate->curexc_value = NULL;
tstate->curexc_traceback = NULL;
}
世界安静了,一切都在恰当的位置。
Notes:
socket是python网络通信的主要模块,它实际上只是_socket的一个简单wrap。通过分析_socket模块的 源码 Modules/socketmodule.c,可以加深对python socket工作原理的理解。
这个文件有5000多行,如果我们只想要最基本的网络功能,建立连接,接收,发送tcp数据,别的 如ipv6,gethostby*, inet_*等辅助性函数都不需要,也不用考虑平台可移植性,这样一个 极简的可以工作socket模块会是什么样子呢?
注:虽然号称自制,但本例代码几乎全部copy自socketmodule.c
完整代码: https://github.com/nkchenz/cpythonjourney/blob/sockmini/Modules/socketmini.c
我们的新模块为_sockmini,将使用一个新的类型sockmini来标识自定义的socket对象, 对应关系: _socket -> _sockmini, _socket.socket -> _sockmini.sockmini
新建文件 Modules/socketmini.c:
static PyMethodDef sockmini_methods[] = {
{"setdefaulttimeout", socket_setdefaulttimeout,
METH_O, setdefaulttimeout_doc},
{NULL} /* Sentinel */
};
PyMODINIT_FUNC
init_sockmini(void)
{
PyObject* m;
m = Py_InitModule3("_sockmini", sockmini_methods, "sockmini module");
if(m == NULL)
return;
/* The general exception type */
socket_error = PyErr_NewException("_sockmini.error",
PyExc_IOError, NULL);
if(socket_error == NULL)
return;
Py_INCREF(socket_error);
PyModule_AddObject(m, "error", socket_error);
/* Add a new type to module */
Py_TYPE(&sockmini_type) = &PyType_Type;
Py_INCREF((PyObject *)&sockmini_type);
if (PyModule_AddObject(m, "sockmini",
(PyObject *)&sockmini_type) != 0)
return;
}
_sockmini模块只有一个模块级方法socket_setdefaulttimeout,成员error类似于socket.error,用来设置异常, 另一个成员sockmini就是我们的新socket对象。
sockmini和socket.socket类型使用同样的存储结构PySocketSockObject,这样很多函数参数的类型 都可以复用:
static PyMethodDef sock_methods[] = {
{"connect", (PyCFunction)sock_connect, METH_O,
connect_doc},
{"close", (PyCFunction)sock_close, METH_NOARGS,
close_doc},
{"recv", (PyCFunction)sock_recv, METH_VARARGS,
recv_doc},
{"send", (PyCFunction)sock_send, METH_VARARGS,
send_doc},
{NULL, NULL} /* sentinel */
};
/* Type object for socket objects. */
static PyTypeObject sockmini_type = {
PyVarObject_HEAD_INIT(0, 0) /* Must fill in type value later */
"_sockmini.sockmini", /* tp_name */
sizeof(PySocketSockObject), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)sock_dealloc, /* tp_dealloc */
...
PyObject_GenericGetAttr, /* tp_getattro */
...
sock_methods, /* tp_methods */
...
sock_initobj, /* tp_init */
PyType_GenericAlloc, /* tp_alloc */
PyType_GenericNew, /* tp_new */
PyObject_Del, /* tp_free */
};
sockmini只有四个methods: connect,recv,send,close,底层的c struct是PySocketSockObject, 初始化函数 sock_initobj, 释放时调用sock_dealloc。
static int
sock_initobj(PyObject *self)
{
PySocketSockObject *s = (PySocketSockObject *)self;
int fd;
int family = AF_INET, type = SOCK_STREAM, proto = 0;
Py_BEGIN_ALLOW_THREADS
fd = socket(family, type, proto);
Py_END_ALLOW_THREADS
if (fd < 0) {
PyErr_SetString(socket_error, "Failed to create socket");
return -1;
}
s->sock_fd = fd;
s->sock_family = family;
s->sock_type = type;
s->sock_proto = proto;
/* Notes: Be carefull about the concept of timeout here
*
* It only measures the time when a socket becomes ready to read or write, not the time
* took to send or read your data, alas, it's the max idle time spent on waiting,
* not the real busy io time
* */
s->sock_timeout = defaulttimeout;
/* Set to non blocking if timeout is not negative */
if (s->sock_timeout >= 0.0)
set_blocking(s, 0);
return 0;
}
family,type,proto使用硬编码,仅支持tcp stream。创建sockmini对象时,默认使用defaulttimeout 全局变量,该变量可以通过模块函数 _sockmini.setdefaulttimeout 设置。默认值为-1,表示使用blocking fd。
sock_timeout的意义:
See http://docs.python.org/2/library/socket.html#socket.socket.settimeout
如果sockmini对象的超时时间>0,则要使用nonblocking的socket fd,这样底层的connect,send, recv等c函数才不会一直等待,timeout才有意义。注意,这里的timeout是等待IO变成可用的时间, 而不是实际执行IO的时间,后面可以详细看到。
sock_dealloc 函数并没有什么修改。
_socket模块的getsockaddrarg非常复杂,因为要解析各种各样的协议,而我们的_sockmini只用关心 tcp,就简单的多了:
static int
getsockaddrarg(PySocketSockObject *s, PyObject *args,
struct sockaddr *addr_ret, int *len_ret)
{
struct sockaddr_in* addr;
char *host;
int port, result;
struct addrinfo hints, *res;
size_t addr_ret_size;
if (!PyArg_ParseTuple(args, "si:connect", &host, &port))
return 0;
addr=(struct sockaddr_in*)addr_ret;
/* Getaddrinfo */
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
Py_BEGIN_ALLOW_THREADS
result = getaddrinfo(host, NULL, &hints, &res);
Py_END_ALLOW_THREADS
if (result) {
PyErr_SetString(socket_error, "Failed to getaddrinfo ");
return 0;
}
addr_ret_size = sizeof(*addr);
if (res->ai_addrlen < addr_ret_size)
addr_ret_size = res->ai_addrlen;
memcpy((char *) addr, res->ai_addr, addr_ret_size);
freeaddrinfo(res);
addr->sin_family = AF_INET;
addr->sin_port = htons((short)port);
*len_ret = sizeof *addr;
return 1;
}
/* s.connect(sockaddr) method */
static PyObject *
sock_connect(PySocketSockObject *s, PyObject *args)
{
sock_addr_t addrbuf;
int addrlen;
int res, timeout = 0;
if (!getsockaddrarg(s, args, SAS2SA(&addrbuf), &addrlen))
return NULL;
Py_BEGIN_ALLOW_THREADS
res = connect(s->sock_fd, SAS2SA(&addrbuf), addrlen);
if (s->sock_timeout > 0.0) {
// If timeout is given, use poll to check whether it's ready
if (res < 0 && errno == EINPROGRESS) {
timeout = poll_check(s, 1);
if (timeout == 0) {
/* Bug #1019808: in case of an EINPROGRESS,
use getsockopt(SO_ERROR) to get the real
error. */
socklen_t res_size = sizeof res;
(void)getsockopt(s->sock_fd, SOL_SOCKET,
SO_ERROR, &res, &res_size);
if (res == EISCONN)
res = 0;
errno = res;
}
else if (timeout == -1) {
res = errno; /* had error */
}
}
}
Py_END_ALLOW_THREADS
if (timeout == 1){
PyErr_SetString(socket_error, "timed out");
return NULL;
}
if (res != 0)
return PyErr_SetFromErrno(socket_error);
Py_INCREF(Py_None);
return Py_None;
}
connect的逻辑是这样的:
sock_close 没有修改。
poll_check 超时检测,查看fd是否就绪。返回1则表示等待超时,-1错误发生,0 IO就绪或其他。
如果sock对象不处于timeout mode,即sock_timeout<=0.0,则无需检测超时。不执行任何操作, 立即返回0。只有timeout>0.0时,才会调用系统的poll,等待IO事件发生,该函数返回之后, 调用者就可以立即操作non-blocking的fd了。
int poll_check(PySocketSockObject *s, int writing)
{
int n;
struct pollfd pollfd;
int timeout;
// If in blocking mode, do nothing
if (s->sock_timeout <= 0.0)
return 0;
pollfd.fd = s->sock_fd;
pollfd.events = writing ? POLLOUT : POLLIN;
/* s->sock_timeout is in seconds, timeout in ms */
timeout = (int)(s->sock_timeout * 1000 + 0.5);
n = poll(&pollfd, 1, timeout);
/* Returns 1 on timeout, -1 on error, 0 otherwise. */
if (n < 0)
return -1;
if (n == 0){
return 1;
}
return 0;
}
有了poll_check这个利器之后,真正的发送,接收函数反而比较简单。
sock_send 在发送数据前调用poll_check,非timeout mode下,poll_check什么也不做,一切交由 后续的send函数处理,blocking的fd有可能可能,non-blocking的fd则不会等待,符合上层的语义。
在timeout mode下,timeout>0.0,则poll_check最多等待timeout时间后返回。如果没超时也没出错 则为io就绪,后续send会一次发送尽可能多的数据,因为这是non-blocking fd, send不会等待。 timeout的实现借助non-blocking fd得以完成。需要注意,send发送数据的耗时并没有计算在timeout里, timeout的意义仅限于IO等待超时。
当底层的send返回时,进行错误检查,用PyErr_SetFromErrno根据errno设置了合理的异常。
sock_recv 函数的逻辑与此类似。
static PyObject *
sock_send(PySocketSockObject *s, PyObject *args)
{
char *buf;
int len, n = -1, flags = 0, timeout;
Py_buffer pbuf;
if (!PyArg_ParseTuple(args, "s*|i:send", &pbuf, &flags))
return NULL;
buf = pbuf.buf;
len = pbuf.len;
Py_BEGIN_ALLOW_THREADS
timeout = poll_check(s, 1);
if (!timeout) // no error and timeout
n = send(s->sock_fd, buf, len, flags);
Py_END_ALLOW_THREADS
PyBuffer_Release(&pbuf);
if (timeout == 1){
PyErr_SetString(socket_error, "timed out");
return NULL;
}
if (n < 0)
return PyErr_SetFromErrno(socket_error);
return PyInt_FromLong((long)n);
}
static PyObject *
sock_recv(PySocketSockObject *s, PyObject *args)
{
int recvlen, flags = 0, timeout;
ssize_t outlen;
PyObject *buf;
if (!PyArg_ParseTuple(args, "i|i:recv", &recvlen, &flags))
return NULL;
if (recvlen < 0) {
PyErr_SetString(PyExc_ValueError,
"negative buffersize in recv");
return NULL;
}
/* Allocate a new string. */
buf = PyString_FromStringAndSize((char *) 0, recvlen);
if (buf == NULL)
return NULL;
Py_BEGIN_ALLOW_THREADS
timeout = poll_check(s, 0);
if (!timeout) // no error and timeout
outlen = recv(s->sock_fd, PyString_AS_STRING(buf), recvlen, flags);
Py_END_ALLOW_THREADS
if (timeout == 1){
PyErr_SetString(socket_error, "timed out");
return NULL;
}
if (outlen < 0) {
/* An error occurred, release the string and return an
error. */
Py_DECREF(buf);
return PyErr_SetFromErrno(socket_error);
}
if (outlen != recvlen) {
/* We did not read as many bytes as we anticipated, resize the
string if possible and be succesful. */
if (_PyString_Resize(&buf, outlen) < 0)
/* Oopsy, not so succesful after all. */
return NULL;
}
return buf;
}
注意,当send写入0个字节,recv读取到空字符串时,不一定表示错误。
综合下来,其实整个模块复杂的地方就在于对timeout的处理,其他都是对socket c编程的直接封装。
修改setup.py,告诉make编译我们的_sockmini模块:
diff --git a/setup.py b/setup.py
index 6e02114..76b6afd 100644
--- a/setup.py
+++ b/setup.py
@@ -689,6 +689,9 @@ class PyBuildExt(build_ext):
# socket(2)
exts.append( Extension('_socket', ['socketmodule.c'],
depends = ['socketmodule.h']) )
+ exts.append( Extension('_sockmini', ['socketmini.c'],
+ depends = ['socketmodule.h']) )
+
# Detect SSL support for the socket module (via _ssl)
search_for_ssl_incs_in = [
'/usr/local/ssl/include',
make
13:59 jaime@oldtown 2.6.7 (sockmini)$ make
running build
running build_ext
building '_sockmini' extension
gcc -fno-strict-aliasing -g -O2 -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -I. -I/Users/jaime/source/2.6.7/./Include -I/Users/jaime/source/2.6.7/./Mac/Include -I. -IInclude -I./Include -I/usr/local/include -I/Users/jaime/source/2.6.7/Include -I/Users/jaime/source/2.6.7 -c /Users/jaime/source/2.6.7/Modules/socketmini.c -o build/temp.macosx-10.4-x86_64-2.6/Users/jaime/source/2.6.7/Modules/socketmini.o
/Users/jaime/source/2.6.7/Modules/socketmini.c:389: warning: initialization from incompatible pointer type
/Users/jaime/source/2.6.7/Modules/socketmodule.h:228: warning: ‘PySocketModule_ImportModuleAndAPI’ defined but not used
gcc -bundle -undefined dynamic_lookup build/temp.macosx-10.4-x86_64-2.6/Users/jaime/source/2.6.7/Modules/socketmini.o -L/usr/local/lib -o build/lib.macosx-10.4-x86_64-2.6/_sockmini.so
...
running build_scripts
13:59 jaime@oldtown 2.6.7 (sockmini)$
#import _socket as _sockmini
#from _socket import socket as sockmini
import _sockmini
from _sockmini import sockmini
_sockmini.setdefaulttimeout(3)
s = sockmini()
s.connect(('www.google.com', 80))
#s.connect(('www.github.com', 80))
#s.connect(('www.douban.com', 80))
s.send('GET / HTTP/1.1\n\n')
data = ''
while 1:
tmp = s.recv(4096)
print '-', len(tmp), tmp
if not tmp:
break
data += tmp
print data
运行输出:
10:41 jaime@oldtown 2.6.7 (sockmini)$ ./python.exe tests/test_sockmini.py
- 1279 HTTP/1.1 302 Found
Location:
http://www.google.com.hk/url?sa=p&hl=zh-CN&pref=hkredirect&pval=yes&q=http://www.google.com.hk/&ust=1353811295390764&usg=AFQjCNGq9Gh7aZ15wEgee3rdzZBwbYxXUQ
Cache-Control: private
Content-Type: text/html; charset=UTF-8
Set-Cookie:
PREF=ID=1adfb854d08d14e0:FF=0:NW=1:TM=1353811265:LM=1353811265:S=XEogFSHieh_DmFUd;
...
Server: gws
Content-Length: 376
X-XSS-Protection: 1; mode=block
X-Frame-Options: SAMEORIGIN
<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">
<TITLE>302 Moved</TITLE></HEAD><BODY>
<H1>302 Moved</H1>
The document has moved
<A
HREF="http://www.google.com.hk/url?sa=p&hl=zh-CN&pref=hkredirect&pval=yes&q=http://www.google.com.hk/&ust=1353811295390764&usg=AFQjCNGq9Gh7aZ15wEgee3rdzZBwbYxXUQ">here</A>.
</BODY></HTML>
虽然比较简陋,而且可能还有很多问题,但是我们确实有了一个可以工作的socket模块,it’s fun.
cProfile是python的性能测试模块,它只是_lsprof模块的一个封装,用来展示输出后者收集的数据。
运行profile实际上是在enable,disable Python VM的profiling功能。
Lib/cProfile.py:
class Profile(_lsprof.Profiler):
"""Profile(custom_timer=None, time_unit=None, subcalls=True, builtins=True)
...
def runctx(self, cmd, globals, locals):
self.enable()
try:
exec cmd in globals, locals
finally:
self.disable()
return self
Module/_lsprof.c:
static PyObject*
profiler_enable(ProfilerObject *self, PyObject *args, PyObject *kwds)
{
int subcalls = -1;
int builtins = -1;
static char *kwlist[] = {"subcalls", "builtins", 0};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:enable",
kwlist, &subcalls, &builtins))
return NULL;
if (setSubcalls(self, subcalls) < 0 || setBuiltins(self, builtins) < 0)
return NULL;
PyEval_SetProfile(profiler_callback, (PyObject*)self);
self->flags |= POF_ENABLED;
Py_INCREF(Py_None);
return Py_None;
}
调用PyEval_SetProfile设置了一个callback profiler_callback, 这样python vm在进入函数, 从函数返回前就会告诉我们:
static int
profiler_callback(PyObject *self, PyFrameObject *frame, int what,
PyObject *arg)
{
switch (what) {
/* the 'frame' of a called function is about to start its execution */
case PyTrace_CALL:
ptrace_enter_call(self, (void *)frame->f_code,
(PyObject *)frame->f_code);
break;
/* the 'frame' of a called function is about to finish
(either normally or with an exception) */
case PyTrace_RETURN:
ptrace_leave_call(self, (void *)frame->f_code);
break;
/* case PyTrace_EXCEPTION:
If the exception results in the function exiting, a
PyTrace_RETURN event will be generated, so we don't need to
handle it. */
#ifdef PyTrace_C_CALL /* not defined in Python <= 2.3 */
/* the Python function 'frame' is issuing a call to the built-in
function 'arg' */
case PyTrace_C_CALL:
...
#endif
...
最重要是PyTrace_CALL, PyTrace_RETURN这两个信号,分别表示将要进入和返回函数。 详细请参考 http://docs.python.org/release/2.6.7/c-api/init.html#PyTrace_CALL
要搞清楚ptrace_enter_call, ptrace_leave_call怎么回事,需要明白两个数据结构:
/* represents a function or user defined block */
typedef struct _ProfilerEntry {
rotating_node_t header;
PyObject *userObj; /* PyCodeObject, or a descriptive str for builtins */
PY_LONG_LONG tt; /* total time in this entry */
PY_LONG_LONG it; /* inline time in this entry (not in subcalls) */
long callcount; /* how many times this was called */
long recursivecallcount; /* how many times called recursively */
long recursionLevel;
rotating_node_t *calls;
} ProfilerEntry;
typedef struct _ProfilerContext {
PY_LONG_LONG t0;
PY_LONG_LONG subt;
struct _ProfilerContext *previous;
ProfilerEntry *ctxEntry;
} ProfilerContext;
ProfilerContext
可以认为时调用堆栈链,previous指向上层调用者。存放单次计时的状态,比如进入该函数的时间t0,所有子函数耗时subt, 这两个数据在退出函数时即Stop函数中,用来计算本次调用的tt以及it,然后累加到该函数对应的全局entry中。
ProfilerEntry
计时汇总信息,每个callable只对应一个entry,在这里含有所有该函数的性能数据,如 调用次数callcount,递归调用次数recursivecallcount,当前递归深度recursionLevel,总耗时tt,去除subcall耗时之后该函数自身耗时it等
foo递归调用自己,然后又调用foo1,则上面的结构看起来如下:
Context: Entry:
foo foo
foo foo1
foo
foo
foo1
开始时间,结束时间分别在initContext, Stop中获得,调用CALL_TIMER(pObj),单位为微秒,参见 hpTimer()。
以下是在进入,退出函数时打印一些信息的patch:
diff --git a/Modules/_lsprof.c b/Modules/_lsprof.c
index 049c94d..53819ae 100644
--- a/Modules/_lsprof.c
+++ b/Modules/_lsprof.c
@@ -319,6 +319,10 @@ static void clearEntries(ProfilerObject *pObj)
static void
initContext(ProfilerObject *pObj, ProfilerContext *self, ProfilerEntry *entry)
{
+ if (PyCode_Check(entry->userObj)){ #要进入的函数可能不是PyCodeObject类型,比如上面的PyTrace_C_CALL
+ printf("Entering func %s\n", PyString_AS_STRING(((PyCodeObject *)entry->userObj)->co_name));
+ }
+
self->ctxEntry = entry;
self->subt = 0;
self->previous = pObj->currentProfilerContext;
@@ -339,17 +343,30 @@ initContext(ProfilerObject *pObj, ProfilerContext *self, ProfilerEntry *entry)
static void
Stop(ProfilerObject *pObj, ProfilerContext *self, ProfilerEntry *entry)
{
+ // Total time spent in this level of recursion of a function
PY_LONG_LONG tt = CALL_TIMER(pObj) - self->t0;
+ // Pure time not included sub calls
PY_LONG_LONG it = tt - self->subt;
if (self->previous)
self->previous->subt += tt; # 把本次调用的总耗时算到上一层调用者的子调用耗时里,这样上面的it=tt->self.subt就说的通了
pObj->currentProfilerContext = self->previous;
+
+ // Increase the time spent in a function after all recursion is over
if (--entry->recursionLevel == 0)
entry->tt += tt; # 累加
else
++entry->recursivecallcount;
+
+ // Increase pure time every recursion
entry->it += it; # 累加
entry->callcount++;
+ double collect_factor = hpTimerUnit();
+
+ if (PyCode_Check(entry->userObj)){
+ printf("Leaving func %20s ", PyString_AS_STRING(((PyCodeObject *)entry->userObj)->co_name));
+ printf("Timers: tt %.4f, it %.4f, nc %d, rl %d\n", entry->tt * collect_factor,
+ entry->it * collect_factor, entry->callcount, entry->recursionLevel);
+ }
if ((pObj->flags & POF_SUBCALLS) && self->previous) {
/* find or create an entry for me in my caller's entry */
ProfilerEntry *caller = self->previous->ctxEntry;
@@ -441,7 +458,8 @@ profiler_callback(PyObject *self, PyFrameObject *frame, int what,
/* the 'frame' of a called function is about to start its execution */
case PyTrace_CALL:
ptrace_enter_call(self, (void *)frame->f_code,
- (PyObject *)frame->f_code);
+ (PyObject *)frame->f_code);
+
break;
/* the 'frame' of a called function is about to finish
@@ -593,7 +611,7 @@ static int statsForEntry(rotating_node_t *node, void *arg)
entry->userObj,
entry->callcount,
entry->recursivecallcount,
- collect->factor * entry->tt,
+ collect->factor * entry->tt, // NOTE
collect->factor * entry->it,
collect->sublist);
Py_DECREF(collect->sublist);
@@ -628,6 +646,7 @@ profiler_subentry objects:\n\
inlinetime inline time (not in further subcalls)\n\
");
+
static PyObject*
profiler_getstats(ProfilerObject *pObj, PyObject* noarg)
{
20:46 jaime@oldtown Python-2.6.7 (cprofile)$
用来profile的测试文件, test.py:
import time
def foo1():
time.sleep(1)
def foo(n):
foo1()
if n > 0:
return foo(n - 1)
t = 1
i = 1
while i< 10000:
i += 1
t *= i
return 42
class A:
def test(self):
foo(3)
print 'foo', id(foo)
print 'foo1', id(foo1)
a = A()
print 'A.test', id(a.test)
print 'A.test', id(A().test)
a.test()
foo递归调用自己,每次都调用foo1。为了区别,我们在最后一次调用foo时做了一些计算,这次调用自身也消耗一些时间。profile以可执行的函数为最小单位来计算耗时,每个callable都是一个entry。class的method也是callable,具有全局唯一的地址即id,和绑定到哪个实例没有关系,只有一个entry。
output:
20:50 jaime@oldtown Python-2.6.7 (cprofile)$ ./python.exe -m cProfile test/profile.py
Entering func <module>
Entering func <module>
Entering func A
Leaving func A Timers: tt 0.0000, it 0.0000, nc 1, rl 0
foo 4299829448
foo1 4299808352
A.test 4299358368
A.test 4299358368
Entering func test
Entering func foo
Entering func foo1
Leaving func foo1 Timers: tt 1.0009, it 0.0000, nc 1, rl 0
Entering func foo
Entering func foo1
Leaving func foo1 Timers: tt 2.0021, it 0.0001, nc 2, rl 0
Entering func foo
Entering func foo1
Leaving func foo1 Timers: tt 3.0032, it 0.0001, nc 3, rl 0
Entering func foo
Entering func foo1
Leaving func foo1 Timers: tt 4.0043, it 0.0001, nc 4, rl 0
Leaving func foo Timers: tt 0.0000, it 0.0660, nc 1, rl 3
Leaving func foo Timers: tt 0.0000, it 0.0662, nc 2, rl 2
Leaving func foo Timers: tt 0.0000, it 0.0664, nc 3, rl 1
Leaving func foo Timers: tt 4.0708, it 0.0665, nc 4, rl 0
Leaving func test Timers: tt 4.0708, it 0.0000, nc 1, rl 0
Leaving func <module> Timers: tt 4.0715, it 0.0007, nc 1, rl 0
Leaving func <module> Timers: tt 4.0718, it 0.0000, nc 1, rl 0
22 function calls (19 primitive calls) in 4.072 CPU seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 4.072 4.072 <string>:1(<module>)
1 0.001 0.001 4.071 4.071 profile.py:1(<module>)
1 0.000 0.000 0.000 0.000 profile.py:17(A)
1 0.000 0.000 4.071 4.071 profile.py:18(test)
4 0.000 0.000 4.004 1.001 profile.py:3(foo1)
4/1 0.066 0.017 4.071 4.071 profile.py:6(foo)
1 0.000 0.000 4.072 4.072 {execfile}
4 0.000 0.000 0.000 0.000 {id}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
4 4.004 1.001 4.004 1.001 {time.sleep}
可以看出,每次调用foo1返回后,foo1这个entry的总耗时就加1s,foo1没有自身耗时,调用次数加1,递归深度一直为0. 而foo则不同,输出最早的那个`Leaving func foo`是最深的那次递归,递归深度rl为3,自身耗时为0.0660s,其后各次递归都没有自身耗时。当最上层foo返回即rl为0时,才计算entry foo的总耗时,为4.0708s。
对比下面的cProfile输出,可以看到tottime实际上对应于it,而不是tt,是指函数自身耗时,不包括subcall的耗时,所以可能叫inlinetime更为合适:) cumtime才是tt,函数总耗时。
Lib/cProfile.py
def snapshot_stats(self):
entries = self.getstats()
self.stats = {}
callersdicts = {}
# call information
for entry in entries:
func = label(entry.code)
nc = entry.callcount # ncalls column of pstats (before '/')
cc = nc - entry.reccallcount # ncalls column of pstats (after '/')
tt = entry.inlinetime # tottime column of pstats
ct = entry.totaltime # cumtime column of pstats
cc 为递归除外的调用次数,即4/1中的1。
statprof 提供了另外一种思路。每次进行函数调用前后都执行trace操作,这算是同步的profile。如果让程序一直运行,只是定时的中断 一下,看看程序正在做什么,那么是不是可算作一种统计意义的profile?
具体做法是设置signal.SIGPROF,定时触发profile事件,在处理程序中查看当前堆栈信息,汇总之后就可大致知道程序大部分时间花在什么地方。
gunicorn: 0.14.2, gevent: 1.0b1
gunicorn是一个WSGI server,其核心是arbiter, worker管理模型。
arbiter, 也即master进程,负责管理多个worker进程。每个worker都监听 在同一个地址上,负责处理具体的web request。这个地址可以是ip:port, 也可以是本地socket。master负责spawn,monitor, kill workers,而workers 组成一个池子, 这个进程模型非常典型。
假设有greenlet F,包含三个操作A, B, C,依次顺序执行:
greenlet F: A -> B -> C
如果在执行B的时候,有io数据还没就绪,则gevent会挂起当前greenlet, 转而执行别的greenlet。当发现greenlet F的io数据就绪时,会继续原来B操作。 在greenlet F看来,一切照常运行,就像阻塞了一段时间一样。这非常类似于 操作系统和进程之间的关系,当一个进程进行阻塞IO时,os挂起该进程,选择 别的进程执行,当其IO就绪后,又恢复现场继续原来的进程。 如此看来,挂起阻塞的IO,转而执行别的任务,从而使cpu不至于空等待,这也是 一个很典型的pattern。
gevent要做的事情就是patch所有的阻塞io,在其中显示调用greenlet switch, io实际上变成异步的了,但是在greenlet内看来,结果仍是同步返回的。 如果稍有不慎,系统中仍然有遗漏的阻塞io没有patch,这个greenlet就会一直 占有cpu,导致其他greenlet无法运行,系统吞吐量则会急剧下降。
info: 串行: A, B, C 或者 A -> B -> C
并行: A | B | 或者 [A B C]
gevent(greenlet)在thread,process之外,提供了另外一种可能的并发模型。
上面说到gunicorn的arbiter:worker模型,ggevent就是gunicorn支持的一种worker类型, ggevent基于gevent,gevent基于greenlet。
http://gunicorn.org/design.html
阅读gunicorn代码请参阅 http://readthedocs.org/docs/gunicorn/en/latest/readstart.html
下面来看一下ggevent的工作流程:
# 从Application开始
gunicorn.app.base.WSGIApplication.run
gunicorn.app.base.Application.run
# 关联到一个Arbiter,启动workers
gunicorn.arbiter.Arbiter.run
.manager_workers
.spawn_workers
# Worker初始化
gunicorn.workers.base.Worker.init_process
gunicorn.workers.ggevent.GeventWorker.run:
from gevent.pool import Pool
from gevent.server import StreamServer
pool = Pool(self.worker_connections)
...
server = StreamServer(self.socket, handle=self.handle, spawn=pool)
server.start()
Pool是gevent用来控制并发greenlet的一种机制,如果pool没有满,则pool.spawn可以立即成功,否则需要等待。 http://www.gevent.org/gevent.pool.html#gevent.pool.Pool 该参数被传递给StreamServer,用来实现并发连接数控制。
handle 参数也需注意,每个连接的具体处理,都在这个函数中完成,当server accept新连接之后,即回调此函数。
gunicorn.workers.ggevent.GeventWorker.handle
gunicorn.workers.ggevent.AsyncWorker.handle
gunicorn.workers.ggevent.GeventWorker.handle_request
gunicorn.workers.ggevent.AsyncWorker.handle_request
细看handle:
def handle(self, client, addr):
try:
parser = http.RequestParser(self.cfg, client)
try:
while True:
req = None
with self.timeout_ctx():
req = parser.next()
if not req:
break
self.handle_request(req, client, addr)
except StopIteration, e:
self.log.debug("Closing connection. %s", e)
except socket.error, e:
...
finally:
util.close(client)
这是一个循环,从client连接中不断的读出http请求,依次处理,知道没有请求 可以读为止。这很有意思,因为它为你提供了在一个http连接中发送多个http请求 的可能性。实际上,由于client是一个普通的socket,你甚至可以不用http协议, 你可以自定义一个协议,只需将parser换成可以解析你的协议请求的parser。
pre_request, post_request钩子,具体wsgi执行都在 handle_request中。
Note
这是一般WSGI应用的标准处理流程。和gevent worker类似的,还有一个gevent_pywsgi worker, 它使用gevent自带的WSGI处理程序。work class为GeventPyWSGIWorker,server_class为 gevent.pywsgi.WSGIServer,在上面创建server的时候,走的是和StreamServer不同的分支, 在此就不深入了。
server = self.server_class( self.socket, application=self.wsgi, spawn=pool, log=self.log, handler_class=self.wsgi_handler)
application即为你的wsgi callable,handler_class则是gevent.pywsgi.WSGIHandler。
OK, 继续看server.start的流程:
gevent.server.StreamServer.start
gevent.server.BaseServer.start
gevent.server.BaseServer.start_accepting:
if self._watcher is None:
# just stop watcher without creating a new one?
self._watcher = self.loop.io(self.socket.fileno(), 1)
self._watcher.start(self._do_read)
这个watcher的作用是启动一个greenlet,利用libev来监听socket,一旦有io就调用_do_read callback,后者又调用do_handle会为每个连接启动一个新的greentlet处理:
gevent.server.BaseServer._do_read
gevent.server.BaseServer.do_handle
def set_spawn(self, spawn):
...
elif hasattr(spawn, 'spawn'):
self.pool = spawn # 即上面传进来的pool参数
self._spawn = spawn.spawn
elif ...
def do_handle(self, *args):
spawn = self._spawn
if spawn is None:
self._handle(*args) # 即创建server时的handle回调函数
else:
spawn(self._handle, *args)
def _do_read(self):
for _ in xrange(self.max_accept):
if self.full():
self.stop_accepting()
return
try:
args = self.do_read()
self.delay = self.min_delay
if not args:
return
except:
self.loop.handle_error(self, *sys.exc_info())
...
else:
try:
self.do_handle(*args)
except:
self.loop.handle_error((args[1:], self), *sys.exc_info())
...
_watcher.start并不是一个loop,只是spawn一个greenlet就返回了。 如果start_accepting 立即返回,start也就返回了,问:那么loop在哪里?整个server的主循环在哪里?答曰: 本来就没有loop,整个程序都是由gevent驱动greenlet的,gevent也没有loop,或者可以说, gvent没有显式loop,整个系统是由libev的主循环驱动的:
Unlike other network libraries and similar to eventlet, gevent starts the event
loop implicitly in a dedicated greenlet. There’s no reactor that you must run() or
dispatch() function to call. When a function from gevent API wants to block,
it obtains the Hub instance - a greenlet that runs the event loop - and switches to
it. If there’s no Hub instance yet, one is created on the fly.
http://www.gevent.org/intro.html#event-loop
更多请见下面的Hub.run。
http://www.gevent.org/gevent.hub.html#module-gevent.hub
watcher.start:
gevent.server.BaseServer:
self.loop = gevent.get_hub().loop
...
self._watcher = self.loop.io(self.socket.fileno(), 1)
self._watcher.start(self._do_read)
gevent.get_hub
gevent.hub.Hub.__init__:
loop_class = config('gevent.core.loop', 'GEVENT_LOOP')
...
self.loop = loop_class(flags=loop, default=default)
gevent.core.loop在gevent/gevent/core.ppyx中定义, loop.io方法返回一个 watcher:
gevent.core.loop.io:
def io(self, int fd, int events, ref=True):
return io(self, fd, events, ref)
gevent.core.io: # 调用ev_io_init初始化fd
libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events)
ev_io_init的回调是gevent_callback_io, 而watcher.start的回调是callback self._do_read,这两者是怎么关联起来呢?gevent/gevent/callbacks.c:
#define GET_OBJECT(PY_TYPE, EV_PTR, MEMBER) \
((struct PY_TYPE *)(((char *)EV_PTR) - offsetof(struct PY_TYPE, MEMBER)))
...
#define DEFINE_CALLBACK(WATCHER_LC, WATCHER_TYPE) \
static void gevent_callback_##WATCHER_LC(struct ev_loop *_loop, void *c_watcher, int revents) { \
struct PyGevent##WATCHER_TYPE##Object* watcher = GET_OBJECT(PyGevent##WATCHER_TYPE##Object, c_watcher, _watcher); \
gevent_callback(watcher->loop, watcher->_callback, watcher->args, (PyObject*)watcher, c_watcher, revents); \
}
_callback实际上就是在io.start函数中设置的callback,请参见core.ppyx中WATCHER_BASE宏定义。
ev_io_init的第一个参数,watcher._watcher,纯的裸libev.ev_io类型,当gevent_callback_io 被调用时,又被传递回来了即这个c_watcher,那么怎么找到对应的python io class对象即 watcher呢?GET_OBJECT即是答案,它可以从一个对象成员的c指针,倒推出这个对象来,强大。
上面即是watcher.start的全部过程,get_hub自动创建了一个gevent.hub.Hub实例,一个greenlet, 整个event loop就在其Hub.run方法:
gevent.hub.Hub.run
gevent.core.loop.run:
def run(self, nowait=False, once=False):
cdef unsigned int flags = 0
if nowait:
flags |= libev.EVRUN_NOWAIT
if once:
flags |= libev.EVRUN_ONCE
with nogil:
libev.ev_run(self._ptr, flags)
终于,大boss出现,关于ev_run文档上这样描述:
bool ev_run (loop, int flags)
Finally, this is it, the event handler. This function usually is called after
you have initialised all your watchers and you want to start handling events.
It will ask the operating system for any new events, call the watcher
callbacks, and then repeat the whole process indefinitely: This is why event
loops are called loops.
gunicorn:
Application
/ \ \
WSGIApplication DjangoApplication PasterBaseApplication
Worker
/ \ \
AsyncWorker SyncWorker TornaoWorker
/ \
GeventWorker EventletWorker
gevent:
BaseServer
/ \
StreamServer DatagramServer
/
WSGIServer
gunicorn 目前尚无自动reload机制,修改代码后需要发送SIGHUB给master进程,通知重新加载。
https://github.com/benoitc/gunicorn/issues/154
gunicorn.aribter.Arbiter init_signals 函数设置signal函数为所有信号的handler,而signal函数 只是把信号放入队列中,具体的处理统一在run函数中,这样的好处可能是降低信号handler异步执行的风险。 只有SIGCHLD信号被特殊处理。
def init_signals(self):
...
map(lambda s: signal.signal(s, self.signal), self.SIGNALS)
signal.signal(signal.SIGCHLD, self.handle_chld)
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
def run(self):
"Main master loop."
self.start()
...
self.manage_workers()
while True:
try:
self.reap_workers()
sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
if sig is None:
self.sleep()
self.murder_workers()
self.manage_workers()
continue
...
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
...
self.log.info("Handling signal: %s", signame)
handler()
self.wakeup()
...
def handle_chld(self, sig, frame):
"SIGCHLD handling"
self.wakeup()
def handle_hup(self):
"""\
HUP handling.
- Reload configuration
- Start the new worker processes with a new configuration
- Gracefully shutdown the old worker processes
"""
self.log.info("Hang up: %s", self.master_name)
self.reload()
handle_hup 负责处理HUB信号:
def reload(self):
...
# reload conf
self.app.reload()
self.setup(self.app)
...
# spawn new workers with new app & conf
self.cfg.on_reload(self)
...
self.manage_workers()
self.app.reload在gunicorn.app.base.Application中定义,完成的工作只是重新加载app配置。
生成新的worker process是在self.cfg.on_reload,gunicorn.config:
class OnReload(Setting):
name = "on_reload"
section = "Server Hooks"
validator = validate_callable(1)
type = "callable"
def on_reload(server):
for i in range(server.app.cfg.workers):
server.spawn_worker()
default = staticmethod(on_reload)
desc = """\
Called to recycle workers during a reload via SIGHUP.
The callable needs to accept a single instance variable for the Arbiter.
"""
又生成了同样数量的worker。但是,老的worker怎么办?到此为止,好像还没有被杀掉。。。且往下看。
gunicorn.arbiter.Arbiter:
def spawn_worker(self):
self.worker_age += 1
worker = self.worker_class(self.worker_age, self.pid, self.LISTENER,
self.app, self.timeout/2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
self.WORKERS[pid] = worker
return pid
# Process Child
worker_pid = os.getpid()
...
注意worker_age这个递增id,每个master唯一,被传递给了worker_class。gunicorn.workers.base.Worker:
class Worker(object):
...
def __init__(self, age, ppid, socket, app, timeout, cfg, log):
"""\
This is called pre-fork so it shouldn't do anything to the
current process. If there's a need to make process wide
changes you'll want to do that in ``self.init_process()``.
"""
self.age = age
...
此时系统中有双倍的worker,下次arbiter.run循环会调用manage_worker,我们已经知道,它会保证worker数量 在可控范围之内,杀掉多余的worker, gunicorn.arbiter.Arbiter:
def manage_workers(self):
if len(self.WORKERS.keys()) < self.num_workers:
self.spawn_workers()
workers = self.WORKERS.items()
workers.sort(key=lambda w: w[1].age)
while len(workers) > self.num_workers:
(pid, _) = workers.pop(0)
self.kill_worker(pid, signal.SIGQUIT)
原来manager_workers先根据worker的age排序,然后杀掉最老的worker,这样所有发送HUB前的老worker就全被kill了, 剩下只有更新后生成的同样数量的worker,至此worker process全部完成更新。
# TODO: greenlet, libev
Worker, I will free you.