Python同步机制(semaphore,event,queue)

Python 小记 2019-02-16 3410 字 1411 浏览 点赞

前言

Python中的线程锁Condition源码分析二文的基础之上,对Python多线程数据同步时可能用到的SemaphoreEventQueue做了一些粗略分析。

Semaphore

从一个小demo入手,要求:让子线程完成打印数字0-8的任务,并且同一时刻,仅允许有三个子线程存在。代码实现如下:

import time
import threading

def print_num(num):
    print("this is {}".format(num))
    time.sleep(1)
    sema.release()  # 释放锁

if __name__ == "__main__":
    sema = threading.Semaphore(3)  # 实例一个信号锁,并设置计数器为3(最大线程数为3)
    for i in range(9):
        sema.acquire()  # 上锁
        t = threading.Thread(target=print_num, args=(i,))  # 让线程执行打印数字的任务
        t.start()

为使效果明显,特意增加time.sleep(1)让所有子线程进入短暂睡眠。运行程序会发现,打印结果会三个三个的出现,也就是说,我们做到了“仅允许有三个子线程存在”的要求。


Semaphore的实现基于条件锁(Condition),只不过,它的状态锁默认使用Lock类型。

# Semaphore源码
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())  # 使用Lock类型的条件锁
        self._value = value
    ...

Semaphore类在实例化时接收一个value参数(默认value=1),这个参数用于限制锁中运行的线程数。整个限制逻辑在acquire()中实现:

# Semaphore源码
class Semaphore:
    ...
    def acquire(self, blocking=True, timeout=None):
        ...
        rc = False
        ...
        with self._cond:
            while self._value == 0:  # 当计数器为0时
                ...
                self._cond.wait(timeout)  # 进入等待,程序被阻塞
            else:  # 如果计数器不为0,每次调用acquire后,value自减1
                self._value -= 1
                rc = True
        return rc

通过release()方法解开一个Lock锁(之前Condition类源码分析时说过,条件锁的wait()方法会产生一个Lock锁),每解开一个锁,计数器自加1。尽管release()方法并不一定会做“解锁”的逻辑,因为它解锁是基于条件锁的notify()方法实现,只有在队列中有锁时才会解锁。

# Semaphore源码
class Semaphore:
    ...
    def release(self):
        ...
        with self._cond:
            self._value += 1  # 自加1
            self._cond.notify()  # 解锁

因为acquire()可能会阻塞程序,建议对应的release()最好在其他线程中调用。

Event

Event锁是Python中的事件锁,实例方式:

import threading

event = threading.Event()  # 获得一个事件锁

查看源码,会发现Event也是基于条件锁实现的。整个Event的逻辑,围绕self._flag标识符展开。self._flag为False时,表示wait会阻塞线程;如果为True,wait不阻塞线程。通过Event实例对象提供的clear()方法,可手动调控wait的行为。

# Event源码
class Event:
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False
    ...

event对象有以下几个主要方法:

  • is_set:返回self._flag当前值
  • set:唤醒全部被阻塞的线程,同时将self._flag置为True
# Event源码
def set(self):
    with self._cond:
        self._flag = True
        self._cond.notify_all()
  • clear:重置self._flag的值(不借助任何条件,将self._flag置为False)
# Event源码
def clear(self):
    with self._cond:
        self._flag = False
  • wait:如果self._flag为False,阻塞当前线程;否则不阻塞
# Event源码
def wait(self, timeout=None):
    with self._cond:
        signaled = self._flag
        if not signaled:  # self._flag为false时,阻塞程序
            signaled = self._cond.wait(timeout)
        return signaled

Queue

queue模块中的Queue也是线程安全,这是因为它的成员方法总是基于一个信号mutex。事实上,这个mutex就是Lock的实例对象:

# Queue源码
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        self.mutex = threading.Lock()  # Lock锁
        ...

其他时候,Queue与一个寻常队列行为一样。



本文由 Guan 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论