图片 31

IO多路复用,进程和线程

    大家大多数的时候利用三十多线程,甚至多进度,不过python中出于GIL全局解释器锁的案由,python的四线程并不曾真正落到实处

目录

一、开启线程的两种方式
    1.1 直接利用利用threading.Thread()类实例化
    1.2 创建一个类,并继承Thread类
    1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别
        1.3.1 谁的开启速度更快?
        1.3.2 看看PID的不同
        1.3.3 练习
        1.3.4 线程的join与setDaemon
        1.3.5 线程相关的其他方法补充

二、 Python GIL
    2.1 什么是全局解释器锁GIL
    2.2 全局解释器锁GIL设计理念与限制

三、 Python多进程与多线程对比
四、锁
    4.1 同步锁
    GIL vs Lock
    4.2 死锁与递归锁
    4.3 信号量Semaphore
    4.4 事件Event
    4.5 定时器timer
    4.6 线程队列queue

五、协程
    5.1 yield实现协程
    5.2 greenlet实现协程
    5.3 gevent实现协程

六、IO多路复用

七、socketserver实现并发
    7.1 ThreadingTCPServer

八、基于UDP的套接字

后生可畏、进度和线程的定义

     
实际上,python在执行八线程的时候,是由此GIL锁,进行上下文切换线程实行,每趟真实独有贰个线程在运营。所以下边才说,未有真的达成多现程。

生机勃勃、开启线程的二种艺术

在python中展开线程要导入threading,它与开启进度所急需导入的模块multiprocessing在应用上,有超级大的雷同性。在接下去的利用中,就足以窥见。

同开启进度的二种办法雷同:

先是,引出“多职务”的概念:多职责管理是指顾客能够在同一时间内运转三个应用程序,每一种应用程序被称作二个任务。Linux、windows正是支撑多职责的操作系统,比起单职责系统它的机能巩固了不菲。

      那么python的三十二线程就一直不什么样用了吗?

1.1 直接行使利用threading.Thread()类实例化

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()

    print('主线程')

举个例子,你一只在用浏览器上网,风姿洒脱边在听微博云音乐,风流罗曼蒂克边在用Word赶作业,那就是多任务,最少还要有3个职责正在运维。还应该有大多任务悄悄地在后台同时运转着,只是桌面上未有显得而已。

             
不是那么些样子的,python多线程平日用于IO密集型的程序,那么怎么着叫做IO密集型呢,举个例子,比方说带有梗塞的。当前线程拥塞等待其余线程实施。

1.2 创制四个类,并一而再Thread类

from threading import Thread
import time
calss Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(2)
        print("%s say hello" %self.name)

if __name__ == "__main__":
    t = Sayhi("egon")
    t.start()
    print("主线程")

不过,这几个职责是同不平时候在运转着的呢?人所共知,运转二个任务就须要cpu去管理,那还要运维四个职务就亟须须求多个cpu?这假设有九十多个职责急需同时运转,就得买二个100核的cpu吗?显然不可能!

      即然聊起切合python八线程的,那么哪些的不契合用python多线程呢?

1.3 在二个进度下打开多个线程与在三个经过下张开多少个子进度的区分

今日,多核CPU已经不行普遍了,但是,就算过去的单核CPU,也足以实行多义务。由于CPU实施代码都是各样实行的,那么,单核CPU是怎么实行多职分的呢?

             
答案是CPU密集型的,那么怎么着的是CPU密集型的吧?百度时而您就知晓。

1.3.1 什么人的打开速度越来越快?

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

结论:鉴于创造子进度是将主进程完全拷贝后生可畏份,而线程不要求,所以线程的创导速度更加快。

答案便是操作系统交替让各样职分更换试行,职责1实行0.01秒,切换来职分2,职分2实行0.01秒,再切换来职分3,试行0.01秒……那样再三施行下去。表面上看,各类义务都以更迭实施的,可是,由于CPU的履行进度其实是太快了,大家感到就好像全数职责都在同一时间实践同风流倜傥。

      

1.3.2 看看PID的不同

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())


'''
hello 13552
hello 13552
主线程pid: 13552
主线程pid: 13552
hello 1608
hello 6324
'''

总结:能够看看,主进度下开启三个线程,每一个线程的PID都跟主进程的PID雷同;而开多个经过,每一种进度都有两样的PID。

总计:三个cpu同不时刻只可以运行一个“任务”;真正的并行执行多职分只可以在多核CPU上贯彻,不过,由于义务数量远远多于CPU的基本数据,所以,操作系统也会活动把无数任务轮流动调查治到每在这之中央上实行。

       未来有这么风流倜傥项职务:供给从200W个url中获取数据?

1.3.3 练习

练习一:应用多线程,完结socket 并发连接
服务端:

from threading import Thread
from socket import *
import os

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcpsock.bind(("127.0.0.1",60000))
tcpsock.listen(5)

