Condition源码分析

Python 小记 2019-02-15 6609 字 1367 浏览 点赞

前言

除了之前在Python中的线程锁中提到的Lock和RLock,Python中的同步机制还有Conditions,我称其为条件锁。以下是我对Condition类的使用以及相关源码分析。

Condition

条件同步机制中,通过通知的方式(notify)激活其他等待中的线程(wait)。在Python内部,Condition的实现是基于RLock之上(同时也有Lock)。获得一个条件锁的方式:

import threading

cond = threading.Condition()

现在需要一个小demo展示条件锁的用法。我想利用多线程打印如下这段对话:

男:今天吃什么?
女:随便。
男:五花肉好不好?
女:油腻了啦!
男:水煮西兰花呢?
女:会不会清淡了?
男:那你想吃什么?
女:随便。

由于对话应该被人理解,所以需要规定线程们的执行顺序。尽管Lock和RLock可以实现期望的那样,但Python已经针对这种需求做好准备——封装成Condition类,轮到它出场了。

import threading

def grilfriend():
    with cond:  # 上第一层锁
        print("女:随便。")
        cond.notify()

        cond.wait()
        print("女:油腻了啦!")
        cond.notify()

        cond.wait()
        print("女:会不会清淡了?")
        cond.notify()

        cond.wait()
        print("女:随便。")

def boyfriend():
    with cond:
        print("男:今天吃什么?")
        cond.notify()

        cond.wait()
        print("男:五花肉好不好?")
        cond.notify()

        cond.wait()
        print("男:水煮西兰花呢?")
        cond.notify()

        cond.wait()
        print("男:那你想吃什么?")
        cond.notify()

if __name__ == "__main__":
    cond = threading.Condition()

    bthread = threading.Thread(target=boyfriend)
    gthread = threading.Thread(target=grilfriend)

    bthread.start() # 为让 男友 先说话,所以先执行boyfriend的线程
    gthread.start()

    bthread.join()
    gthread.join()

上面这个demo中,有三点需要注意:

  • 为让“男友”先说话,需要先开启boyfriend的线程;
  • 条件锁的notify()方法和wait()方法必须在已经上过条件锁的状态下使用;
  • 当线程之间进行交互时,合理安排notify()的次数——它的出现次数应该总是>=wait()出现次数。

Condition源码分析

实例化

实例Condition时可以指定一个lock,如果没有指定,默认创建RLock的实例。同时Condition拥有与RLock一样的上锁方法acquire()和解锁方法release()。事实上,这两个方法直接来源于RLock类。相关源码如下:

# Condition源码
class Condition:
    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()  # 实例化RLock的对象
        self._lock = lock
        self.acquire = lock.acquire  # 借用RLock中的方法
        self.release = lock.release  # 借用RLock中的方法
        ...

初探notify和wait

条件锁的两个重要方法是notify()wait()。前面已经说过,notify()wait()必须在条件锁上锁的状态下使用——拿wait的源码举例,当该方法被调用,程序会先去调用self._is_owned(),判断当前线程号与条件锁中的self._ower是否一致,如果不一致,抛出异常RuntimeError

# Condition源码
class Condition:
    ...
    def _is_owned(self):
        return self._owner == get_ident()
    
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        ...

    def notify(self, n=1):
       if not self._is_owned():
           raise RuntimeError("cannot notify on un-acquired lock")
       ...
    ...

事实上,调用self._is_owned()时,程序是去lock中找成员变量_ower。又因为lock是RLock的实例,所以追踪进RLock的源码中发现,如果lock不在“锁住”状态,self._owner 一定等于None,那么self._is_owned()会返回False,必然导致程序抛异常。这种行为对notify()来说也是一样。

# RLock源码
class _RLock:
    def __init__(self):
        ...
        self._owner = None
        ...

    def acquire(self, blocking=True, timeout=-1):
        me = get_ident()
        # 非第一次上锁逻辑
        if self._owner == me:
            self._count += 1
            return 1
        
        # 第一次上锁逻辑
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me  # 第一次上锁时,对self._owner赋值
            self._count = 1
        return rc
        
    def release(self):
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:  # 如果没锁了,将self._owner 置为 None
            self._owner = None
            self._block.release()

notify

# Condition源码
class Condition:
    ...
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters  # 双端队列,存放lock
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:  # 如果为空,退出
            return
        for waiter in waiters_to_notify:  # 否则遍历
            waiter.release()  # 一一释放锁
            try:
                all_waiters.remove(waiter)  # 同时移除队列中已经释放的锁
            except ValueError:
                pass

notify()是在一个双端队列中进行操作,这个队列在Condition中名为_waiters。默认情况下,notify只会释放一个锁(按先进先出原则)。如果队列中没有锁,直接退出函数,不报任何异常。

Condition中还有一个notify_all()方法,调用它会释放队列中全部的锁:

# Condition源码
class Condition:
    ...
    def notify_all(self):
        self.notify(len(self._waiters))

wait

事实上wait()中的代码有那么长,但为了突出重点,这里仅仅截取了需要的部分:

# Condition源码
class Condition:
    ...
    def wait(self, timeout=None):
        
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()  # 获得一个Lock类型的锁
        waiter.acquire()  # 上锁
        self._waiters.append(waiter)  # 加入队列中去
        ...

也就是说,经过_is_owned()的校验后,程序向下执行到waiter = _allocate_lock(),这时需要注意:函数 _allocate_lock()返回的是一个Lock类型的锁。因此整个结构是:在RLock中,又分布了至少一个Lock锁。多个线程的交互时,就是通过增加队列中的Lock和释放这些Lock来实现阻塞或是激活线程的。

有一点需要理清楚:wait()用来阻塞调用它的线程本身,而notify()会激活那个/哪些程序,得看它从队列中取出的Lock锁住的是谁。

wait_for

# Condition源码
class Condition:
    ...
    def wait_for(self, predicate, timeout=None):
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:  # 当执行结果为True时,
                           # 或者超过了设置的等待时间,返回predicate()运行的结果
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

wait_for()也是Condition中的方法,它接收的第一个参数是可以调用的函数名,第二个参数是等待的时间(单位秒)。从源码来看,在wait_for()中会执行你传入的函数。当满足两个条件中的其一,wait_for()执行结束。条件1:传入的函数返回有效值时会退出wait_for,并返回这个有效值;条件2:超出预设的等待时长,退出wait_for并返回非有效值。由于其中借助了Condition的wait()方法,所以需要先上条件锁才可以保证程序的正常运行。

有一点也需要明确,在等待过程中,程序不是等到经过timeout秒后,才去第二次执行predicate()。而是等待期间会一直在循环中执行predicate(),一旦拿到有效结果,立马退出循环,并返回这个结果。测试代码可以这样写:

import time
import threading

def return_num():
    start = time.time()
    while not (time.time() - start >= 3):
        pass
    return 1

if __name__ == "__main__":
    cond = threading.Condition()
    with cond:
        print(cond.wait_for(return_num, 10))

运行会发现,程序在4s内(3s+其他时间)执行完毕。



本文由 Guan 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论