Python基础-线程相关操作

介绍线程的创建、控制、通信以及线程池、进程的使用

线程的创建、控制、通信

创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""
创建线程
"""
import threading

"""
构造器创建线程 推荐
"""


def action(max):
for i in range(max):
print(threading.current_thread().getName() + ":" + str(i))


for i in range(100):
print(threading.current_thread().getName() + ":" + str(i))
if i == 20:
# 创建线程
t1 = threading.Thread(target=action, args=(20,))
t1.start()
t2 = threading.Thread(target=action, args=(20,))
t2.start()
print("主线程执行完成")

print('================继承thread=========')
'''
继承Thread 类 重写run方法
'''


class FKThread(threading.Thread):

def __init__(self):
threading.Thread.__init__(self)
self.i = 0

def run(self):
while self.i < 30:
print(threading.current_thread().getName() + ": " + str(self.i))
self.i += 1


for i in range(100):
print(threading.current_thread().getName() + ": " + str(i))
if i == 20:
fk1 = FKThread()
fk1.start()
fk2 = FKThread()
fk2.start()
print('主程序执行完成')

线程不能启动两次,放开下面的注释会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading

"""
1.不能启动两次
2.死亡后不能再启动
"""


def action(max):
for i in range(max):
print(threading.current_thread().getName() + ":" + str(i))


sd = threading.Thread(target=action, args=(100,))
for i in range(300):
print(threading.current_thread().getName() + ": " + str(i))
if i == 20:
sd.start()
print(sd.is_alive())

# RuntimeError: threads can only be started once
# if i>20 and not(sd.is_alive()):
# sd.start()

join()方法会阻塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"""
调用join()的线程阻塞
"""
import threading


def action(max):
for i in range(max):
print(threading.current_thread().getName() + ": " + str(i))


threading.Thread(target=action, args=(100,), name='新线程').start()

for i in range(100):
if i == 20:
jt = threading.Thread(target=action, args=(100,), name='被join的线程')
jt.start()
jt.join()

print(threading.current_thread().getName() + ": " + str(i))
print('主线程执行完成')

前台线程结束,后台线程也会结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
"""
前台线程结束 后台线程也结束
"""


def action(max):
for i in range(max):
print(threading.current_thread().getName() + ": " + str(i))


dt = threading.Thread(target=action, name='daemon后台线程', args=(100,))
dt.daemon = True
dt.start()

for i in range(10):
print(threading.current_thread().getName() + ": " + str(i))

sleep() 当前线程休眠

1
2
3
4
5
6
7
import time
"""
sleep() 当前线程休眠暂停多少s
"""
for i in range(10):
print("当前时间 %s", time.ctime())
time.sleep(1)

线程不安全问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time
"""
两个线程取钱冲突 账户余额出现负数
"""


class Account:
def __init__(self, account_no, balance):
self.account_no = account_no
self.balance = balance


def draw(account, draw_amount):
if account.balance >= draw_amount:
print(threading.current_thread().getName() +
":取钱成功!吐出钞票:" + str(draw_amount))
# 模拟线程切换
time.sleep(0.001)
account.balance -= draw_amount
print("余额为:" + str(account.balance))
else:
print(threading.current_thread().getName() + '取钱失败,余额不足!')


acct = Account('123456', 1000)
threading.Thread(target=draw, args=(acct, 800), name='甲').start()
threading.Thread(target=draw, args=(acct, 800), name='乙').start()

线程加锁,保证线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import threading
import time
"""
加锁保证线程同步
"""


class Account:
def __init__(self, account_no, balance):
self.account_no = account_no
self.__balance = balance
self.lock = threading.RLock()

def getBalance(self):
return self.__balance

def draw(self, draw_amount):
# 加锁
self.lock.acquire()
try:
if self.__balance >= draw_amount:
print(threading.current_thread().getName() +
"取钱成功!吐出钞票:" + str(draw_amount))
time.sleep(1)
self.__balance -= draw_amount
print("\t余额为:" + str(self.__balance))
else:
print(threading.current_thread().getName() + ":取钱失败!余额不足。。。")
finally:
self.lock.release()


def draw(account, draw_amount):
account.draw(draw_amount)


acct = Account('123456', 1000)
threading.Thread(target=draw, name='甲', args=(acct, 800)).start()
threading.Thread(target=draw, name='乙', args=(acct, 800)).start()

模拟死锁问题,应该要避免

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""
模拟死锁问题 应当避免
避免多次锁定、具有相同的加锁顺序、
使用定时锁acquire()方法设置timeout、死锁监测
"""

