多线程

多线程创建

  • 通过函数 使用threading模块中Thread类中target参数,需要制定接受一个函数对象,函数就是执行子线程的逻辑
  • 通过类 自定义类继承threading.Thread类 并覆盖run()方法中来实现线程的逻辑 通过面向对象的编程方式,如果你的子线程比较多 比较方便管理代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import threading
def singe():
for i in range(3):
print('正在唱歌....')
time.sleep(1)


def dance():
for i in range(3):
print('正在跳舞....')
time.sleep(1)



if __name__ == '__main__':
t1 = threading.Thread(target=singe)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
正在唱歌....
#正在跳舞....
#正在唱歌....
#正在跳舞....
#正在跳舞....
#正在唱歌....

主线程与子线程的关系

  • 主线程会等待子线程执行结束后再结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

def demo():
# 子线程
for i in range(5):
print('hello 子线程')
time.sleep(1)

if __name__ == '__main__':

t = threading.Thread(target=demo)
t.start() # 启动线程

print(1)
#hello 子线程
#1
#hello 子线程
#hello 子线程
#hello 子线程
#hello 子线程

  • 守护线程 setDaemon(True) 不会等待子线程结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time

def demo():
# 子线程
for i in range(5):
print('hello 子线程')
time.sleep(1)

if __name__ == '__main__':

t = threading.Thread(target=demo)
# 守护线程 不会等待子线程结束
t.setDaemon(True)
t.start() # 启动线程
print(1)

#hello 子线程
#1
  • .join() 等待线程结束后主线程再进行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def demo():
# 子线程
for i in range(5):
print('hello 子线程')
time.sleep(1)

if __name__ == '__main__':

t = threading.Thread(target=demo)
t.start() # 启动线程
# 等待子线程结束之后主线程在执行
t.join()

print(1)
#hello 子线程
#hello 子线程
#hello 子线程
#hello 子线程
#hello 子线程
#1

查看线程数量

  • threading.enumerate 查看线程数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading
import time
def demo1():
for i in range(5):
print('demo1--%d'%i)
time.sleep(1)


def demo2():
for i in range(10):
print('demo2--%d' % i)
time.sleep(1)

def main():
t1 = threading.Thread(target=demo1)
t2 = threading.Thread(target=demo2)
t1.start()
t2.start()
while True:

print(threading.enumerate())
if len(threading.enumerate()) <= 1:
break
time.sleep(1)

if __name__ == '__main__':

main()
#代码冗长不做赘述

子线程的执行与创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time

def demo():
for i in range(5):
print('demo--%d'%i)
time.sleep(1)

def main():
print(threading.enumerate())
t1 = threading.Thread(target=demo) # 回调函数
print(threading.enumerate())
t1.start() # 创建并启动线程
print(threading.enumerate())

if __name__ == '__main__':
main()
#只有最后一个threading.enumerate()可以输出子线程 故t1.start() 创建并启动线程
#threading.Thread 只是回调函数

通过类创建多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import time

class A(threading.Thread): #继承类

def run(self) -> None:

for i in range(5):
print(i)

if __name__ == '__main__':
a = A()
a.start()
print(threading.enumerate())
time.sleep(5)
print(123)
print(threading.enumerate())
#0
#[<_MainThread(MainThread, started 13260)>, <A(Thread-1, started 10304)>]
#1
#2
#3
#4
#123
#[<_MainThread(MainThread, started 13260)>]

多线程的资源竞争

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time
# 线程是共享全局变量
num = 0
# 创建一个互斥锁 默认情况下是没有上锁
# 用Lock()方式创建的锁叫做不可重复的锁,只能执行一次
# mutex = threading.Lock()
# mutex = threading.RLock()
def demo1(nums):
global num
for i in range(nums):
num += 1
print('demo1-num-%d'%num)


def demo2(nums):
global num
for i in range(nums):
num += 1
print('demo2-num-%d' % num)


def main():
#t1 = threading.Thread(target=demo1,args=(100,)) #100
#t2 = threading.Thread(target=demo2,args=(100,)) #200 数量少的时候不容易出现
t1 = threading.Thread(target=demo1,args=(1000000,)) #demo1-num-1560258
t2 = threading.Thread(target=demo2,args=(1000000,)) #demo2-num-1540071 数量多容易出现
t1.start()
t2.start()
time.sleep(3)
print('main-num-%d'%num)

if __name__ == '__main__':
main()