def work(conn,addr):
    while True:
        try:
            data = conn.recv(1024)
            print(os.getpid(),addr,data.decode("utf-8"))
            conn.send(data.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,addr = tcpsock.accept()
        t = Thread(target=work,args=(conn,addr))
        t.start()

"""
开启了4个客户端
服务器端输出:
13800 ('127.0.0.1', 63164) asdf
13800 ('127.0.0.1', 63149) asdf
13800 ('127.0.0.1', 63154) adsf
13800 ('127.0.0.1', 63159) asdf

可以看出每个线程的PID都是一样的。
""

客户端:

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

练习二:有三个任务,贰个抽取客户输入,一个将客户输入的开始和结果格式化成大写,一个将格式化后的结果存入文件。

from threading import Thread

recv_l = []
format_l = []

def Recv():
    while True:
        inp = input(">>: ").strip()
        if not inp:continue
        recv_l.append(inp)

def Format():
    while True:
        if recv_l:
            res = recv_l.pop()
            format_l.append(res.upper())

def Save(filename):
    while True:
        if format_l:
            with open(filename,"a",encoding="utf-8") as f:
                res = format_l.pop()
                f.write("%sn" %res)

if __name__ == '__main__':
    t1 = Thread(target=Recv)
    t2 = Thread(target=Format)
    t3 = Thread(target=Save,args=("db.txt",))
    t1.start()
    t2.start()
    t3.start()

对此操作系统来讲,两个职分正是贰个经过(Process卡塔 尔(阿拉伯语:قطر‎,举个例子张开二个浏览器便是开发银行一个浏览器进程,展开叁个记事本就开动了多个记事本进度,张开三个记事本就运行了多少个记事本进度,张开多少个Word就开发银行了二个Word进程。

      
那么我们真切不能够用多线程,上下文切换是索要时日的,数据量太大,无法承当。这里大家就要用到多进程+协程

1.3.4 线程的join与setDaemon

与经过的措施都以相似的,其实multiprocessing模块是没有主见只会见风转舵threading模块的接口;

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #设置为守护线程,主线程结束,子线程也跟着线束。
    t.start()
    t.join()  #主线程等待子线程运行结束
    print('主线程')
    print(t.is_alive())

微微进度还不唯有同有时间干意气风发件事,比如Word,它能够同期举行打字、拼写检查、打字与印刷等专门的职业。在叁个经过之中,要同临时间干多件事,就须要同有的时候候运营几个“子职分”,大家把进度内的这一个“子职务”称为线程(Thread卡塔尔。

      那么什么样是协程呢?

1.3.5 线程相关的别的格局补充

Thread实例对象的章程:

  • isAlive():再次回到纯种是或不是是活跃的;
  • getName():再次来到线程名;
  • setName():设置线程名。

threading模块提供的有的主意:

  • threading.currentThread():重回当前的线程变量
  • threading.enumerate():重临贰个含有正在运转的线程的列表。正在运维指线程运行后、停止前,不包蕴运维前和结束后。
  • threading.activeCount():重返正在运转的线程数量,与len(threading.enumerate())有相近结果。

from threading import Thread
import threading
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName()) #获取当前线程名
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程,返回的是活跃的线程列表
    print(threading.active_count())  #活跃的线程个数
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    主线程/主进程
    Thread-1
    '''

出于每一个进度最少要干生龙活虎件事,所以,一个进度至稀少二个线程。当然,像Word这种复杂的长河能够有三个线程,三个线程能够同不平时间实施,八线程的实施办法和多进程是平等的,也是由操作系统在七个线程之间超级快切换,让各种线程都为期不远地轮番运行,看起来就好像同一时候举办相仿。当然,真正地同期执行八十六线程须要多核CPU才可能完结。

      协程,又称微线程,纤程。罗马尼亚(罗曼ia卡塔 尔(阿拉伯语:قطر‎语名Coroutine。

二、 Python GIL

GIL全称Global Interpreter
Lock
,即全局解释器锁。首先要求明白的一点是GIL实际不是Python的风味,它是在促成Python拆解解析器(CPython)时所引入的二个概念。就好比C++是后生可畏套语言(语法卡塔尔标准,不过足以用不一样的编写翻译器来编写翻译成可进行代码。盛名的编写翻译器比如GCC,INTEL
C++,Visual
C++等。Python也如出生机勃勃辙,相符风姿浪漫段代码能够通过CPython,PyPy,Psyco等不等的Python实施情状来进行。像此中的JPython就从不GIL。不过因为CPython是超越八分之四条件下暗许的Python实行情况。所以在数不完人的定义里CPython便是Python,也就想当然的把GIL归咎为Python语言的缺陷。所以那边要先分明一点:GIL并非Python的性状,Python完全能够不凭借于GIL

小结:

     
协程的定义很已经建议来了,但甘休眼前几年才在一些语言(如Lua卡塔尔国中拿到布满应用。

2.1 什么是大局解释器锁GIL

Python代码的施行由Python
虚构机(也叫解释器主循环,CPython版本)来调节,Python
在陈设之初就构思到要在解释器的主循环中,同一时间唯有三个线程在实施,即在任意时刻,唯有三个线程在解释器中运转。对Python
虚构机的拜望由全局解释器锁(GIL卡塔 尔(阿拉伯语:قطر‎来决定,正是以此锁能保障平等时刻唯有一个线程在运维。
在四十二十四线程蒙受中,Python 设想机按以下办法奉行:

  1. 设置GIL
  2. 切换成一个线程去运维
  3. 运行:
    a. 钦命数量的字节码指令,或然
    b. 线程主动让出调控(能够调用time.sleep(0)卡塔 尔(阿拉伯语:قطر‎
  4. 把线程设置为睡眠状态
  5. 解锁GIL
  6. 重新重新以上全数手续

在调用外界代码(如C/C++扩张函数卡塔 尔(英语:State of Qatar)的时候,GIL
将会被锁定,直到那些函数截至截至(由于在当中间从不Python
的字节码被运转,所以不会做线程切换卡塔 尔(英语:State of Qatar)。

  • 进度正是三个前后相继在三个数目集上的一次动态试行进程。进度日常由程序、数据集、进度调整块三局地组成。
  • 线程也叫轻量级进程,它是二个骨干的CPU施行单元,也是程序实践进程中的最小单元,由线程ID、程序计数器、寄放器集结和仓库同盟组成。线程的引进减小了程序现身实施时的开辟,提升了操作系统的产出品质。线程未有和睦的系统财富。

     
协程有什么样平价吗,协程只在单线程中实践,无需cpu实行上下文切换,协程自动完结子程序切换。

2.2 全局解释器锁GIL设计观念与约束

GIL的设计简化了CPython的贯彻,使得对象模型,满含首要的内建项目如字典,都是富含可以并发访谈的。锁住全局解释器使得比较便于的得以达成对多线程的支撑,但也损失了多微型机主机的并行计算才干。
不过,无论规范的,还是第三方的扩大模块,都被规划成在进展密集总结职分是,释放GIL。
还会有,便是在做I/O操作时,GIL总是会被释放。对持有面向I/O
的(会调用内建的操作系统C 代码的)程序来讲,GIL 会在此个I/O
调用早先被放飞,以允许别的的线程在此个线程等待I/O
的时候运转。假如是纯总计的顺序,未有 I/O 操作,解释器会每间距 100
次操作就自由那把锁,让其他线程有机会实践(那些次数能够通过
sys.setcheckinterval 来调度卡塔尔假若某线程并未有接收过多I/O
操作,它会在和睦的时辰片内平素据有微型机(和GIL卡塔 尔(阿拉伯语:قطر‎。也正是说,I/O
密集型的Python 程序比揣摸密集型的顺序更能丰富利用二十四线程情况的益处。

下边是Python 2.7.9手册中对GIL的粗略介绍:
The mechanism used by the CPython interpreter to assure that only one
thread executes Python bytecode at a time. This simplifies the CPython
implementation by making the object model (including critical built-in
types such as dict) implicitly safe against concurrent access. Locking
the entire interpreter makes it easier for the interpreter to be
multi-threaded, at the expense of much of the parallelism afforded by
multi-processor machines.
However, some extension modules, either standard or third-party, are
designed so as to release the GIL when doing computationally-intensive
tasks such as compression or hashing. Also, the GIL is always released
when doing I/O.
Past efforts to create a “free-threaded” interpreter (one which locks
shared data at a much finer granularity) have not been successful
because performance suffered in the common single-processor case. It is
believed that overcoming this performance issue would make the
implementation much more complicated and therefore costlier to maintain.

从上文中得以看来,针对GIL的主题材料做的累累改过,如应用越来越细粒度的锁机制,在单微型机遭遇下反而产生了质量的下降。布满认为,制服这一个特性难题会促成CPython达成更为错综相连,由此维护开销更是高昂。

二、进程和线程的涉及

     
这里未有应用yield协程,这些python自带的并非很圆满,至于何以有待于你去探究了。

三、 Python多进度与三十二线程相比较

有了GIL的留存,同有的时候刻同风流倜傥进度中唯有一个线程被奉行?这里可能人有四个疑点:多进程能够动用多核,可是付出大,而Python多线程花费小,但却无计可施利用多核的优势?要消除那一个难点,大家需求在以下几点上达到规定的规范共鸣:

  • CPU是用来测算的!
  • 多核CPU,意味着能够有八个核并行达成总计,所以多核进级的是计算品质;
  • 每种CPU风度翩翩旦遇上I/O堵塞,依旧供给等待,所以多核对I/O操作没什么用途。

道理当然是那样的,对于几个程序来讲,不会是纯计算照旧纯I/O,大家一定要绝没错去看二个程序到底是计量密集型,照旧I/O密集型。进而特别分析Python的二十多线程有无发挥特长。

分析:

我们有四个任务急需管理,管理访求显明是要有现身的意义,建设方案能够是:

  • 方案生龙活虎:开启多个进程;
  • 方案二:二个过程下,开启多个经过。

单核境况下,深入分析结果:

  • 只要八个义务是计算密集型,未有多核来并行计算,方案后生可畏徒增了成立进程的开荒,方案二胜;
  • 要是多个职务是I/O密集型,方案生机勃勃制程的支付大,且经过的切换速度远不及线程,方案二胜。

多核景况下,解析结果:

  • 假诺多少个职分是密集型,多核意味着并行
    计算,在python中二个进程中相同一时候刻只有二个线程实践用不上多核,方案生机勃勃胜;
  • 设若八个职分是I/O密集型,再多的核 也裁撤不了I/O难题,方案二胜。

结论:目前的Computer基本上都以多核,python对于总计密集型的天职开三十二线程的功效并无法推动多大质量上的提高,以至不比串行(未有大气切换卡塔尔国,可是,对于I/O密集型的职分效用依旧有明显提高的。

代码完结相比

计量密集型:

#计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        # t=Thread(target=work) #我的机器4核cpu,多线程大概15秒
        t=Process(target=work) #我的机器4核cpu,多进程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    print('主线程')

I/O密集型:

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(500):
        # t=Thread(target=work) #run time is 2.195
        t=Process(target=work) #耗时大概为37秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

总结:
使用途景:
四线程用于I/O密集型,如socket、爬虫、web
多进度用于计算密集型,如金融深入分析

经过是Computer中的程序关于某数码集上的三次运转活动,是系统进行财富分配和调节的中坚单位,是操作系统结构的底工。可能说进度是全部自然独立功效的主次关于有些数据集上的一回运转活动,进程是系统实行资源分配和调解的一个单身单位。
线程则是经过的叁个实体,是CPU调解和分担的中坚单位,它是比进度越来越小的能独立运转的着力单位。

      这里运用比较完备的第三方协程包gevent

四、锁

图片 1

      pip  install    gevent

4.1 同步锁

急需:对一个全局变量,开启九贰十三个线程,各样线程都对该全局变量做减1操作;

不加锁,代码如下:

import time
import threading

num = 100  #设定一个共享变量
def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操作

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('Result: ', num)

分析:如上程序开启100线程并不能够把全局变量num减为0,第贰个线程推行addNum遇见I/O梗塞后比比较快切换来下五个线程推行addNum,由于CPU实践切换的快慢特别快,在0.1秒内就切换实现了,那就导致了第三个线程在获得num变量后,在time.sleep(0.1)时,别的的线程也都获得了num变量,全部线程得到的num值都以100,所以最后减1操作后,便是99。加锁达成。

加锁,代码如下:

import time
import threading

num = 100   #设定一个共享变量
def addNum():
    with lock:
        global num
        temp = num
        time.sleep(0.1)
        num = temp-1    #对此公共变量进行-1操作

thread_list = []

if __name__ == '__main__':
    lock = threading.Lock()   #由于同一个进程内的线程共享此进程的资源,所以不需要给每个线程传这把锁就可以直接用。
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  #等待所有线程执行完毕
        t.join()

    print("result: ",num)

加锁后,第三个线程拿到锁后初步操作,第四个线程必须等待第八个线程操作达成后将锁释放后,再与别的线程竞争锁,获得锁的线程才有权操作。那样就保持了多少的双鸭山,可是拖慢了实践进程。
注意:with locklock.acquire()(加锁)与lock.release()(释放锁)的简写。

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

小结:

各类进程下N个协程,   

GIL vs Lock

机智的同学可能会问到这个问题,就是既然你之前说过了,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 

先是大家必要完成共鸣:锁的指标是为着保险分享的数码,同时只可以有贰个线程来矫正分享的多少

接下来,大家得以得出结论:保养不相同的数码就应当加不相同的锁。

终极,难题就很晴朗了,GIL
与Lock是两把锁,爱慕的数量不平等,前者是解释器级其余(当然维护的就是解释器级其余数额,举个例子垃圾回笼的多少卡塔尔,后面一个是维护顾客自身支付的应用程序的数量,很分明GIL不担当那事,只好顾客自定义加乌鱼理,即Lock

详细的:

因为Python解释器帮你活动定期开展内部存款和储蓄器回笼,你可以见到为python解释器里有一个单身的线程,每过后生可畏段时间它起wake
up做贰次全局轮询看看怎样内部存款和储蓄器数据是能够被清空的,那时候您和谐的程序
里的线程和
py解释器本人的线程是并发运维的,若是你的线程删除了贰个变量,py解释器的废料回笼线程在清空那些变量的长河中的clearing时刻,恐怕叁个别样线程刚巧又再一次给那些还未来及得清空的内部存款和储蓄器空间赋值了,结果就有异常的大希望新赋值的数据被去除了,为了消除相通的标题,python解释器轻松阴毒的加了锁,即当贰个线程运转时,此外人都不能够动,那样就消除了上述的主题素材,
那能够说是Python开始时代版本的遗留难点。

  • 三个线程只可以归属一个经过,而二个历程能够有八个线程,但至稀有贰个线程。

  • 能源分配给进程,同大器晚成进度的持有线程分享该进程的有所财富。

  • CPU分给线程,即确实在CPU上运行的是线程。
#coding=utf-8
from multiprocessing import Process
import gevent
#from gevent import monkey; monkey.patch_socket()
#用于协程的了程序
def yield_execFunc(x):
    print('______________%s'%x)


#yield_clist决定协程的数量
#开始协程操作
def yield_start(yield_clist):
    task=[] #用来存储协程
    for i in yield_clist:
        task.append(gevent.spawn(yield_execFunc,i))

    gevent.joinall(task) #执行协程

if  __name__=="__main__":
    list1=[1,2,3,4,5,6,7,8,9,10] #元素个数决定开起的协程数量
    list2=[1,2,3,4,5,6,7,8,9,10]
    list3=[1,2,3,4,5,6,7,8,9,10]
    process_list =[list1,list2,list3] #元素个数决定进程数量
    for plist in process_list:
        p = Process(target=yield_start,args=(plist,))
        p.start()

4.2 死锁与递归锁

所谓死锁:是指三个或多少个以上的历程或线程在实践进程中,因争夺财富而引致的后生可畏种相互作用等待的面貌,若无外力功能,它们都将不能够推动下去。那时称系统处于死锁状态,或系统一发布生了死锁。那此永久在互相等待的进度称死锁进度

如下代码,就能够时有产生死锁:

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('33[41m%s 拿到A锁33[0m' %self.name)

        mutexB.acquire()
        print('33[42m%s 拿到B锁33[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('33[43m%s 拿到B锁33[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('33[44m%s 拿到A锁33[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁
然后就卡住,死锁了
'''

减轻死锁的点子

制止生出死锁的措施便是用递归锁,在python中为了扶助在同一线程中一再哀告同一能源,python提供了可重入锁RLock

这个RLock其间维护着多少个Lock和多少个counter变量,counter记录了acquire(获得锁卡塔 尔(英语:State of Qatar)的次数,进而使得财富能够被再三require。直到叁个线程全数的acquire都被release(释放卡塔尔国后,别的的线程技术博得能源。下边包车型大巴例子尽管采用RLock代替Lock,就不会产生死锁的场馆了。

mutexA=mutexB=threading.RLock()
#一个线程拿到锁,counter加1,该线程内又遇上加锁的事态,则counter继续加1,这里面具备别的线程都一定要等待,等待该线程释放具备锁,即counter依次减少到0甘休。

三、并行(xing)和并发

施行结果:开了八个进度,每种进度下推行十二个体协会程合作职务

4.3 信号量Semaphore

同进度的功率信号量同样。
用三个世俗的事例来讲,锁也正是独立卫生间,唯有四个坑,同不时刻只可以有一位得到锁,进去使用;而非实信号量也正是国有更衣室,比如有5个坑,同不经常刻能够有5个人获得锁,并利用。

Semaphore管理叁个停放的流速计,每当调用acquire()时,内置计数器-1;调用release()时,内置流量计+1;流量计无法小于0,当流速计为0时,acquire()将堵塞线程,直到其余线程调用release()

实例:
再就是独有5个线程能够收获Semaphore,即能够限定最罗安达接数为5:

import threading
import time

sem = threading.Semaphore(5)
def func():
    if sem.acquire():   #也可以用with进行上下文管理
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)
        sem.release()

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

利用with举办上下文物管理理:

import threading
import time

sem = threading.Semaphore(5)

def func():
    with sem:   
        print(threading.current_thread().getName()+"get semaphore")
        time.sleep(2)

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

注:复信号量与进度池是完全差异风姿罗曼蒂克的定义,进度池Pool(4)最大不能不发出4个经过,况且自始自终都只是那4个进度,不会产生新的,而非确定性信号量是产生一批线程/进度。

并行管理(Parallel
Processing卡塔尔是Computer种类中能同不经常间奉行五个或更多少个管理的生龙活虎种计算办法。并行处理可同一时间专业于意气风发致程序的差别方面。并行管理的关键目标是省去大型和复杂难题的清除岁月。

C:Python27python.exe D:/weixin/temp/yield_tmp.py
______________1
______________2
______________3
______________4
______________5
______________6
______________7
______________8
______________9
______________10
______________1
______________1
______________2
______________2
______________3
______________3
______________4
______________4
______________5
______________5
______________6
______________6
______________7
______________7
______________8
______________8
______________9
______________9
______________10
______________10

Process finished with exit code 0

4.4 事件Event

同进程的意气风发律

线程的贰个入眼脾气是每一种线程都是单独运作且情形不行预测。即便程序中的其余线程通过判别有些线程的气象来显明自身下一步的操作,这时候线程同步难题就能变得那个劳累,为驾驭除这么些标题我们运用threading库中的Event对象。

Event对象包含三个可由线程设置的能量信号标记,它同意线程等待某个事件的发生。在发轫情状下,伊芙nt对象中的非实信号标记被安装为假。借使有线程等待多少个伊夫nt对象,而那几个伊芙nt对象的标识为假,那么这几个线程将会被
平素不通直至该
标识为真。三个线程假诺将三个Event对象的能量信号标记设置为真,它将唤起全体等待那么些Event对象的线程。假诺二个线程等待三个已经被
设置 为确实Event对象,那么它将忽视那么些事件,继续实施。

伊芙nt对象具备部分方法:
event = threading.Event() #产生一个风云指标

  • event.isSet():返回event状态值;
  • event.wait():如果event.isSet() == False,将卡住线程;
  • event.set():设置event的情事值为True,全体堵塞池的线程踏向就绪状态,等待操作系统中度;
  • event.clear():恢复生机event的动静值False。

应用项景:

举例说,大家有五个线程需求连接数据库,我们想要在运维时确认保证Mysql服务正常,才让这个工作线程去老是Mysql服务器,那么大家就足以行使threading.Event()体制来协和各类专门的工作线程的连年操作,主线程中会去尝试连接Mysql服务,要是不奇怪的话,触发事件,各工作线程会尝试连接Mysql服务。

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    print('33[42m%s 等待连接mysql。。。33[0m' %threading.current_thread().getName())
    event.wait()  #默认event状态为False,等待
    print('33[42mMysql初始化成功,%s开始连接。。。33[0m' %threading.current_thread().getName())


def check_mysql():
    print('33[41m正在检查mysql。。。33[0m')
    time.sleep(random.randint(1,3))
    event.set()   #设置event状态为True
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接myqsl
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()


'''
输出如下:
Thread-1 等待连接mysql。。。
Thread-2 等待连接mysql。。。
正在检查mysql。。。
Mysql初始化成功,Thread-1开始连接。。。
Mysql初始化成功,Thread-2开始连接。。。
'''

注:threading.Eventwait方式还是能够采用二个逾期参数,暗许景况下,如若事件间接还未生出,wait方法会一向不通下去,而到场这些超时参数之后,要是打断时间超越这几个参数设定的值之后,wait方法会重回。对应于上边包车型地铁运用处景,如果mysql服务器一向还未运行,我们希望子线程能够打字与印刷一些日志来不断提醒大家当前不曾一个方可连绵不断的mysql服务,大家就能够安装这几个超时参数来到达这样的目标:

上例代码更改后如下:

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count = 1
    while not event.is_set():
        print("33[42m%s 第 <%s> 次尝试连接。。。"%(threading.current_thread().getName(),count))
        event.wait(0.2)
        count+=1
    print("33[45mMysql初始化成功,%s 开始连接。。。33[0m"%(threading.current_thread().getName()))

def check_mysql():
    print('33[41m正在检查mysql。。。33[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql) #等待连接mysql
    t2=Thread(target=conn_mysql) #等待连接mysql
    t3=Thread(target=check_mysql) #检查mysql

    t1.start()
    t2.start()
    t3.start()

这样,大家就足以在等待Mysql服务运营的同不经常候,见到职业线程长史在等候的状态。应用:连接池。

并发管理(concurrency
Processing)指一个时光段中有多少个程序都处在已开发银行运营到运维实现之间,且那多少个程序都以在同三个处理机(CPU)上运转,但任一个时刻点上唯有二个主次在处理机(CPU)上运转。

 

4.5 定时器timer

计时器,钦定n秒后施行某操作。

from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)  #1秒后执行任务hello
t.start()   # after 1 seconds, "hello, world" will be printed

图片 2

   

4.6 线程队列queue

queue队列:使用import queue,用法与经过Queue一样。

queue下有两种队列:

  • queue.Queue(maxsize) 先进先出,先放进队列的多寡,先被收取来;
  • queue.LifoQueue(maxsize) 后进先出,(Lifo 意为last in first
    out卡塔尔,后放进队列的数据,先被抽取来
  • queue.PriorityQueue(maxsize) 优先级队列,优先级越高优先抽取来。

举例:
先进先出:

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''

后进先出:

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''

初期级队列:

import queue

q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''

现身的非常重借使您有管理八个职责的力量,不显明要同不经常候。并行的要紧是您有同期管理三个义务的力量。所以说,并行是现身的子集。

五、协程

协程:是单线程下的面世,又称微线程、纤程,乌Crane语名:Coroutine协程是大器晚成种顾客态的轻量级线程,协程是由顾客程序自个儿说了算调整的。

亟待重申的是:

1.
python的线程归于基本品级的,即由操作系统调控调整(如单线程生机勃勃旦际遇io就被迫交出cpu实施权限,切换别的线程运营卡塔 尔(英语:State of Qatar)

  1. 单线程内展开协程,大器晚成旦相遇io,从应用程序等级(而非操作系统卡塔 尔(英语:State of Qatar)调控切换

相比操作系统调节线程的切换,顾客在单线程内决定协程的切换,优点如下:

1.
协程的切换成本更加小,归属程序等第的切换,操作系统完全感知不到,因此尤其轻量级

  1. 单线程内就可以完成产出的机能,最大限度地使用cpu。

要兑现协程,关键在于客户程序自个儿主宰程序切换,切换以前必须由顾客程序自个儿童卫生保健留协程上叁次调用时的景况,如此,每便重复调用时,能够从上次之处继续推行

(详细的:协程具有自身的贮存器上下文和栈。协程调整切换时,将贮存器上下文和栈保存到别的地点,在切回到的时候,苏醒原先保存的寄放器上下文和栈)

四、同步与异步

5.1 yield完成协程

大家事先已经学习过大器晚成种在单线程下能够保存程序运转状态的点子,即yield,我们来简单复习一下:

  • yiled能够保留意况,yield的意况保存与操作系统的保留线程状态很像,然而yield是代码等级决定的,更轻量级
  • send能够把一个函数的结果传给其余三个函数,以此完结单线程内程序之间的切换

#不用yield:每次函数调用,都需要重复开辟内存空间,即重复创建名称空间,因而开销很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333

    pass
def producer(target,seq):
    for item in seq:
        target(item) #每次调用函数,会临时产生名称空间,调用结束则释放,循环100000000次,则重复这么多次的创建和释放,开销非常大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #30.132838010787964


#使用yield:无需重复开辟内存空间,即重复创建名称空间,因而开销小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #无需重新创建名称空间,从上一次暂停的位置继续,相比上例,开销小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #21.882073879241943

缺点:
协程的实质是单线程下,无法采纳多核,能够是叁个程序开启多个经过,种种进程内张开几个线程,每一个线程内张开协程。
协程指的是单个线程,由此大器晚成旦协程现身堵塞,将会拥塞整个线程。

协程的定义(满足1,2,3就足以叫做协程卡塔 尔(阿拉伯语:قطر‎:

  1. 必须要在唯有三个单线程里金玉锦绣产出
  2. 改革分享数据不需加锁
  3. 客商程序里同生共死保留七个调控流的前后文栈
  4. 外加:三个体协会程碰到IO操作自动切换成别的协程(如何落到实处检验IO,yield、greenlet都心余力绌兑现,就用到了gevent模块(select机制卡塔尔卡塔 尔(阿拉伯语:قطر‎

注意:yield切换在还未有io的情形下仍旧尚未重新开垦内部存款和储蓄器空间的操作,对成效未有怎么升高,甚至更加慢,为此,能够用greenlet来为大家演示这种切换。

在微处理机领域,同步便是指一个进度在施行有些央求的时候,若该恳求须求意气风发段时间技巧回到新闻,那么那几个进度将会平素守候下去,直到收到再次回到音讯才继续施行下去。

5.2 greenlet完毕协程

greenlet是二个用C实现的协程模块,比较与python自带的yield,它能够使你在放肆函数之间自由切换,而不需把那么些函数先注脚为generator。

安装greenlet模块
pip install greenlet

from greenlet import greenlet
import time

def t1():
    print("test1,first")
    gr2.switch()
    time.sleep(5)
    print("test1,second")
    gr2.switch()

def t2():
    print("test2,first")
    gr1.switch()
    print("test2,second")

gr1 = greenlet(t1)
gr2 = greenlet(t2)
gr1.switch()


'''
输出结果:
test1,first
test2,first   #等待5秒
test1,second
test2,second
'''

能够在首先次switch时传入参数

from greenlet import greenlet
import time
def eat(name):
    print("%s eat food 1"%name)
    gr2.switch(name="alex")
    time.sleep(5)
    print("%s eat food 2"%name)
    gr2.switch()

def play_phone(name):
    print("%s play phone 1"%name)
    gr1.switch()
    print("%s play phone 1" % name)

gr1 = greenlet(eat)
gr2 = greenlet(play_phone)
gr1.switch(name="egon")  #可以在第一次switch时传入参数,以后都不需要

注意:greenlet只是提供了黄金年代种比generator越是便利的切换格局,依然未有消除境遇I/O自动切换的难点,而单单的切换,反而会回降程序的施行进度。那就供给选取gevent模块了。

异步是指进度无需直接等下去,而是继续施行别的操作,不管别的进度的情形。当有新闻再次回到时系统会通告进程张开管理,那样能够抓好施行的频率。譬如,打电话时正是联合通讯,发短息时正是异步通信。

5.3 gevent达成协程

gevent是三个第三方库,能够轻巧通过gevent达成产出同步或异步编制程序,在gevent中用到的重视是Greenlet,它是以C扩大模块方式接入Python的轻量级协程。greenlet全副运作在主程操作系统进程的内部,但它们被同盟式地调节和测量试验。欣逢I/O窒碍时会自动切换职务。

注意:gevent有投机的I/O堵塞,如:gevent.sleep()和gevent.socket();但是gevent无法平昔识别除本人之外的I/O梗塞,如:time.sleep(2),socket等,要想识别那一个I/O拥塞,必需打三个补丁:from gevent import monkey;monkey.patch_all()

  • 亟需先安装gevent模块
    pip install gevent

  • 成立三个体协会程对象g1
    g1 =gevent.spawn()
    spawn括号内率先个参数是函数名,如eat,前边能够有多少个参数,能够是岗位实参或珍视字实参,都以传给第多个参数(函数卡塔尔eat的。

from gevent import monkey;monkey.patch_all()
import gevent

def eat():
    print("点菜。。。")
    gevent.sleep(3)   #等待上菜
    print("吃菜。。。")

def play():
    print("玩手机。。。")
    gevent.sleep(5)  #网卡了
    print("看NBA...")

# gevent.spawn(eat)
# gevent.spawn(play)
# print('主') # 直接结束

#因而也需要join方法,进程或现场的jion方法只能join一个,而gevent的joinall方法可以join多个
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])  #传一个gevent对象列表。
print("主线程")

"""
输出结果:
点菜。。。
玩手机。。。    
##等待大概3秒       此行没打印
吃菜。。。
##等待大概2秒          此行没打印
看NBA...
主线程
"""

注:上例中的gevent.sleep(3)是未有主见只会借风使船的I/O拥塞。跟time.sleep(3)效果与利益相近。

同步/异步

import gevent
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():  #同步执行
    for i in range(1, 10):
        task(i)

def asynchronous(): #异步执行
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()   #执行后,会顺序打印结果

print('Asynchronous:')
asynchronous()  #执行后,会异步同时打印结果,无序的。

爬虫应用

#协程的爬虫应用

from gevent import monkey;monkey.patch_all()
import gevent
import time
import requests

def get_page(url):
    print("GET: %s"%url)
    res = requests.get(url)
    if res.status_code == 200:
        print("%d bytes received from %s"%(len(res.text),url))

start_time = time.time()
g1 = gevent.spawn(get_page,"https://www.python.org")
g2 = gevent.spawn(get_page,"https://www.yahoo.com")
g3 = gevent.spawn(get_page,"https://www.github.com")
gevent.joinall([g1,g2,g3])
stop_time = time.time()
print("run time is %s"%(stop_time-start_time))

上以代码输出结果:

GET: https://www.python.org
GET: https://www.yahoo.com
GET: https://www.github.com
47714 bytes received from https://www.python.org
472773 bytes received from https://www.yahoo.com
98677 bytes received from https://www.github.com
run time is 2.501142978668213

应用:
通过gevent完结单线程下的socket并发,注意:from gevent import monkey;monkey.patch_all()早晚要放引导入socket模块早先,不然gevent不能够辨别socket的短路。

服务端代码:

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

class server:
    def __init__(self,ip,port):
        self.ip = ip
        self.port = port


    def conn_cycle(self):   #连接循环
        tcpsock = socket(AF_INET,SOCK_STREAM)
        tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        tcpsock.bind((self.ip,self.port))
        tcpsock.listen(5)
        while True:
            conn,addr = tcpsock.accept()
            gevent.spawn(self.comm_cycle,conn,addr)

    def comm_cycle(self,conn,addr):   #通信循环
        try:
            while True:
                data = conn.recv(1024)
                if not data:break
                print(addr)
                print(data.decode("utf-8"))
                conn.send(data.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()

s1 = server("127.0.0.1",60000)
print(s1)
s1.conn_cycle()

顾客端代码 :

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

透过gevent完结产出多少个socket客户端去老是服务端

from gevent import monkey;monkey.patch_all()
import gevent
from socket import *

def client(server_ip,port):
    try:
        c = socket(AF_INET,SOCK_STREAM)
        c.connect((server_ip,port))
        count = 0
        while True:
            c.send(("say hello %s"%count).encode("utf-8"))
            msg = c.recv(1024)
            print(msg.decode("utf-8"))
            count+=1
    except Exception as e:
        print(e)
    finally:
        c.close()

# g_l = []
# for i in range(500):
#     g = gevent.spawn(client,'127.0.0.1',60000)
#     g_l.append(g)
# gevent.joinall(g_l)

#上面注释代码可简写为下面代码这样。

threads = [gevent.spawn(client,"127.0.0.1",60000) for i in range(500)]
gevent.joinall(threads)

比如:

六、IO多路复用

是因为CPU和内部存款和储蓄器的快慢远远胜出外设的进度,所以,在IO编制程序中,就存在速度严重不相称的难点。譬如要把100M的多少写入磁盘,CPU输出100M的数据只供给0.01秒,然而磁盘要收下那100M数目可能要求10秒,有二种格局清除:

经过IO多路复用完成同有时间监听三个端口的服务端

示例一:

# 示例一:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import socket
import select

sock_1 = socket()
sock_1.bind(("127.0.0.1",60000))
sock_1.listen(5)

sock_2 = socket()
sock_2.bind(("127.0.0.1",60001))
sock_2.listen(5)

inputs = [sock_1,sock_2]

while True:
    # IO多路复用
    # -- select方法,内部进行循环操作,哪个socket对象有变化(连接),就赋值给r;监听socket文件句柄有个数限制(1024个)
    # -- poll方法,也是内部进行循环操作,没有监听个数限制
    # -- epoll方法,通过异步回调,哪个socket文件句柄有变化,就会自动告诉epoll,它有变化,然后将它赋值给r;
    # windows下没有epoll方法,只有Unix下有,windows下只有select方法
    r,w,e=select.select(inputs,[],[],0.2)  #0.2是超时时间
        #当有人连接sock_1时,返回的r,就是[sock_1,];是个列表
        #当有人连接sock_2时,返回的r,就是[sock_2,];是个列表
        #当有多人同时连接sock_1和sock_2时,返回的r,就是[sock_1,sock_2,];是个列表
        #0.2是超时时间,如果这段时间内没有连接进来,那么r就等于一个空列表;
    for obj in r:
        if obj in [sock_1,sock_2]:

            conn, addr = obj.accept()
            inputs.append(conn)
            print("新连接来了:",obj)

        else:
            print("有连接用户发送消息来了:",obj)
            data = obj.recv(1024)
            if not data:break
            obj.sendall(data)

客户端:

# -*- coding:utf-8 -*-
#!/usr/bin/python
# Author : Cai Guangyin

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)   #创建一个tcp套接字
tcpsock.connect(("127.0.0.1",60001))     #根据地址连接服务器

while True:   #客户端通信循环
    msg = input(">>: ").strip()   #输入消息
    if not msg:continue           #判断输入是否为空
        #如果客户端发空,会卡住,加此判断,限制用户不能发空
    if msg == 'exit':break       #退出
    tcpsock.send(msg.encode("utf-8"))   #socket只能发送二进制数据
    data = tcpsock.recv(1024)    #接收消息
    print(data.decode("utf-8"))

tcpsock.close()

如上服务端运维时,如若有客商端断开连接则会抛出如下万分:

图片 3

异常

  1. CPU等着,也正是前后相继暂停实践后续代码,等100M的数据在10秒后写入磁盘,再接着往下施行,这种形式称为同步IO
  2. CPU不等待,只是告诉磁盘,慢慢写不急急,写完布告本人,笔者随着干其余事去了,于是继续代码可以随着实践,这种格局称为异步IO

改善版如下

征集万分并将选择数据和发送数据分开处理
示例二:

# 示例二
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author : Cai Guangyin

from socket import *
import select

sk1 = socket(AF_INET,SOCK_STREAM)
sk1.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk1.bind(("127.0.0.1",60000))
sk1.listen(5)

sk2 = socket(AF_INET,SOCK_STREAM)
sk2.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk2.bind(("127.0.0.1",60001))
sk2.listen(5)


inputs = [sk1,sk2]
w_inputs = []

while True:
    r,w,e = select.select(inputs,w_inputs,inputs,0.1)
    for obj in r:
        if obj in [sk1,sk2]:
            print("新连接:",obj.getsockname())
            conn,addr = obj.accept()
            inputs.append(conn)

        else:
            try:
                # 如果客户端断开连接,将获取异常,并将收取数据data置为空
                data = obj.recv(1024).decode('utf-8')
                print(data)
            except Exception as e:
                data = ""

            if data:
                # 如果obj能正常接收数据,则认为它是一个可写的对象,然后将它加入w_inputs列表
                w_inputs.append(obj)
            else:
                # 如果数据data为空,则从inputs列表中移除此连接对象obj
                print("空消息")
                obj.close()
                inputs.remove(obj)


        print("分割线".center(60,"-"))

    # 遍历可写的对象列表,
    for obj in w:
        obj.send(b'ok')
        # 发送数据后删除w_inputs中的此obj对象,否则客户端断开连接时,会抛出”ConnectionResetError“异常
        w_inputs.remove(obj)

五、threading模块

七、socketserver完毕产出

据他们说TCP的套接字,关键正是多个巡回,一个老是循环,叁个通讯循环。

SocketServer内部利用 IO多路复用 以至 “多线程” 和 “多进度”
,进而完成产出管理八个顾客端伏乞的Socket服务端。即:每一个顾客端诉求连接到服务器时,Socket服务端都会在服务器是创制二个“线程”只怕“进度”
专责管理当下顾客端的有所诉求。

socketserver模块中的类分为两大类:server类(排除链接难题卡塔尔国和request类(消除通讯难点卡塔 尔(英语:State of Qatar)

server类:

图片 4

server类

request类:

图片 5

request类

线程server类的继续关系:

图片 6

线程server类的世襲关系

进度server类的存在延续关系:

图片 7

经过server类的接续关系

request类的后续关系:

图片 8

request类的三回九转关系

以下述代码为例,解析socketserver源码:

ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()

找出属性的逐风流罗曼蒂克:ThreadingTCPServer –> ThreadingMixIn –>
TCPServer->BaseServer

  1. 实例化获得ftpserver,先找类ThreadingTCPServer__init__,在TCPServer中找到,进而施行server_bind,server_active
  2. ftpserver下的serve_forever,在BaseServer中找到,进而实践self._handle_request_noblock(),该方法雷同是在BaseServer
  3. 执行self._handle_request_noblock()随后推行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()卡塔尔国,然后实践self.process_request(request, client_address)
  4. ThreadingMixIn中找到process_request,开启八线程应对现身,进而实践process_request_thread,执行self.finish_request(request, client_address)
  5. 上述四片段形成了链接循环,本有的开头进入拍卖通信部分,在BaseServer中找到finish_request,触发我们相濡相呴定义的类的实例化,去找__init__措施,而大家温馨定义的类未有该形式,则去它的父类也正是BaseRequestHandler中找….

源码分析总结:
依据tcp的socketserver大家和好定义的类中的

  • self.server 即套接字对象
  • self.request 即贰个链接
  • self.client_address 即顾客端地址

基于udp的socketserver大家和好定义的类中的

  • self.request是一个元组(第多少个成分是客商端发来的数码,第二有个别是服务端的udp套接字对象卡塔 尔(阿拉伯语:قطر‎,如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
  • self.client_address即客商端地址。

线程是操作系统直接扶持的施行单元,由此,高级语言平日都内置三十二线程的支撑,Python也不例外,况兼,Python的线程是真的的Posix
Thread,实际不是仿照出来的线程。

6.1 ThreadingTCPServer

ThreadingTCPServer完毕的Soket服务器内部会为各类client创制贰个“线程”,该线程用来和客商端进行人机联作。

使用ThreadingTCPServer:

  • 始建二个接续自 SocketServer.BaseRequestHandler 的类
  • 类中必得定义贰个称呼为 handle 的法子
  • 启动ThreadingTCPServer。
  • 启动serve_forever() 链接循环

服务端:

import socketserver

class MyServer(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        # print(addr)
        conn.sendall("欢迎致电10086,请输入1XXX,0转人工服务。".encode("utf-8"))
        Flag = True
        while Flag:
            data = conn.recv(1024).decode("utf-8")
            if data == "exit":
                Flag = False
            elif data == '0':
                conn.sendall("您的通话可能会被录音。。。".encode("utf-8"))
            else:
                conn.sendall("请重新输入。".encode('utf-8'))

if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(("127.0.0.1",60000),MyServer)
    server.serve_forever()  #内部实现while循环监听是否有客户端请求到达。

客户端:

import socket

ip_port = ('127.0.0.1',60000)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024).decode("utf-8")
    print('receive:',data)
    inp = input('please input:')
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break
sk.close()

Python的规范库提供了多少个模块:_threadthreading_thread是下等模块,threading是高档模块,对_thread开展了包装。绝大超多境况下,咱们只供给使用threading其生机勃勃高端模块。

七、基于UDP的套接字

  • recvfrom(buffersize[, flags])收纳消息,buffersize是一次选择多少个字节的多寡。
  • sendto(data[, flags], address)
    发送新闻,data是要发送的二进制数据,address是要发送的地点,元组情势,富含IP和端口

服务端:

from socket import *
s=socket(AF_INET,SOCK_DGRAM)  #创建一个基于UDP的服务端套接字,注意使用SOCK_DGRAM类型
s.bind(('127.0.0.1',8080))  #绑定地址和端口,元组形式

while True:    #通信循环
    client_msg,client_addr=s.recvfrom(1024) #接收消息
    print(client_msg)
    s.sendto(client_msg.upper(),client_addr) #发送消息

客户端:

from socket import *
c=socket(AF_INET,SOCK_DGRAM)   #创建客户端套接字

while True:
    msg=input('>>: ').strip()
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #发送消息
    server_msg,server_addr=c.recvfrom(1024) #接收消息
    print('from server:%s msg:%s' %(server_addr,server_msg))

仿照即时闲谈
由于UDP无连接,所以能够而且七个顾客端去跟服务端通讯

服务端:

from socket import *

server_address = ("127.0.0.1",60000)
udp_server_sock = socket(AF_INET,SOCK_DGRAM)
udp_server_sock.bind(server_address)

while True:
    qq_msg,addr = udp_server_sock.recvfrom(1024)
    print("来自[%s:%s]的一条消息:33[32m%s33[0m"%(addr[0],addr[1],qq_msg.decode("utf-8")))
    back_msg = input("回复消息:").strip()
    udp_server_sock.sendto(back_msg.encode("utf-8"),addr)

udp_server_sock.close()

客户端:

from socket import *

BUFSIZE = 1024
udp_client_sock = socket(AF_INET,SOCK_DGRAM)
qq_name_dic = {
    "alex":("127.0.0.1",60000),
    "egon":("127.0.0.1",60000),
    "seven":("127.0.0.1",60000),
    "yuan":("127.0.0.1",60000),
}

while True:
    qq_name = input("请选择聊天对象:").strip()
    while True:
        msg = input("请输入消息,回车发送:").strip()
        if msg == "quit":break
        if not msg or not qq_name or qq_name not in qq_name_dic:continue
        print(msg,qq_name_dic[qq_name])
        udp_client_sock.sendto(msg.encode("utf-8"),qq_name_dic[qq_name])

        back_msg,addr = udp_client_sock.recvfrom(BUFSIZE)
        print("来自[%s:%s]的一条消息:33[32m%s33[0m" %(addr[0],addr[1],back_msg.decode("utf-8")))
udp_client_sock.close()

注意:
1.你独自运营方面包车型大巴udp的顾客端,你意识并不会报错,相反tcp却会报错,因为udp协议只承受把包发出去,对方收不收,作者历来不管,而tcp是借助链接的,必得有三个服务端先运营着,顾客端去跟服务端创设链接然后依托于链接能力传递音讯,任何一方试图把链接摧毁都会引致对方程序的倒台。

2.上边的udp程序,你注释任何一条客户端的sendinto,服务端都会堵塞,为何?因为服务端有多少个recvfrom就要对应多少个sendinto,哪怕是sendinto(b”)那也要有。

3.recvfrom(buffersize)如果设置每趟选用数据的字节数,小于对方发送的数目字节数,假使运维Linux蒙受下,则只会接到到recvfrom()所设置的字节数的数码;而借使运营windows情形下,则会报错。

基于socketserver落实三十二线程的UDP服务端:

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s=self.request
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',60000),MyUDPhandler)
    s.serve_forever()

1. 调用Thread类直接开立

开发银行一个线程正是把一个函数字传送入并创设Thread实例,然后调用start()千帆竞发实行:

图片 9图片 10

 1 import time, threading
 2 
 3 # 新线程执行的代码:
 4 def loop():
 5     print('thread %s is running...' % threading.current_thread().name)
 6     n = 0
 7     while n < 5:
 8         n = n + 1
 9         print('thread %s >>> %s' % (threading.current_thread().name, n))
10         time.sleep(1)
11     print('thread %s ended.' % threading.current_thread().name)
12 
13 print('thread %s is running...' % threading.current_thread().name)
14 t = threading.Thread(target=loop, name='LoopThread')
15 t.start()
16 t.join()
17 print('thread %s ended.' % threading.current_thread().name)
18 
19 
20 #运行结果:
21 #thread MainThread is running...
22 # thread LoopThread is running...
23 # thread LoopThread >>> 1
24 # thread LoopThread >>> 2
25 # thread LoopThread >>> 3
26 # thread LoopThread >>> 4
27 # thread LoopThread >>> 5
28 # thread LoopThread ended.
29 # thread MainThread ended.

实例1

由于其余进度默许就能够运转多少个线程,我们把该线程称为主线程,主线程又足以运行新的线程,Python的threading模块有个current_thread()函数,它世代重返当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在开登时钦命,大家用LoopThread命名子线程。名字只是在打字与印刷时用来体现,完全未有别的意思,若是不起名字Python就活动给线程命名称为Thread-1Thread-2……

图片 11图片 12

 1 import threading
 2 import time
 3 
 4 def countNum(n): # 定义某个线程要运行的函数
 5 
 6     print("running on number:%s" %n)
 7 
 8     time.sleep(3)
 9 
10 if __name__ == '__main__':
11 
12     t1 = threading.Thread(target=countNum,args=(23,)) #生成一个线程实例
13     t2 = threading.Thread(target=countNum,args=(34,))
14 
15     t1.start() #启动线程
16     t2.start()
17 
18     print("ending!")
19 
20 
21 #运行结果:程序打印完“ending!”后等待3秒结束
22 #running on number:23
23 #running on number:34
24 #ending!

实例2

该实例中国共产党有3个线程:主线程,t1和t2子线程

图片 13

 

2. 自定义Thread类继承式创制

图片 14图片 15

 1 #继承Thread式创建
 2 
 3 import threading
 4 import time
 5 
 6 class MyThread(threading.Thread):
 7 
 8     def __init__(self,num):
 9         threading.Thread.__init__(self)    #继承父类__init__
10         self.num=num
11 
12     def run(self):    #必须定义run方法
13         print("running on number:%s" %self.num)
14         time.sleep(3)
15 
16 t1=MyThread(56)
17 t2=MyThread(78)
18 
19 t1.start()
20 t2.start()
21 print("ending")

View Code

3. Thread类的实例方法

join和dameon

图片 16图片 17

 1 import threading
 2 from time import ctime,sleep
 3 
 4 def Music(name):
 5 
 6         print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
 7         sleep(3)
 8         print("end listening {time}".format(time=ctime()))
 9 
10 def Blog(title):
11 
12         print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
13         sleep(5)
14         print('end recording {time}'.format(time=ctime()))
15 
16 
17 threads = []
18 
19 
20 t1 = threading.Thread(target=Music,args=('FILL ME',))
21 t2 = threading.Thread(target=Blog,args=('',))
22 
23 threads.append(t1)
24 threads.append(t2)
25 
26 if __name__ == '__main__':
27 
28     #t2.setDaemon(True)
29 
30     for t in threads:
31 
32         #t.setDaemon(True) #注意:一定在start之前设置
33         t.start()
34 
35         #t.join()
36 
37     #t1.join()
38     #t2.join()    #  考虑这三种join位置下的结果?
39 
40     print ("all over %s" %ctime())

join和setDaemon

别的方式:

1 Thread实例对象的方法
2   # isAlive(): 返回线程是否活动的。
3   # getName(): 返回线程名。
4   # setName(): 设置线程名。
5 
6 threading模块提供的一些方法:
7   # threading.currentThread(): 返回当前的线程变量。
8   # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
9   # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

六、GIL

'''

定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

'''

Python中的线程是操作系统的原生线程,Python虚构机使用三个大局解释器锁(Global
Interpreter
Lock卡塔 尔(英语:State of Qatar)来互斥线程对Python虚构机的应用。为了辅助多线程机制,叁个骨干的渴求便是内需达成分化线程对分享财富访谈的排挤,所以引进了GIL。
GIL:在三个线程具有驾驭释器的访谈权之后,别的的拥有线程都一定要等待它释放解释器的访问权,就算那一个线程的下一条指令并不会相互影响。
在调用任何Python C API从前,要先得到GIL
GIL劣势:多微处理机退化为单微处理机;优点:防止大量的加锁解锁操作。

1.
GIL的早先时代设计

Python帮衬四十八线程,而消除二十四线程之间数据完整性和情景同步的最简易方法自然正是加锁。
于是有了GIL那把相当的大锁,而当越多的代码库开辟者选取了这种设定后,他们开端大批量看重这种特点(即暗中同意python内部对象是thread-safe的,无需在实现时思考外加的内部存款和储蓄器锁和同步操作卡塔尔。稳步的这种落成格局被察觉是蛋疼且没用的。但当大家试图去拆分和去除GIL的时候,开采大批量库代码开垦者现已重度依赖GIL而极其难以去除了。有多难?做个类比,像MySQL这样的“小品种”为了把Buffer
Pool
Mutex那把大锁拆分成各种小锁也花了从5.5到5.6再到5.7七个大版为期近5年的小运,而且仍在这里起彼伏。MySQL这些背后有厂家辅助且有稳固支出团队的制品走的如此困难,这又加以Python那样基本开垦和代码进献者中度社区化的团伙吗?

2.
GIL的影响

任由你启几个线程,你有几个cpu,
Python在施行多少个进度的时候会淡定的在相同一时间刻只同意八个线程运维。
于是,python是敬敏不谢利用多核CPU达成多线程的。
这么,python对于计算密集型的职务开二十四线程的作用以致不比串行(未有大气切换),不过,对于IO密集型的任务功能依然有举世瞩目晋级的。

图片 18

测算密集型实例:

图片 19图片 20

 1 #coding:utf8
 2 from threading import Thread
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9     return True
10 
11 
12 def main():
13     l=[]
14     start_time = time.time()
15     for i in range(2):
16 
17         t = Thread(target=counter)
18         t.start()
19         l.append(t)
20         t.join()
21 
22     for t in l:
23         t.join()
24     # counter()
25     # counter()
26     end_time = time.time()
27     print("Total time: {}".format(end_time - start_time))
28 
29 if __name__ == '__main__':
30     main()
31 
32 
33 '''
34 py2.7:
35      串行:9.17599987984s
36      并发:9.26799988747s
37 py3.6:
38      串行:9.540389776229858s
39      并发:9.568442583084106s
40 
41 '''

测算密集型,三十二线程并发比较串行,未有显著优势

3. 消除方案

用multiprocessing代替Thread
multiprocessing库的产出相当的大程度上是为了弥补thread库因为GIL而无效的欠缺。它完全的复制了生机勃勃套thread所提供的接口方便迁移。独一的例外就是它使用了多进度并非八线程。种种进程有本人的单身的GIL,由此也不会忍俊不禁进程之间的GIL争抢。

图片 21图片 22

 1 #coding:utf8
 2 from multiprocessing import Process
 3 import time
 4 
 5 def counter():
 6     i = 0
 7     for _ in range(100000000):
 8         i = i + 1
 9 
10     return True
11 
12 def main():
13 
14     l=[]
15     start_time = time.time()
16 
17     # for _ in range(2):
18     #     t=Process(target=counter)
19     #     t.start()
20     #     l.append(t)
21     #     #t.join()
22     #
23     # for t in l:
24     #    t.join()
25     counter()
26     counter()
27     end_time = time.time()
28     print("Total time: {}".format(end_time - start_time))
29 
30 if __name__ == '__main__':
31     main()
32 
33 
34 '''
35 
36 py2.7:
37      串行:8.92299985886 s
38      并行:8.19099998474 s
39 
40 py3.6:
41      串行:9.963459014892578 s
42      并发:5.1366541385650635 s
43 
44 '''

multiprocess多进度实现并发运算能够提高成效

理当如此multiprocessing亦不是万能良药。它的引进会增添程序达成时线程间数据通信和一块的劳顿。就拿计数器来譬喻子,借使大家要多个线程累积同二个变量,对于thread来讲,申美赞臣个global变量,用thread.Lock的context包裹住,三行就化解了。而multiprocessing由于经过之间无法见到对方的数额,只可以通过在主线程注脚意气风发(Wissu卡塔 尔(阿拉伯语:قطر‎个Queue,put再get恐怕用share
memory的方法。那些额外的落实资本使得本来就老大忧伤的三三十二线程程序编码,变得进一层难熬了。

总结:因为GIL的存在,只有IO
Bound场景下的五十多线程会获得较好的个性进步;假若对并行总计品质较高的次序能够思索把核心部分改为C模块,可能索性用别样语言完毕;GIL在较长生龙活虎段时间内将会持续存在,然而会不断对其进展改进。

七、同步锁(lock)

八线程和多进度最大的不等在于,多进程中,同三个变量,各自有大器晚成份拷贝存在于每种进度中,互不影响,而八线程中,全部变量都由所有线程分享,所以,任何三个变量都能够被别的二个线程改过,由此,线程之间分享数据最大的危险在于八个线程同一时候改二个变量,把内容给改乱了。

图片 23图片 24

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     temp = num
 7     time.sleep(0.1)
 8     num =temp-1  # 对此公共变量进行-1操作
 9 
10 num = 100  #设定一个共享变量
11 thread_list = []
12 
13 for i in range(100):
14     t = threading.Thread(target=subNum)
15     t.start()
16     thread_list.append(t)
17 
18 for t in thread_list: #等待所有线程执行完毕
19     t.join()
20 
21 print('Result: ', num)
22 
23 
24 #运行结果:
25 #Result:  99

多线程分享变量,不能够作保变量安全

如上实例,在三个进度内,设置分享变量num=100,然后成立玖十九个线程,施行num-=1的操作,可是,由于在函数subNum中设有time.sleep(0.1),该语句能够等价于IO操作。于是在此短小0.1秒的日子内,全体的线程已经创办并运营,得到了num=100的变量,等待0.1秒过后,最后得到的num其实是99.

锁经常被用来得以实现对分享能源的一块访问。为每多个分享能源创造贰个Lock对象,当你供给寻访该资源时,调用acquire方法来得到锁对象(假使其余线程已经赢得了该锁,则当前线程需等候其被放飞卡塔 尔(阿拉伯语:قطر‎,待财富访谈完后,再调用release方法释放锁:

图片 25图片 26

 1 import time
 2 import threading
 3 
 4 def subNum():
 5     global num #在每个线程中都获取这个全局变量
 6     lock.acquire()
 7     temp = num
 8     time.sleep(0.1)
 9     num =temp-1  # 对此公共变量进行-1操作
10     lock.release()
11 
12 
13 num = 100  #设定一个共享变量
14 lock = threading.Lock()    #生成一个同步锁对象
15 thread_list = []
16 
17 for i in range(100):
18     t = threading.Thread(target=subNum)
19     t.start()
20     thread_list.append(t)
21 
22 for t in thread_list: #等待所有线程执行完毕
23     t.join()
24 
25 print('Result: ', num)
26 
27 #运行结果:
28 #Result:  0

应用lock方法,保证变量安全

 

lock.acquire()与lock.release()包起来的代码段,保险平等时刻只同意叁个线程援用。

1 import threading
2 
3 R=threading.Lock()
4 
5 R.acquire()
6 '''
7 对公共数据的操作
8 '''
9 R.release()

八、死锁与递归锁

所谓死锁:
是指几个或三个以上的经过或线程在实行进度中,因争夺财富而导致的风流倜傥种相互等待的景观,若无外力作用,它们都将无法推动下去。这个时候称系统处于死锁状态或类别发生了死锁,那么些永远在人机联作等待的经过称为死锁进度。

图片 27图片 28

 1 import threading
 2 import time
 3 
 4 mutexA = threading.Lock()
 5 mutexB = threading.Lock()
 6 
 7 class MyThread(threading.Thread):
 8 
 9     def __init__(self):
10         threading.Thread.__init__(self)
11 
12     def run(self):
13         self.fun1()
14         self.fun2()
15 
16     def fun1(self):
17 
18         mutexA.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         mutexB.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         mutexB.release()
25 
26         mutexA.release()
27 
28 
29     def fun2(self):
30 
31         mutexB.acquire()
32         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
33         time.sleep(0.2)
34 
35         mutexA.acquire()
36         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
37         mutexA.release()
38 
39         mutexB.release()
40 
41 if __name__ == "__main__":
42 
43     print("start---------------------------%s"%time.time())
44 
45     for i in range(0, 10):
46         my_thread = MyThread()
47         my_thread.start()
48 
49 
50 
51 #运行结果:
52 #start---------------------------1494316634.4121563
53 #I am Thread-1 , get res: ResA---1494316634.4121563
54 #I am Thread-1 , get res: ResB---1494316634.4121563
55 #I am Thread-1 , get res: ResB---1494316634.4121563
56 #I am Thread-2 , get res: ResA---1494316634.4121563

死锁实例

 

在Python中为了帮忙在同一线程中频仍号召同一资源,python提供了可重入锁PRADOLock。这一个EscortLock内部维护着三个Lock和一个counter变量,counter记录了acquire的次数,进而使得财富能够被反复require。直到三个线程全数的acquire都被release,别的的线程才干得到能源。上边的例子借使接纳LX570Lock替代Lock,则不会时有爆发死锁:

图片 29图片 30

 1 import threading
 2 import time
 3 
 4 # mutexA = threading.Lock()
 5 # mutexB = threading.Lock()
 6 rlock = threading.RLock()
 7 
 8 class MyThread(threading.Thread):
 9 
10     def __init__(self):
11         threading.Thread.__init__(self)
12 
13     def run(self):
14         self.fun1()
15         self.fun2()
16 
17     def fun1(self):
18         rlock.acquire()  # 如果锁被占用,则阻塞在这里,等待锁的释放
19 
20         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
21 
22         rlock.acquire()
23         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
24         rlock.release()
25 
26         rlock.release()
27 
28 
29     def fun2(self):
30         rlock.acquire()
31         print("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
32         time.sleep(0.2)
33 
34         rlock.acquire()
35         print("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
36         rlock.release()
37 
38         rlock.release()
39 
40 if __name__ == "__main__":
41 
42     print("start---------------------------%s"%time.time())
43 
44     for i in range(0, 10):
45         my_thread = MyThread()
46         my_thread.start()
47 
48 
49 #运行结果:从以下结果也可以发现,线程之间是竞争关系
50 """
51 start---------------------------1494316940.0863945
52 I am Thread-1 , get res: ResA---1494316940.0873976
53 I am Thread-1 , get res: ResB---1494316940.0873976
54 I am Thread-1 , get res: ResB---1494316940.0873976
55 I am Thread-1 , get res: ResA---1494316940.287911
56 I am Thread-2 , get res: ResA---1494316940.287911
57 I am Thread-2 , get res: ResB---1494316940.287911
58 I am Thread-2 , get res: ResB---1494316940.287911
59 I am Thread-2 , get res: ResA---1494316940.4883447
60 I am Thread-4 , get res: ResA---1494316940.4883447
61 I am Thread-4 , get res: ResB---1494316940.4883447
62 I am Thread-4 , get res: ResB---1494316940.4883447
63 I am Thread-4 , get res: ResA---1494316940.6886203
64 I am Thread-6 , get res: ResA---1494316940.6886203
65 I am Thread-6 , get res: ResB---1494316940.6896234
66 I am Thread-6 , get res: ResB---1494316940.6896234
67 I am Thread-6 , get res: ResA---1494316940.890659
68 I am Thread-8 , get res: ResA---1494316940.890659
69 I am Thread-8 , get res: ResB---1494316940.890659
70 I am Thread-8 , get res: ResB---1494316940.890659
71 I am Thread-8 , get res: ResA---1494316941.0918815
72 I am Thread-10 , get res: ResA---1494316941.0918815
73 I am Thread-10 , get res: ResB---1494316941.0918815
74 I am Thread-10 , get res: ResB---1494316941.0918815
75 I am Thread-10 , get res: ResA---1494316941.2923715
76 I am Thread-5 , get res: ResA---1494316941.2923715
77 I am Thread-5 , get res: ResB---1494316941.2923715
78 I am Thread-5 , get res: ResB---1494316941.2923715
79 I am Thread-5 , get res: ResA---1494316941.493138
80 I am Thread-9 , get res: ResA---1494316941.493138
81 I am Thread-9 , get res: ResB---1494316941.493138
82 I am Thread-9 , get res: ResB---1494316941.493138
83 I am Thread-9 , get res: ResA---1494316941.6937861
84 I am Thread-7 , get res: ResA---1494316941.6937861
85 I am Thread-7 , get res: ResB---1494316941.6937861
86 I am Thread-7 , get res: ResB---1494316941.6937861
87 I am Thread-7 , get res: ResA---1494316941.8946414
88 I am Thread-3 , get res: ResA---1494316941.8946414
89 I am Thread-3 , get res: ResB---1494316941.8946414
90 I am Thread-3 , get res: ResB---1494316941.8946414
91 I am Thread-3 , get res: ResA---1494316942.0956843
92 """

递归锁化解死锁

九、event对象

线程的三个首要特性是种种线程都是单身运作且情状不行预测。假设程序中的其余线程要求通过判别有个别线程的情况来规定自身下一步的操作,这时候线程同步难题就能够变得相当费事。为了消除这个难点,大家须要运用threading库中的伊夫nt对象。对象包蕴叁个可由线程设置的实信号标记,它同意线程等待某个事件的发生。在最早情状下,伊夫nt对象中的随机信号标识被安装为False。即使有线程等待贰个伊芙nt对象,
而那么些Event对象的注解为False,那么那一个线程将会被一直不通直至该标志为True。多个线程假若将三个伊芙nt对象的频限信号标记设置为True,它将唤起全部等待那么些伊夫nt对象的线程。如若贰个线程等待贰个业已被设置为实在伊夫nt对象,那么它将忽视那一个事件,
继续奉行。

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False。

图片 31

 

能够设想意气风发种选取场景(仅仅看做评释卡塔尔国,比方,大家有七个线程从Redis队列中读取数据来拍卖,那几个线程都要尝尝去连接Redis的劳务,日常情状下,如若Redis连接不成事,留意气风发一线程的代码中,都会去品味再一次连接。假使大家想要在起步时确认保证Redis服务符合规律,才让那个工作线程去连接Redis服务器,那么我们就足以利用threading.Event机制来协和各种工作线程的连续几日操作:主线程中会去品尝连接Redis服务,若是正常的话,触发事件,各专业线程会尝试连接Redis服务。

图片 32图片 33

 1 import threading
 2 import time
 3 import logging
 4 
 5 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
 6 
 7 def worker(event):
 8     logging.debug('Waiting for redis ready...')
 9     while not event.isSet():
10         logging.debug('connect failed...')
11         event.wait(1)
12     logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
13     time.sleep(1)
14 
15 def main():
16     readis_ready = threading.Event()
17     t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
18     t1.start()
19 
20     t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
21     t2.start()
22 
23     logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
24     time.sleep(3) # simulate the check progress
25     logging.debug('redis server is running')
26     readis_ready.set()
27 
28 if __name__=="__main__":
29     main()
30 
31 
32 #运行结果:
33 (t1        ) Waiting for redis ready...
34 # (t1        ) connect failed...
35 # (t2        ) Waiting for redis ready...
36 # (t2        ) connect failed...
37 # (MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
38 # (t1        ) connect failed...
39 # (t2        ) connect failed...
40 # (t2        ) connect failed...
41 # (t1        ) connect failed...
42 # (MainThread) redis server is running
43 # (t2        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]
44 # (t1        ) redis ready, and connect to redis server and do some work [Tue May  9 16:15:18 2017]

监听Redis服务

十、Semaphore(信号量)

Semaphore管理一个放置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置流量计+1;
流速计不能小于0;当流速計为0时,acquire()将拥塞线程直到别的线程调用release()。

实例:(相同的时间独有5个线程能够得到semaphore,即能够界定最洛桑接数为5):

图片 34图片 35

 1 import threading
 2 import time
 3 
 4 semaphore = threading.Semaphore(5)
 5 
 6 def func():
 7     if semaphore.acquire():
 8         print (threading.currentThread().getName() + ' get semaphore')
 9         time.sleep(2)
10         semaphore.release()
11 
12 for i in range(20):
13   t1 = threading.Thread(target=func)
14   t1.start()
15 
16 
17 #运行结果:
18 # Thread-1 get semaphore
19 # Thread-2 get semaphore
20 # Thread-3 get semaphore
21 # Thread-4 get semaphore
22 # Thread-5 get semaphore
23 # Thread-6 get semaphore#隔2秒打印
24 # Thread-7 get semaphore
25 # Thread-8 get semaphore
26 # Thread-9 get semaphore
27 # Thread-10 get semaphore
28 # Thread-11 get semaphore#隔2秒打印
29 # Thread-12 get semaphore
30 # Thread-13 get semaphore
31 # Thread-14 get semaphore
32 # Thread-15 get semaphore
33 # Thread-16 get semaphore#隔2秒打印
34 # Thread-17 get semaphore
35 # Thread-18 get semaphore
36 # Thread-20 get semaphore
37 # Thread-19 get semaphore

semaphore实例

十一、multiprocessing

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. 
The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. 
Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

 

由于GIL的留存,python中的八线程其实并不是的确的多线程,假如想要丰硕地采纳多核CPU的资源,在python中大部景色要求使用多进度。

multiprocessing包是python中的多进程管理包。与threading.Thread相符,它可以行使multiprocessing.Process对象来创设二个经过。该进度能够运作在Python程序内部编写的函数。该Process对象与Thread对象的用法相近,也可以有start(),
run(),
join()的法子。别的multiprocessing包中也会有Lock/Event/塞马phore/Condition类
(这么些指标足以像八线程那样,通过参数字传送递给各样进度),用以同步进度,其用法与threading包中的同名类意气风发致。所以,multiprocessing的不小学一年级部份与threading使用同豆蔻梢头套API,只但是换成了多进度的地步。

图片 36图片 37

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4 
 5     print('hello', name,time.ctime())
 6     time.sleep(1)
 7 
 8 if __name__ == '__main__':
 9     p_list=[]
10     for i in range(3):
11         p = Process(target=f, args=('alvin:%s'%i,))
12         p_list.append(p)
13         p.start()
14     for i in p_list:
15         p.join()
16     print('end')
17 
18 
19 #运行结果:
20 #hello alvin:0 Tue May  9 16:41:18 2017
21 #hello alvin:1 Tue May  9 16:41:18 2017
22 #hello alvin:2 Tue May  9 16:41:18 2017
23 #end

Process类调用

 

 

图片 38图片 39

 1 from multiprocessing import Process
 2 import time
 3 
 4 class MyProcess(Process):
 5     def __init__(self):
 6         super(MyProcess, self).__init__()
 7 
 8     def run(self):
 9 
10         print ('hello', self.name,time.ctime())
11         time.sleep(1)
12 
13 
14 if __name__ == '__main__':
15     p_list=[]
16     for i in range(3):
17         p = MyProcess()
18         p.start()
19         p_list.append(p)
20 
21     for p in p_list:
22         p.join()
23 
24     print('end')
25 
26 
27 #运行结果:
28 #hello MyProcess-1 Tue May  9 16:42:46 2017
29 #hello MyProcess-2 Tue May  9 16:42:46 2017
30 #hello MyProcess-3 Tue May  9 16:42:46 2017
31 #end

继承Process类调用

process类:

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,近期还不曾达成,库引用中升迁必需是None; 
  target: 要施行的艺术; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():再次来到进程是否在运维。

  join([timeout]):拥塞当前上下文情形的进度程,直到调用此方法的历程终止或抵达钦点的timeout(可选参数卡塔尔。

  start():进度思索伏贴,等待CPU调解

  run():strat()调用run方法,借使实例进度时未制订传入target,那star实施t私下认可run()方法。

  terminate():不管任务是否完毕,立刻终止职业进程

属性:

  daemon:和线程的setDeamon功能肖似

  name:进程名字。

  pid:进程号。

实例:

图片 40图片 41

 1 from multiprocessing import Process
 2 import os
 3 import time
 4 def info(name):
 5 
 6 
 7     print("name:",name)
 8     print('parent process:', os.getppid())
 9     print('process id:', os.getpid())
10     print("------------------")
11     time.sleep(1)
12 
13 def foo(name):
14 
15     info(name)
16 
17 if __name__ == '__main__':
18 
19     info('main process line')
20 
21 
22     p1 = Process(target=info, args=('alvin',))
23     p2 = Process(target=foo, args=('egon',))
24     p1.start()
25     p2.start()
26 
27     p1.join()
28     p2.join()
29 
30     print("ending")
31 
32 
33 
34 #运行结果:
35 # name: main process line
36 # parent process: 5112
37 # process id: 10808
38 # ------------------
39 # name: alvin
40 # name: egon
41 # parent process: 10808
42 # process id: 9576
43 # ------------------
44 # parent process: 10808
45 # process id: 9604
46 # ------------------
47 # ending

process类创设多进度

经过tasklist(Win)只怕ps -elf
|grep(linux)命令检查评定每三个历程号(PID)对应的经过名.

十二、协程

 1 import time
 2 
 3 """
 4 传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
 5 如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
 6 """
 7 # 注意到consumer函数是一个generator(生成器):
 8 # 任何包含yield关键字的函数都会自动成为生成器(generator)对象
 9 
10 def consumer():
11     r = ''
12     while True:
13         # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
14         #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
15         #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
16         #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
17         n = yield r
18         if not n:
19             return
20         print('[CONSUMER] ←← Consuming %s...' % n)
21         time.sleep(1)
22         r = '200 OK'
23 def produce(c):
24     # 1、首先调用c.next()启动生成器
25     next(c)
26     n = 0
27     while n < 5:
28         n = n + 1
29         print('[PRODUCER] →→ Producing %s...' % n)
30         # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
31         cr = c.send(n)
32         # 4、produce拿到consumer处理的结果,继续生产下一条消息;
33         print('[PRODUCER] Consumer return: %s' % cr)
34     # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
35     c.close()
36 if __name__=='__main__':
37     # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
38     c = consumer()
39     produce(c)
40     
41     
42 '''
43 result:
44 
45 [PRODUCER] →→ Producing 1...
46 [CONSUMER] ←← Consuming 1...
47 [PRODUCER] Consumer return: 200 OK
48 [PRODUCER] →→ Producing 2...
49 [CONSUMER] ←← Consuming 2...
50 [PRODUCER] Consumer return: 200 OK
51 [PRODUCER] →→ Producing 3...
52 [CONSUMER] ←← Consuming 3...
53 [PRODUCER] Consumer return: 200 OK
54 [PRODUCER] →→ Producing 4...
55 [CONSUMER] ←← Consuming 4...
56 [PRODUCER] Consumer return: 200 OK
57 [PRODUCER] →→ Producing 5...
58 [CONSUMER] ←← Consuming 5...
59 [PRODUCER] Consumer return: 200 OK
60 '''

 

greenlet:

greenlet机制的第生龙活虎观念是:生成器函数也许协程函数中的yield语句挂起函数的施行,直到稍后使用next()或send()操作实行回复结束。能够使用一个调治器循环在生机勃勃组生成器函数之间同盟四个职责。greentlet是python中得以达成大家所谓的”Coroutine(协程)”的二个底子库. 

 1 from greenlet import greenlet
 2  
 3 def test1():
 4     print (12)
 5     gr2.switch()
 6     print (34)
 7     gr2.switch()
 8  
 9 def test2():
10     print (56)
11     gr1.switch()
12     print (78)
13  
14 gr1 = greenlet(test1)
15 gr2 = greenlet(test2)
16 gr1.switch()
17 
18 
19 #运行结果:
20 #12
21 #56
22 #34
23 #78

基于greenlet的框架——gevent

gevent模块完毕协程:

Python通过yield提供了对协程的中坚补助,不过不完全。而第三方的gevent为Python提供了相比康健的协程辅助。

gevent是第三方库,通过greenlet完毕协程,其基本盘算是:

当一个greenlet碰着IO操作时,举个例子访问网络,就活动切换来任何的greenlet,等到IO操作完毕,再在适宜的时候切换回来继续实践。由于IO操作十二分耗费时间,常常使程序处于等候情状,有了gevent为大家机关心换协程,就确认保证总有greenlet在运行,实际不是伺机IO。

由于切换是在IO操作时自动完毕,所以gevent要求改过Python自带的局地标准库,那生龙活虎进程在运维时通过monkey
patch实现:

图片 42图片 43

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 gevent.joinall([
16         gevent.spawn(f, 'https://itk.org/'),
17         gevent.spawn(f, 'https://www.github.com/'),
18         gevent.spawn(f, 'https://zhihu.com/'),
19 ])
20 
21 print(time.time()-start)
22 
23 
24 
25 #运行结果:
26 #GET: https://itk.org/
27 #GET: https://www.github.com/
28 #GET: https://zhihu.com/
29 #9077 bytes received from https://zhihu.com/.
30 #12323 bytes received from https://itk.org/.
31 #92574 bytes received from https://www.github.com/.
32 #3.7679357528686523

gevent实例

 

 

图片 44图片 45

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from urllib import request
 5 import time
 6 
 7 def f(url):
 8     print('GET: %s' % url)
 9     resp = request.urlopen(url)
10     data = resp.read()
11     print('%d bytes received from %s.' % (len(data), url))
12 
13 start=time.time()
14 
15 # gevent.joinall([
16 #         gevent.spawn(f, 'https://itk.org/'),
17 #         gevent.spawn(f, 'https://www.github.com/'),
18 #         gevent.spawn(f, 'https://zhihu.com/'),
19 # ])
20 
21 f('https://itk.org/')
22 f('https://www.github.com/')
23 f('https://zhihu.com/')
24 
25 print(time.time()-start)
26 
27 
28 
29 #运行结果:
30 #GET: https://itk.org/
31 #12323 bytes received from https://itk.org/.
32 #GET: https://www.github.com/
33 #92572 bytes received from https://www.github.com/.
34 #GET: https://zhihu.com/
35 #8885 bytes received from https://zhihu.com/.
36 #5.089903354644775

比较串行方式的运作效能

 

参照他事他说加以侦查资料:

2.

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注