import threading
import time


class A:
def __init__(self):
self.lock = threading.RLock()

def foo(self, b):
try:
self.lock.acquire()
print('当前线程名:' + threading.current_thread().getName() +
"进入A实例foo方法")
time.sleep(0.2)
print('当前线程名:' + threading.current_thread().getName() +
"企图调用B实例的last的方法")
b.last()
finally:
self.lock.release()

def last(self):
try:
self.lock.acquire()
print('进入了A类的last方法内部')
finally:
self.lock.release()


class B:
def __init__(self):
self.lock = threading.RLock()

def bar(self, a):
try:
self.lock.acquire()
print('当前线程名:' + threading.current_thread().getName() +
"进入了B实例的bar方法")
time.sleep(0.2)
print('当前线程名:' + threading.current_thread().getName() +
'企图调用A实例的last方法')
a.last()
finally:
self.lock.release()

def last(self):
try:
self.lock.acquire()
print('进入了B类的last方法内部')
finally:
self.lock.release()


a = A()
b = B()


def init():
threading.current_thread().name = '主线程'
a.foo(b)
print('进入了主线程之后')


def action():
threading.current_thread().name = '副线程'
b.bar(a)
print('进入了副线程之后')


threading.Thread(target=action).start()
init()

线程之间的通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
线程之间的通信 condition
acquire wait notify notifyAll 方法
"""
import threading


class Account:

def __init__(self, account_no, balance):
self.account_no = account_no
self.__balance = balance
self.cond = threading.Condition()
self.__flag = False

def getBalance(self):
return self.__balance

def draw(self, draw_amount):
self.cond.acquire()
try:
if not self.__flag:
self.cond.wait()
else:
print(threading.current_thread().getName() +
'取钱:' + str(draw_amount))
self.__balance -= draw_amount
print('余额为:' + str(self.__balance))
self.__flag = False
self.cond.notifyAll()
finally:
self.cond.release()

def deposit(self, deposit_amount):
self.cond.acquire()
try:
if self.__flag:
self.cond.wait()
else:
print(threading.current_thread().getName() +
'存钱:' + str(deposit_amount))
self.__balance += deposit_amount
print('账户余额为:' + str(self.__balance))
self.__flag = True
self.cond.notifyAll()
finally:
self.cond.release()


def draw_money(account, draw_amount, max):
for i in range(max):
account.draw(draw_amount)


def deposit_money(account, deposit_amount, max):
for i in range(max):
account.deposit(deposit_amount)


acct = Account('123456', 0)
threading.Thread(target=draw_money, args=(acct, 800, 60), name='张三').start()
threading.Thread(target=deposit_money, args=(
acct, 800, 20), name='存钱甲').start()
threading.Thread(target=deposit_money, args=(
acct, 800, 20), name='存钱乙').start()
threading.Thread(target=deposit_money, args=(
acct, 800, 20), name='存钱丙').start()

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
"""
queue 模块下阻塞队列
"""
import threading
import queue
import time


def product(bq):
str_tuple = ('python', 'kotlin', 'swift')
for i in range(10):
print(threading.current_thread().name + "生产者准备生产元祖元素")
time.sleep(0.2)
bq.put(str_tuple[i % 3])
print(threading.current_thread().name + "生产者生产元祖元素完成")


def consumer(bq):
while True:
print(threading.current_thread().name + "消费者准备消费元祖元素")
time.sleep(0.2)
t = bq.get()
print(threading.current_thread().name + "消费者消费%s完成" % t)


# 容量为1的队列 生产1个需消费完再生产
bq = queue.Queue(maxsize=1)
# 三个生产者
threading.Thread(target=product, args=(bq,)).start()
threading.Thread(target=product, args=(bq,)).start()
threading.Thread(target=product, args=(bq,)).start()
# 一个消费者
threading.Thread(target=consumer, args=(bq,)).start()

Event控制线程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
Event 管理内部旗标是否为true实现线程通信
但是没有锁lock对象 线程同步需要自己设置锁
is_set()内部旗标是否为true
set()设置旗标true 唤醒等待的线程
clear() 设置旗标false
wait() 阻塞当前线程
"""

