python 线程

一、什么是线程

山西快乐十分走势 www.yfhdr.cn 线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。

  线程是独立调度和分派的基本单位。线程可以为操作系统内核调度的内核线程,如Win32线程;由用户进程自行调度的用户线程,如Linux平台的POSIX Thread;或者由内核与用户进程,如Windows 7的线程,进行混合调度。

  同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。

  一个进程可以有很多线程,每条线程并行执行不同的任务。

  在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。(以上都是从百度百科 线程 复制粘贴的呵呵)  

线程特点

在多线程OS中,通常是在一个进程中包括多个线程,每个线程都是作为利用CPU的基本单位,是花费最小开销的实体。线程具有以下属性。
1)轻型实体
  线程中的实体基本上不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。
  线程的实体包括程序、数据和TCB。线程是动态概念,它的动态特性由线程控制块TCB(Thread Control Block)描述。

    TCB包括以下信息:

       ?。?)线程状态。
       ?。?)当线程不运行时,被保存的现场资源。
       ?。?)一组执行堆栈。
       ?。?)存放每个线程的局部变量主存区。
       ?。?)访问同一个进程中的主存和其它资源。

  用于指示被执行指令序列的程序计数器、保留局部变量、少数状态参数和返回地址等的一组寄存器和堆栈。
2)独立调度和分派的基本单位。
  在多线程OS中,线程是能独立运行的基本单位,因而也是独立调度和分派的基本单位。由于线程很“轻”,故线程的切换非常迅速且开销?。ㄔ谕唤讨械模?。
3)可并发执行。
  在一个进程中的多个线程之间,可以并发执行,甚至允许在一个进程中所有线程都能并发执行;同样,不同进程中的线程也能并发执行,充分利用和发挥了处理机与外围设备并行工作的能力。
4)共享进程资源
  在同一进程中的各个线程,都可以共享该进程所拥有的资源,这首先表现在:所有线程都具有相同的地址空间(进程的地址空间),这意味着,线程可以访问该地址空间的每一个虚地址;此外,还可以访问进程所拥有的已打开文件、定时器、信号量机构等。由于同一个进程内的线程共享内存和文件,所以线程之间互相通信不必调用内核。

(以上都是从百度百科 线程 复制粘贴的呵呵呵)

线程与进程的区别

  可以归纳为以下4点:

  1. 地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
  2. 通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
  3. 调度和切换:线程上下文切换比进程上下文切换要快得多。
  4. 在多线程OS中,线程不是一个可执行的实体。

不得不提的GIL

  Python语言和GIL(全称Global Interpreter Lock,全局解释器锁)没有半毛钱关系。仅仅是由于历史原因在Cpython虚拟机(解释器),难以移除GIL。Cpython解释器中存在一个GIL,它的作用就是保证同一时刻只有一个线程可以执行代码。因此造成了我们使用多线程的时候无法实现并行?!?/span>

  # 不安全原因:1.对全局变量进行修改   2.对某个值+= -+ *= /=。(图解待补充。。。。好累好懒)

  解决方法:

  1. 更换解释器 比如使用jpython(java实现的python解释器)
  2. 使用多进程完成多任务的处理
  3. 用C语言扩展

 二、线程初探

python线程??椋簍hreading

在Python中有几个用于多线程编程的???,包括thread、threading和Queue等。我们使用更高级的threading???/strong>。multiprocess??榈耐耆7铝藅hreading??榈慕涌?,二者在使用层面,有很大的相似性。

 

创建线程

面向函数的方式

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('Frank',))
    t.start()
    print('主线程')

面向对象的方式

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('Frank')
    t.start()
    print('主线程')
创建线程不需要在 if __name__ == '__main__' 下面执行:
  在线程部分不需要通过import来为新的线程获取代码(开子进程需要),因为新的线程与之前的主线程共享同一段代码(全局变量也共享)。
  不需要import也就意味着不存在在子线程中又重复一次创建线程的操作(发散:创建进程时,win与linux的不同)

多线程与多进程

1.多线程与多进程的开销

①启动100个子线程

# -*- coding:utf-8 -*-
import time
from threading import Thread

