1.IO编程 IO(input/output)。凡是用到数据交换的地方,都会涉及io编程,例如磁盘,网络的数据传输。在IO编程中,stream(流)是一种重要的概念,分为输入流(input stream)和输出流(output stream)。可以把流季节为一个水管,数据相当于水管中的水,但是只能单向流动,所以数据传输过程中需要假设两个水管,一个负责输入,一个负责输出,这样读写就可以实现同步。 2.GIL:global interpreter lock全局解释器锁 在Python中,GIL导致同一时刻只有一个线程进入解释器。 计算机有些操作不依赖于CPU,I/O操作几乎不利用,IO密集型不占用CPU,多线程可完成 计算操作都要用到CPU,适合多进程
线程和进程的区别:
进程优点:同时利用多个CPU,同时进行多个操作;缺点:耗费资源(需要开辟多个内存空间) 线程优点:共享多个资源,IO操作,创造并发操作;缺点:抢占资源 3.threading模块 threading模块对象 Thread 表示一个线程的执行的对象 Lock 锁原语对象(跟thread模块里的锁对象相同) RLock 可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定) Condition条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”。如状态的改变或值的改变 Event 通用的条件变量。多个线程可以等待某个时间的发生,在事件发生后,所有的线程都被激活 Semaphore 为等待锁的线程提供一个类似“等候室”的结构 BoundedSemaphore 与Semaphore类似,只是它不允许超过初始值 Timer 与thread类似,只是它要等待一段时间后才开始运行 Barrier 创建一个障碍,必须达到指定数量的线程后才可以继续 3.1线程的两种调用方式 按 Ctrl+C 复制代码 按 Ctrl+C 复制代码 #继承式调用 import threading import time class MyThread(threading.Thread): def __init__(self, num): threading.Thread.__init__(self) self.num = num def run(self): #定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(4) if __name__ == '__main__': t1 = MyThread(111) t2 = MyThread(222) t1.start() t2.start() threading的Thread类主要的运行对象 函数 start() 开始线程的执行 run() 定义线程的功能的函数(一般会被子类重写) join(timeout=None) 程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒 getName() 返回线程的名字 setName(name) 设置线程的名字 isAlive() 布尔标志,表示这个线程是否还在运行中 isDaemon() 返回线程的daemon标志 setDaemon(daemonic) 把线程的daemon标志设为daemonic(一定要在调用start()函数前调用) 3.2 join and setDaemon import threading from time import ctime,sleep def music(func): for i in range(2): print ("Begin listening to %s. %s" %(func,ctime())) sleep(2) print("end listening %s"%ctime()) def move(func): for i in range(2): print ("Begin watching at the %s! %s" %(func,ctime())) sleep(5) print('end watching %s'%ctime()) threads = [] t1 = threading.Thread(target=music,args=('七里香',)) threads.append(t1) t2 = threading.Thread(target=move,args=('阿甘正传',)) threads.append(t2) # join在子线程完成运行之前,这个子线程的父线程将一直被阻塞。 if __name__ == '__main__': t2.setDaemon(True) for t in threads: t.setDaemon(True)#守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。 t.start() # t.join()#跟线程没有关系了,像正常一样运行,没意义 t1.join() # t2.join()########考虑这三种join位置下的结果? print(threading.current_thread()) print(threading.active_count()) print ("all over %s" %ctime()) join()会等到线程结束,或者在给了timeout参数的时候,等到超时为止。使用join()比使用一个等待锁释放的无限循环清楚一些(也称“自旋锁”)。 join()的另一个比较重要的方法是它可以完全不用调用。一旦线程启动后,就会一直运行,直到线程的函数结束,退出为止。 setDaemon(True):将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法 3.3 同步锁Lock 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。 import time import threading def addNum(): global num #在每个线程中都获取这个全局变量 num-=1 temp=num print('--get num:',num ) print('看下结果是0') print('再看下结果') time.sleep(0.001) num =temp-1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print('final num:', num ) 注意: 1: why num-=1没问题呢?这是因为动作太快(完成这个动作在切换的时间内) 2: if sleep(1),现象会更明显,100个线程每一个一定都没有执行完就进行了切换,我们说过sleep就等效于IO阻塞,1s之内不会再切换回来,所以最后的结果一定是99. 多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢? 有同学会想用join呗,但join会把整个线程给停住,造成了串行,失去了多线程的意义,而我们只需要把计算(涉及到操作公共数据)的时候串行执行。 我们可以通过同步锁来解决这种问题 import time import threading begin=time.time() def addNum(): global num #在每个线程中都获取这个全局变量 # num-=1 lock.acquire() temp=num # print('--get num:',num ) time.sleep(0.0001) num =temp-1 #对此公共变量进行-1操作 lock.release() num = 100 #设定一个共享变量 thread_list = [] lock=threading.Lock() for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() end=time.time() print('final num:', num ) print(end-begin) 同步锁与GIL的关系? Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。 GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。
但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。 Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定, 3.4 线程死锁和递归锁 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。 RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,
View Code
解决办法:使用递归锁,将 lockA=threading.Lock() lockB=threading.Lock() 改为Rlock
View Code
3.5 单线程和多线程执行对比
View Code
3.6 条件变量同步(Condition) 有一类线程需要满足条件之后才能够继续执行,Python提供了threading.Condition 对象用于条件变量线程的支持,它除了能提供RLock()或Lock()的方法外,还提供了 wait()、notify()、notifyAll()方法。 lock_con=threading.Condition([Lock/Rlock]): 锁是可选选项,不传人锁,对象自动创建一个RLock()。Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。 构造方法: Condition([lock/rlock])
View Code
3.7 事件event python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。 #__author: greg #date: 2017/9/18 19:37 import threading import time event = threading.Event() def func(): # 等待事件,进入等待阻塞状态 print('%s wait for event...' % threading.currentThread().getName()) event.wait() # 收到事件后进入运行状态 print('%s recv event.' % threading.currentThread().getName()) t1 = threading.Thread(target=func) t2 = threading.Thread(target=func) t1.start() t2.start() time.sleep(2) print('MainThread set event.')# 发送事件通知 event.set() threading基于Java的线程模型设计。锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量), Semaphore/BoundedSemaphore Semaphore(信号量)是计算机科学史上最古老的同步指令之一。Semaphore管理一个内置的计数器, 每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时, #__author: greg #date: 2017/9/18 10:02 import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(3) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(10) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
实例2
|
|
来自: 昵称61973759 > 《待分类》