import threading
import time

event = threading.Event()


def cal(name):
print('%s 启动' % threading.current_thread().getName())
print('%s 准备开始计算' % name)
# 线程阻塞 等待唤醒
event.wait()
print('%s 收到通知了' % threading.current_thread().getName())
print('%s 开始正式计算' % name)


threading.Thread(target=cal, args=('甲',)).start()
threading.Thread(target=cal, args=('乙',)).start()

time.sleep(2)
print('-------------')
print('主线程发出事件唤醒等待的线程')
event.set()
线程池使用

线程池创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
concurrent.futures 模块 Executor 提供线程池
"""
from concurrent.futures import ThreadPoolExecutor
import threading
import time


def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ":" + str(i))
my_sum += i
return my_sum


# 创建包含2个线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 提交任务以及参数
future1 = pool.submit(action, 50)
future2 = pool.submit(action, 100)
# 任务是否结束
print(future1.done())
time.sleep(3)
print(future2.done())
# 返回结果
# future1.result() 如果线程未执行完会阻塞当前线程
print(future1.result())
print(future2.result())
print("主线程结束")
pool.shutdown()

获取线程返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from concurrent.futures import ThreadPoolExecutor
import threading

"""
上面讲future.result()方法会阻塞当前线程
如果不想阻塞当前线程 可以添加add_done_callback(fn(future))回调函数
"""


def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().getName() + ":" + str(i))
my_sum += i
return my_sum


def get_result(future):
print(future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(action, 50)
future2 = pool.submit(action, 100)

future1.add_done_callback(get_result)
future2.add_done_callback(get_result)
print('---------------------------')

ThreadPoolExecutor的map方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

"""
Executor 提供map(fn, *iterables, timeout=None, chunksize=1) 方法
为每个iterables里的元素启动一个线程
"""

from concurrent.futures import ThreadPoolExecutor
import threading


def action(max):
my_sum = 0
for i in range(max):
print(threading.current_thread().name + ":" + str(i))
my_sum += i
return my_sum


with ThreadPoolExecutor(max_workers=4) as pool:
reuslts = pool.map(action, (50, 100, 150))
print('------------')
for r in reuslts:
print('==============' + str(r))

local()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"""
线程工具函数 threading.local()函数 返回线程局部变量
为每个线程提供一个变量副本 线程间使用变量不冲突
"""

from concurrent.futures import ThreadPoolExecutor
import threading

mydata = threading.local()


def action(max):
for i in range(max):
try:
mydata.x += i
except Exception:
mydata.x = i
print('%s mydata.x 的值为:%d' %
(threading.current_thread().name, mydata.x))


with ThreadPoolExecutor(max_workers=2) as pool:
pool.submit(action, 10)
pool.submit(action, 10)

threading中Timer 指定时间执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
threading 的Timer 类 指定时间执行一次
cancel() 取消Timer调度
"""

from threading import Timer
import time

# def hello():
# print('hello world')

# t = Timer(10,hello)
# t.start()


count = 0


def print_time():
print("当前时间:%s" % time.ctime())
global t, count
count += 1
if count < 10:
t = Timer(1, print_time)
t.start()


t = Timer(1, print_time)
t.start()
# time.sleep(5)
# t.cancel()

sched模块scheduler()线程调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"""
sched 模块 执行复杂的任务调度
"""

import sched
import time


# 定义线程调度器
s = sched.scheduler()


