也许你会偶然发现Python的多线程程序使用Ctrl-C杀不掉,必须拿到pid用kill -9才能干掉,研究这个问题的原因可以使得对Python多线程的信号处理及线程的退出机制有更好的理解。

假如有一个Python写成的用多线程模拟生产者-消费者的程序,代码如下:

class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if cond.acquire():
                if count > 1000:
                    cond.wait()
                else:
                    count = count + 100
                    print "%s produce 100, count=%s" % (self.name, count)
                    cond.notify()
                cond.release()

class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if cond.acquire():
                if count < 100:
                    cond.wait()
                else:
                    count = count - 100
                    print "%s consume 100, count=%s" % (self.name, count)
                    cond.notify()
                cond.release()
count = 500
cond = threading.Condition()

for i in xrange(4):
    p = Producer()
    producers.append(p)
    p.start()

for j in xrange(2):
    c = Consumer()
    consumers.append(c)
    c.start()

执行以上程序,直接Ctrl-C是杀不掉的,虽然Ctrl-C是向程序发送SIGINT信号,但在这里,这个Ctrl-C被无视了。从常识上讲,我们是希望程序可以响应Ctrl-C并立即停止的。

原因

Python多线程中,只有主线程可以响应并处理信号,然而我们看到,主线程在生成了各子线程之后,就歇了,我们知道,主线程不能挂的,那么主线程干嘛去了?答案是主线程在准备退出时,阻塞等待所有非daemon线程去了,看源码:

class _MainThread(Thread):

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join() #阻塞等待该子线程结束
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()

    def _pickSomeNonDaemonThread():
        for t in enumerate():
            if not t.daemon and t.is_alive(): #只管非daemon的活跃线程
                return t
        return None

很明显,主线程已经阻塞等待在了join()上,所以我们的信号没有被主线程及时处理,同时,从以上源码可以看出,Python主线程退出时会等待所有活跃的非daemon线程退出

如果我们把子线程设为daemon=True,主线程则不会等待子线程,执行完之后会直接退出,所以我们还是得在主线程中显式的对各子线程join(),又回到了以上的情况而已,不能解决问题。

解决

从以上的分析看出:

首先,我们需要各子线程均为daemon=True,保证我们的主线程退出的时候,可以直接退出。

其次,主线程不能join()等待子线程,否则会阻塞而无法及时响应信号,那么主线程干点什么好呢?

方案就是:

在主线程注册信号处理函数,并监听Ctrl-C的信号SIGINTkill的默认信号SIGTERM,设置一全局变量,各子线程都会在循环执行任务时检测该全局变量,而主线程也会循环检测该全局变量。在信号处理函数中,收到信号时,修改该全局变量的值,通过该全局变量通知子线程退出,而主线程在各子线程退出后,也退出。

class Producer(threading.Thread):
    def run(self):
        global count
        global is_exit
        while True:
            if cond.acquire():
                if is_exit: #每次获取锁之后,先检查全局状态变量
                    cond.notifyAll() #退出前必须唤醒其他所有线程
                    cond.release() #退出前必须释放锁
                    break
                if count > 1000:
                    cond.wait()
                else:
                    count = count + 100
                    print "%s produce 100, count=%s" % (self.name, count)
                    cond.notify()
                cond.release()
class Consumer(threading.Thread):
    def run(self):
        global count
        global is_exit
        while True:
            if cond.acquire():
                if is_exit:
                    cond.notifyAll()
                    cond.release()
                    break
                if count < 100:
                    cond.wait()
                else:
                    count = count - 100
                    print "%s consume 100, count=%s" % (self.name, count)
                    cond.notify()
                cond.release()
count = 500
cond = threading.Condition()
is_exit = False #全局变量
def signal_handler(signum, frame): #信号处理函数
    global is_exit
    is_exit = True #主线程信号处理函数修改全局变量,提示子线程退出
    print "Get signal, set is_exit = True"
def test():
    producers = []
    consumers = []
    for i in xrange(4):
        p = Producer()
        producers.append(p)
        p.setDaemon(True) #子线程daemon
        p.start()
    for j in xrange(2):
        c = Consumer()
        consumers.append(c)
        c.setDaemon(True) #子线程daemon
        c.start()
    while 1:
        alive = False
        for t in itertools.chain(producers, consumers): #循环检查所有子线程
            alive = alive or t.isAlive() #保证所有子线程退出
        if not alive:
            break
if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler) #注册信号处理函数
    signal.signal(signal.SIGTERM, signal_handler) #注册信号处理函数
    test()
文章目录
  1. 1. 原因
  2. 2. 解决