def func(a):
    a += 1

if __name__ == '__main__':

    start = time.time()
    t_lis = []
    for i in range(100):
        t = Thread(target=func,args=(1,))
        t_lis.append(t)
        t.start()
    # 等待所有子线程结束
    for i in t_lis:
        i.join()
    print('运行时间:%s'%(time.time()-start))

结果为:0.011000633239746094

 

②启动100个子进程

# -*- coding:utf-8 -*-
import time
from multiprocessing import Process

def func(a):
    a += 1

if __name__ == '__main__':
    start = time.time()
    p_lis = []
    for i in range(100):
        p = Process(target=func,args=(1,))
        p_lis.append(p)
        p.start()
    # 等待所有子进程运行完
    for i in p_lis:
        i.join()
    print('运行时间:%s'%(time.time()-start))

结果为:1.8501055240631104

 

 

2.多线程与多进程数据共享问题  

2.1多个线程之间的全局变量是共享的

from threading import Thread

tn = 0


def func():
    global tn
    tn += 1


t_l = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    t_l.append(t)
for t in t_l: t.join()
print(tn)

结果为:100

 

2.2进程之间数据隔离

from multiprocessing import Process

pn = 0


def func():
    global pn
    pn += 1


if __name__ == '__main__':
    p_l = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_l.append(p)
    for p in p_l: p.join()
    print(pn)

结果为:0

 

2.3.进程间的数据共享

from multiprocessing import Process,Manager,Lock

def func(dic,lock):
    dic['count'] -= 1
    # 加锁
    # with lock:
    #     dic['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    m = Manager()
    dic = m.dict({'count':100})
    p_l = []
    for i in range(100):
        p = Process(target=func,args=(dic,lock))
        p.start()
        p_l.append(p)
    for p in p_l:p.join()
    print(dic)

上述代码,100减100次1的操作,结果如下图:

Q:100减100次1这么慢?  A:不是减操作造成的 而是开启进程 管理进程 销毁进程拖慢了程序的执行速度

Q:为什么结果不为0?  A:数据不安全现象。

进程之间可以共享数据,提供共享数据的类是Manager,但是它提供的list\dict这些数据类型是数据不安全的(针对 +=  -=  *=  /=), 需要加锁来保证安全

 

由此我们总结一下多进程和多线程之间的区别:

进程:数据隔离 开销大

线程:数据共享 开销小

 

Thread类的其他方法

Thread实例对象的方法
  # isAlive(): 返回线程是否活动的。
  # getName(): 返回线程名。
  # setName(): 设置线程名。

threading??樘峁┑囊恍┓椒ǎ?
  # threading.currentThread(): 返回当前的线程变量。
  # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

# 线程有terminate么?
# 没有terminate 不能强制结束
# 所有的子线程都会在执行完所有的任务之后自动结束

当前的线程对象courrent_thread()

courrentThread = courrent_thread

import os
from threading import Thread, currentThread


def func():
    t = currentThread()
    print(t.name, t.ident, os.getpid())


tobj = Thread(target=func)
tobj.start()
print('tobj :', tobj)
t2 = currentThread()
print(t2.name, t2.ident, os.getpid())

这里可以看到,t 和 t2 两者的线程id都是6456.

相关习题

lst = [1,2,3,4,5,6,7,8,9,10]

# 按照顺序把列表中的每一个元素都计算一个平方,使用多线程的方式并且将结果按照顺序返回

import time
import random
from threading import Thread, currentThread

dic = {}


def func(i):
    t = currentThread()
    time.sleep(random.random())
    dic[t.ident] = i ** 2


t_lst = []
for i in range(1, 11):
    t = Thread(target=func, args=(i,))
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
    print(dic[t.ident])
习题代码:

 

 三、守护线程

#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

from threading import Thread
import time


def sayhi(name):
    time.sleep(2)
    print('%s say hello' % name)


if __name__ == '__main__':
    t = Thread(target=sayhi, args=('Frank',))
    t.setDaemon(True)  # 必须在t.start()之前设置
    t.start()
    print('主线程')
    print(t.is_alive())

 结果:

 

