从Ctrl-C看Python多线程的信号处理
也许你会偶然发现Python的多线程程序使用Ctrl-C
杀不掉,必须拿到pid用kill -9
才能干掉,研究这个问题的原因可以使得对Python多线程的信号处理及线程的退出机制有更好的理解。
假如有一个Python写成的用多线程模拟生产者-消费者
的程序,代码如下:
class Producer(threading.Thread):
def run(self):
global count
while True:
if cond.acquire():
if count > 1000:
cond.wait()
else:
count = count + 100
print "%s produce 100, count=%s" % (self.name, count)
cond.notify()
cond.release()
class Consumer(threading.Thread):
def run(self):
global count
while True:
if cond.acquire():
if count < 100:
cond.wait()
else:
count = count - 100
print "%s consume 100, count=%s" % (self.name, count)
cond.notify()
cond.release()
count = 500
cond = threading.Condition()
for i in xrange(4):
p = Producer()
producers.append(p)
p.start()
for j in xrange(2):
c = Consumer()
consumers.append(c)
c.start()
执行以上程序,直接Ctrl-C
是杀不掉的,虽然Ctrl-C
是向程序发送SIGINT
信号,但在这里,这个Ctrl-C
被无视了。从常识上讲,我们是希望程序可以响应Ctrl-C
并立即停止的。
原因
Python多线程中,只有主线程可以响应并处理信号,然而我们看到,主线程在生成了各子线程之后,就歇了,我们知道,主线程不能挂的,那么主线程干嘛去了?答案是主线程在准备退出时,阻塞等待所有非daemon线程去了,看源码:
class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread")
self._Thread__started.set()
self._set_ident()
with _active_limbo_lock:
_active[_get_ident()] = self
def _set_daemon(self):
return False
def _exitfunc(self):
self._Thread__stop()
t = _pickSomeNonDaemonThread()
if t:
if __debug__:
self._note("%s: waiting for other threads", self)
while t:
t.join() #阻塞等待该子线程结束
t = _pickSomeNonDaemonThread()
if __debug__:
self._note("%s: exiting", self)
self._Thread__delete()
def _pickSomeNonDaemonThread():
for t in enumerate():
if not t.daemon and t.is_alive(): #只管非daemon的活跃线程
return t
return None
很明显,主线程已经阻塞等待在了join()上,所以我们的信号没有被主线程及时处理,同时,从以上源码可以看出,Python主线程退出时会等待所有活跃的非daemon线程退出。
如果我们把子线程设为daemon=True
,主线程则不会等待子线程,执行完之后会直接退出,所以我们还是得在主线程中显式的对各子线程join(),又回到了以上的情况而已,不能解决问题。
解决
从以上的分析看出:
首先,我们需要各子线程均为daemon=True
,保证我们的主线程退出的时候,可以直接退出。
其次,主线程不能join()等待子线程,否则会阻塞而无法及时响应信号,那么主线程干点什么好呢?
方案就是:
在主线程注册信号处理函数,并监听Ctrl-C
的信号SIGINT
和kill
的默认信号SIGTERM
,设置一全局变量,各子线程都会在循环执行任务时检测该全局变量,而主线程也会循环检测该全局变量。在信号处理函数中,收到信号时,修改该全局变量的值,通过该全局变量通知子线程退出,而主线程在各子线程退出后,也退出。
class Producer(threading.Thread):
def run(self):
global count
global is_exit
while True:
if cond.acquire():
if is_exit: #每次获取锁之后,先检查全局状态变量
cond.notifyAll() #退出前必须唤醒其他所有线程
cond.release() #退出前必须释放锁
break
if count > 1000:
cond.wait()
else:
count = count + 100
print "%s produce 100, count=%s" % (self.name, count)
cond.notify()
cond.release()
class Consumer(threading.Thread):
def run(self):
global count
global is_exit
while True:
if cond.acquire():
if is_exit:
cond.notifyAll()
cond.release()
break
if count < 100:
cond.wait()
else:
count = count - 100
print "%s consume 100, count=%s" % (self.name, count)
cond.notify()
cond.release()
count = 500
cond = threading.Condition()
is_exit = False #全局变量
def signal_handler(signum, frame): #信号处理函数
global is_exit
is_exit = True #主线程信号处理函数修改全局变量,提示子线程退出
print "Get signal, set is_exit = True"
def test():
producers = []
consumers = []
for i in xrange(4):
p = Producer()
producers.append(p)
p.setDaemon(True) #子线程daemon
p.start()
for j in xrange(2):
c = Consumer()
consumers.append(c)
c.setDaemon(True) #子线程daemon
c.start()
while 1:
alive = False
for t in itertools.chain(producers, consumers): #循环检查所有子线程
alive = alive or t.isAlive() #保证所有子线程退出
if not alive:
break
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler) #注册信号处理函数
signal.signal(signal.SIGTERM, signal_handler) #注册信号处理函数
test()