def print_time(name='default'):
print('%s 的时间:%s' % (name, time.ctime()))


print('主线程:', time.ctime())

# 10s后 优先级1 执行print_time
s.enter(10, 1, print_time)

# 5s后 优先级2
s.enter(5, 2, print_time, argument=('位置参数',))
# 5s后 优先级1 >2
s.enter(5, 1, print_time, kwargs={'name': '关键字参数'})

s.run()
print('主线程:', time.ctime())
进程相关操作

进程的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
"""
os fork() 方法创建进程 返回0为主进程 其他为子进程
只能用于linux unix macOS

multiprocessing模块 下Process也可以创建进程 方式类似线程
"""

# 指定target创建进程
import os
import multiprocessing


def action(max):
for i in range(max):
print('(%s)子进程(父进程:(%s)):%d' % (os.getpid(), os.getppid(), i))


if __name__ == '__main__':
for i in range(200):
print('(%s)主进程:%d' % (os.getpid(), i))
if i == 20:
mp1 = multiprocessing.Process(target=action, args=(200,))
mp1.start()
mp2 = multiprocessing.Process(target=action, args=(200,))
mp2.start()
mp2.join()

print("--------主线程执行完成----------")


# 继承 multiprocessing.Process 重写run方法 ...

进程的启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""
进程启动方式
spawn 效率较低 fork forkserver
windows 只支持spawn
通过set_start_method() 设置
或者 通过get_context()获取Context对象 设置
"""

import multiprocessing
import os

# set_start_method()设置启动方式


# def foo(q):
# print('被启动的新进程(%s)' % os.getpid())
# q.put('python')


# if __name__ == '__main__':
# # multiprocessing.set_start_method('fork')
# multiprocessing.set_start_method('spawn')
# q = multiprocessing.Queue()
# mp = multiprocessing.Process(target=foo, args=(q,))
# mp.start()
# mp.join()
# print(q.get())


# get_context()设置启动方式

def foo(q):
print('被启动的新进程(%s)' % os.getpid())
q.put('heart')


if __name__ == '__main__':
ctx = multiprocessing.get_context('spawn')
q = ctx.Queue()
mp = ctx.Process(target=foo, args=(q,))
mp.start()
mp.join()
print(q.get())

进程池使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
"""
multiprocessing 的Pool() 创建进程池
apply() apply_async()
map() map_async() ...等方法放入进程池
"""

import multiprocessing
import time
import os

# apply_async() 使用

# def action(name='default'):
# print('(%s)进程正在执行,参数为%s' % (os.getpid(), name))
# time.sleep(3)


# if __name__ == '__main__':
# # 4个进程的进程池
# pool = multiprocessing.Pool(processes=4)

# pool.apply_async(action)
# pool.apply_async(action, args=('位置参数',))
# pool.apply_async(action, kwds={'name': '关键字参数'})
# pool.close()
# pool.join()


# map_async()使用

def action(max):
my_sum = 0
for i in range(max):
print('(%s)进程正在执行:%d' % (os.getpid(), i))
my_sum += i
return my_sum


if __name__ == '__main__':
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(action, (50, 100, 150))
print("===================")
for r in results:
print(r)

queue 进程通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"""
进程之间通信-Queue
"""
import multiprocessing


def f(q):
print('(%s)开始放入数据。。。' % multiprocessing.current_process().pid)
q.put('python')


if __name__ == '__main__':
q = multiprocessing.Queue()
p = multiprocessing.Process(target=f, args=(q,))
p.start()
print('(%s) 开始读取数据。。。' % multiprocessing.current_process().pid)
print(q.get())
p.join()

进程管道通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
"""
进程通信-Pipe 管道
"""

import multiprocessing


def f(conn):
print('(%s) 进程开始发送数据。。。。' % multiprocessing.current_process().pid)
conn.send('hello world')


if __name__ == '__main__':
# 返回两个PipeConnection对象
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=f, args=(child_conn,))
p.start()
print('(%s)进程开始接收数据。。。。' % multiprocessing.current_process().pid)
print(parent_conn.recv())
p.join()