代码中可以看到,除守护线程外,没有非守护线程存在,当主线程代码执行结束后,守护线程就结束了。

 

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(1)
    while True:
        print("end123")


def bar():
    print(456)
    time.sleep(2)
    print("end456")


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon = True
t1.start()
t2.start()

print("main-------")

 结果:

 

代码中t1为守护线程,t2为非守护线程子线程,可以看到:主线程代码执行完之后,子线程继续执行,子线程执行结束后,守护线程才结束。

 

四、锁

GIL和锁

  上面已经提过:在Cpython中,存在GIL全局解释器锁,它的作用就是保证同一时刻只有一个线程可以执行代码。它的目的是为了数据安全。

  那么,既然已经加了GIL,为什么线程中我们依然会用到锁呢?那是因为,即使我们使用了GIL,在多线程操作中,我们依然会出现数据不安全现象。

  

  2、列表、字典中的方法基本都安全,但当值为空时,一些操作可能会引起报错。而队列是安全的,put/get方法中,当值为空时,get只会发生阻塞而不会报错

互斥锁

互斥锁:在同一个线程中,不能连续acquire多次,并且可以做到多个线程中被锁的代码同时只有一个线程执行

递归锁

递归锁:在同一个线程中,能连续acquire多次,并且可以做到多个线程中被锁的代码同时只有一个线程执行

死锁现象

  吃面问题:假如四个人吃面,只有一碗面,只有一把叉子,必须同时拿到面和叉子才能吃面。一个人抢到面,一个人抢到叉子,都不放,都等着另一个,就僵在那里,进入阻塞状态!

快速解决死锁:两种,1递归锁,将所有锁都设置成一个RLock()对象。2互斥锁,

 

 

五、queue???/h1>

  队列是线程安全的,自带lock。

先进先出  Queue

在多线程下都不准
q.empty() 判断是否为空
q.full() 判断时候为慢
q.qsize() 队列的大小

 

后进先出 LifoQueue

 

优先级队列 PriorityQueue

pq = PriorityQueue()
pq.put((10,'asdad'))
pq.put((2,'szxd'))
pq.put((20,'gfhrt'))
print(pd.get())# (2,'szxd')
print(pd.get()) # (10,'asdad')
print(pd.get()) # (20,'gfhrt')

六、Python标准???-concurrent.futures

concurrent.futures??樘峁┝烁叨确庾暗囊觳降饔媒涌?/span>
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

基本方法

# submit(fn, *args, **kwargs)
异步提交任务

# map(func, *iterables, timeout=None, chunksize=1)
取代for循环submit的操作

# shutdown(wait=True)
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

# result(timeout=None)
取得结果

# add_done_callback(fn)
回调函数

 

 

 进程池

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes.If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

 

译文:

ProcessPoolExecutor类是Executor子类,它使用一个进程池异步执行调用。ProcessPoolExecutor使用多处理???,它允许它旁路执行全局解释器锁,但也意味着只能执行和返回可选对象。

类concurrent.futures.ProcessPoolExecutor(max_workers=None,mp_context=None)一个执行器子类,该子类最多使用一个max_worker进程池异步执行调用。如果max_worker为空或未给定,则它将默认为机器上的处理器数。如果max_worker较低或等于0,则会引发ValueError。

from concurrent.futures import ProcessPoolExecutor

import os, time, random