上锁

  • 互斥锁

    • 因多线程几乎同时修改某一共享数据的时候 需要进行同步控制,某个线程要更改数据时,先将其锁定,此时资源状态为锁定,其他线程不能改变,直到该线程释放资源,其他线程才能再次锁定资源

    • mutex = threading.Lock()   #创建锁 只可执行一次
      #mutex = threading.RLock() #可以执行多次
      # 加锁
      mutex.acquire()
      # 解锁
      mutex.release()
      <!--hexoPostRenderEscape:<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line">&#96;&#96;&#96;python</span><br><span class="line">import threading</span><br><span class="line">import time</span><br><span class="line"># 线程是共享全局变量</span><br><span class="line">num &#x3D; 0</span><br><span class="line"># 用Lock()方式创建的锁叫做不可重复的锁,只能执行一次</span><br><span class="line">mutex &#x3D; threading.Lock()</span><br><span class="line">def demo1(nums):</span><br><span class="line">    global num</span><br><span class="line">    # 加锁</span><br><span class="line">    mutex.acquire()</span><br><span class="line">    for i in range(nums):</span><br><span class="line">        num +&#x3D; 1</span><br><span class="line">    # 解锁</span><br><span class="line">    mutex.release()</span><br><span class="line">    print(&#39;demo1-num-%d&#39;%num)</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">def demo2(nums):</span><br><span class="line">    global num</span><br><span class="line">    # 加锁</span><br><span class="line">    mutex.acquire()</span><br><span class="line">    for i in range(nums):</span><br><span class="line">        num +&#x3D; 1</span><br><span class="line">        # 解锁</span><br><span class="line">    mutex.release()</span><br><span class="line">    print(&#39;demo2-num-%d&#39; % num)</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">def main():</span><br><span class="line">    t1 &#x3D; threading.Thread(target&#x3D;demo1,args&#x3D;(1000000,))    #demo1-num-1000000</span><br><span class="line">    t2 &#x3D; threading.Thread(target&#x3D;demo2,args&#x3D;(1000000,))    #demo2-num-2000000</span><br><span class="line">    t1.start()</span><br><span class="line">    t2.start()</span><br><span class="line">    time.sleep(3)</span><br><span class="line">    print(&#39;main-num-%d&#39;%num)</span><br><span class="line"></span><br><span class="line">if __name__ &#x3D;&#x3D; &#39;__main__&#39;:</span><br><span class="line">    main()</span><br></pre></td></tr></table></figure>:hexoPostRenderEscape-->
  • 死锁

    • 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源就会造成死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import threading
import time

class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()

# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name+'----do1---up----')
time.sleep(1)

# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()

# 对mutexA解锁
mutexA.release()

class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()

# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name+'----do2---up----')
time.sleep(1)

# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()

# 对mutexB解锁
mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()

生产者消费者模型

Queue线程

  • 在线程中,访问一些全局变量,加锁是一个经常的过程。如果你是想把一些数据存储到某个队列中,那么Python内置了一个线程安全的模块叫做queue模块。Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。
  • empty():判断队列是否为空。
  • full():判断队列是否满了。
  • get():从队列中取最后一个数据。
  • put():将一个数据放到队列中。

生产者与消费者

  • 生产者和消费者模式是多线程开发中常见的一种模式。通过生产者和消费者模式,可以让代码达到高内聚低耦合的目标,线程管理更加方便,程序分工更加明确。
  • 生产者的线程专门用来生产一些数据,然后存放到容器中(中间变量)。消费者在从这个中间的容器中取出数据进行消费

Lock版本生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import threading
import random
gMoney = 0
# 定义一个变量 保存生产的次数 默认是0次
gTimes = 0
# 定义一把锁
gLock = threading.Lock()

# 定义生产者
class Producer(threading.Thread):

def run(self):
global gMoney
global gTimes
gLock.acquire() # 上锁
while True:
# gLock.acquire() # 上锁
if gTimes >= 10:
# gLock.release()
break
money = random.randint(0,100)
gMoney += money
gTimes += 1
print("%s生产了%d元钱" % (threading.current_thread().name, money))
gLock.release() # 解锁
# 定义消费者
class Consumer(threading.Thread)
def run(self):
global gMoney
while True:
gLock.acquire() # 上锁
money = random.randint(0, 100)
if gMoney >= money:
gMoney -= money
print("%s消费了%d元钱" % (threading.current_thread().name, money))
else:
if gTimes >= 10:
gLock.release()
break
print("%s想消费%d元钱,但是余额只有%d"%(threading.current_thread().name,money,gMoney))
gLock.release() # 解锁

def main():
# 开启5个生产者线程
for x in range(5):
th = Producer(name="生产者%d号" % x)
th.start()
# 开启5个消费者线程
for x in range(5):
th = Consumer(name="消费者%d号" % x)
th.start()

if __name__ == '__main__':
main()

Condition版生产者与消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
import random
gMoney = 0
# 定义一个变量 保存生产的次数 默认是0次
gTimes = 0
# 定义一把锁
# gLock = threading.Lock()
gCond = threading.Condition()
# 定义生产者
class Producer(threading.Thread):

def run(self):
global gMoney
global gTimes

while True:
gCond.acquire() # 上锁
if gTimes >= 10:
gCond.release()
break
money = random.randint(0,100)
gMoney += money
gTimes += 1
print("%s生产了%d元钱,剩余%d元钱" % (threading.current_thread().name, money, gMoney))
gCond.notify_all()
gCond.release() # 解锁
# 定义消费者
class Consumer(threading.Thread):

def run(self):
global gMoney
while True:
gCond.acquire() # 上锁
money = random.randint(0, 100)

while gMoney < money:
if gTimes >= 10:
gCond.release()
return # 这里如果用break只能退出外层循环,所以我们直接return
print("%s想消费%d元钱,但是余额只有%d元钱了,并且生产者已经不再生产了!"%(threading.current_thread().name,money,gMoney))
gCond.wait()
# 开始消费
gMoney -= money
print("%s消费了%d元钱,剩余%d元钱" % (threading.current_thread().name, money, gMoney))
gCond.release()

def main():
# 开启5个生产者线程
for x in range(5):
th = Producer(name="生产者%d号" % x)
th.start()
# 开启5个消费者线程
for x in range(5):
th = Consumer(name="消费者%d号" % x)
th.start()

if __name__ == '__main__':
main()