def task(n):
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':

    executor = ProcessPoolExecutor(max_workers=3)

    futures = []
    for i in range(11):
        future = executor.submit(task, i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

 

 线程池

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

 

译文:

ThreadPoolExecutor是一个Executor子类,它使用线程池异步执行调用。类concurrent.futures.ThreadPoolExecutor(max_workers=None,thread_name_prefix='')一个执行器子类,它最多使用一个max_worker线程池来异步执行调用。

在3.5版中更改:如果max_worker为空或未给定,它将默认为机器上的处理器数乘以5,假设ThreadPoolExecutor经常用于重叠I/O而不是CPU工作,并且工作人员的数量应该高于ProcessPoolExecutor的工作人员数。

版本3.6中的新内容:为允许用户控制由池创建的工作线程的线程名,添加了 thread_name_prefix参数,以便于调试。(金山词霸)

 

线程池的用法与ProcessPoolExecutor相同

import time
from concurrent.futures import ThreadPoolExecutor


def func(arg):
    time.sleep(0.5)
    print(arg)
    return arg


r_lst = []
tp = ThreadPoolExecutor(5)
for i in range(100):
    ret = tp.submit(func, i)
    r_lst.append(ret)
tp.shutdown()  # 阻塞,就有线程池完成任务才继续向下执行
print('执行完了~~再次打印')
for ret in r_lst:
    print(ret.result())

执行结果:

从结果我们可以看到,线程池中5个线程,每5个线程打印一次,因为shutdown阻塞的原因,代码在执行完第一个for循环后没有继续执行print语句,而是等待整个线程池执行结束后才继续执行。

其中我们注意到一个现象,在最后的打印中,result似乎是一次性就打印出了,好像没有发生阻塞,那么result是阻塞的吗?

 

我们将代码中的shutdown注释掉,做一下简单的修改,如下

import time
from concurrent.futures import ThreadPoolExecutor


def func(arg):
    time.sleep(0.1)
    print(arg)
    return arg


r_lst = []
tp = ThreadPoolExecutor(5)
for i in range(100):
    ret = tp.submit(func, i)
    r_lst.append(ret)
# tp.shutdown()  # 阻塞,就有线程池完成任务才继续向下执行
print('执行完了~~再次打印')
for ret in r_lst:
    print('result',ret.result())

结果:

我们可以看到,100个线程被添加到了5个线程池中,任务句柄按顺序添加到了列表r_lst中,此时因为没有shutdown()方法的阻塞,代码没有等待线程执行结束,很快就运行到了result()所在的for循环,但for循环并没有立刻将结果输出。而是在某些线程执行结束后才开始打印结果。由此我们推断:result()方法发生了阻塞,它在等待线程执行结束后才拿到返回值。

 

map方法

map取代了for+submit

 


回调函数

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        '//www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

 

 

Q1:主线程需不需要回收子线程的资源。
A:不需要,线程资源属于进程,所以进程结束了,县城的资源自然就被回收了
Q2:主线程为什么要等待子线程结束之后才结束
A:主线程结束意味着进程结束,所有的子线程都会结束
Q3:守护线程如何结束
A:主线程结束,主进程也结束,守护线程因主进程的结束而结束
面试P38T34
Q:回调函数是谁执行的?

 

posted @ 2019-04-18 17:06 Frank流云 阅读(...) 评论(...) 编辑 收藏
  • 高温“烤验”,品读这些自带凉意的避暑诗词 2019-05-19
  • 2015科教频道中秋晚会《天涯共此时》 2019-05-19
  • 特朗普政府用政治“边缘政策”处理贸易问题是玩火 2019-05-17
  • 高清:探访苟坝见证历史 红军在此留下珍贵遗物 2019-05-15
  • 交通小事故,几分钟就能处理完 2019-05-15
  • 在现时代,无论中国还是西方发达国家都是社会财富公有制和私有制并存的社会,由于仍旧存在社会财富私有制,所以必然存在贫富差别,离开私有制来谈“贫”和“富”... 2019-05-15
  • 【学习时刻·经济实说②】管清友:中央经济工作会议的十大亮点 2019-05-15
  • 绿染江源,千湖归来——三江源生态保护建设取得阶段性成效 2019-05-13
  • 把握和传承好“变则通”思想(大家手笔) 2019-05-13
  • 5月份70个大中城市商品住宅销售情况:一线城市商品房价格同比持续下降 2019-05-08
  • 官方:装备管理失职 恒大俱乐部总经理等人遭处罚 2019-05-01
  • 中共一大代表中最早辞世的王尽美:3首小诗与27年人生 2019-04-29
  • 不管怎么修饰辞藻,只要放弃革命,就是苏联的结果,还用证明吗? 2019-04-27
  • 新和县:12333为参保群众提供咨询便利 2019-04-22
  • 秋冬进补“小人参” 称霸国人餐桌3000年 2019-04-22
  • 774| 834| 191| 157| 98| 494| 109| 710| 394| 520|