歡迎光臨
我們一直在努力

Python網路程式設計 —— 執行緒

執行緒的概念

執行緒就是在程式執行過程中,執行程式程式碼的一個分支,每個執行的程式至少都有一個執行緒

單執行緒執行

import time


def sing():
    for i in range(3):
        print("唱歌...%d" % i)
        time.sleep(1)

def dance():
    for i in range(3):
        print("跳舞...%d" % i)
        time.sleep(1)

if __name__ == '__main__':
    sing()
    dance()
執行結果:

唱歌...0
唱歌...1
唱歌...2
跳舞...0
跳舞...1
跳舞...2

***Repl Closed***

多執行緒執行

多執行緒的執行需要匯入threading模組

引數說明:

Thread([group[,target[,name[,args[,kwargs]]]]])
- group: 執行緒組,目前只能使用None
- target: 執行的目標任務名
- args: 以元組的方式給執行任務傳參
- kwargs: 以字典方式給執行任務傳參
- name: 執行緒名,一般不用設定

多執行緒完成多工

# 多執行緒執行
import time, threading


def sing():
    # 獲取當前程序
    print(threading.current_thread())
    for i in range(3):

        print("唱歌...%d" % i)
        time.sleep(1)

def dance():
    print(threading.current_thread())
    for i in range(3):

        print("跳舞...%d" % i)
        time.sleep(1)

if __name__ == '__main__':
    sing_thread = threading.Thread(target=sing)
    dance_thread = threading.Thread(target=dance)

    sing_thread.start()
    dance_thread.start()
執行結果:


唱歌...0

跳舞...0
唱歌...1
跳舞...1
唱歌...2
跳舞...2

***Repl Closed***

多執行緒執行帶有引數的任務

import time, threading


def sing(num):
    for i in range(num):
        print("唱歌...%d" % i)
        time.sleep(1)


def dance(num):
    for i in range(num):
        print("跳舞...%d" % i)
        time.sleep(1)


if __name__ == '__main__':
    sing_thread = threading.Thread(target=sing, args=(3,))

    dance_thread = threading.Thread(target=dance, kwargs={"num": 3})

    sing_thread.start()
    dance_thread.start()
執行結果:

唱歌...0
跳舞...0
跳舞...1
唱歌...1
跳舞...2
唱歌...2

***Repl Closed***

檢視獲取執行緒列表

import time, threading


def sing():
    for i in range(5):
        print("唱歌...%d" % i)
        time.sleep(1)


def dance():
    for i in range(5):
        print("跳舞...%d" % i)
        time.sleep(1)


if __name__ == '__main__':
    # 獲取當前程式活動執行緒的列表
    thread_list = threading.enumerate()
    print("111:", thread_list, len(thread_list))

    sing_thread = threading.Thread(target=sing)

    dance_thread = threading.Thread(target=dance)

    thread_list = threading.enumerate()
    print("222:", thread_list, len(thread_list))

    # 啟動執行緒
    sing_thread.start()
    dance_thread.start()

    # 只有執行緒啟動了,才能加入到活動執行緒列表中
    thread_list = threading.enumerate()
    print("333:", thread_list, len(thread_list))
執行結果:

111: [<_MainThread(MainThread, started 11864)>] 1
222: [<_MainThread(MainThread, started 11864)>] 1
唱歌...0
跳舞...0
333: [<_MainThread(MainThread, started 11864)>, , ] 3
跳舞...1
唱歌...1
跳舞...2
唱歌...2
唱歌...3
跳舞...3
唱歌...4
跳舞...4

***Repl Closed***

注意

執行緒之間執行是無序的

import time, threading


def task():
    time.sleep(1)
    print("當前執行緒:", threading.current_thread().name)

if __name__ == '__main__':

    for _ in range(5):
        sub_thread = threading.Thread(target=task)
        sub_thread.start()
執行結果:

當前執行緒: Thread-5
當前執行緒: Thread-2
當前執行緒: Thread-3
當前執行緒: Thread-1
當前執行緒: Thread-4

***Repl Closed***

主執行緒會等待所有的子執行緒結束後才結束

# 主執行緒會等待所有的子執行緒結束後纔會結束
import time, threading


# 測試主執行緒是否會等待子執行緒執行完成以後程式再退出
def show_info():
    for i in range(5):
        print("test:", i)
        time.sleep(1)


if __name__ == '__main__':
    sub_thread = threading.Thread(target=show_info)
    sub_thread.start()

    # 主執行緒延時5秒
    time.sleep(10)
    print("over")
執行結果:

test: 0
test: 1
test: 2
test: 3
test: 4
over

***Repl Closed***

守護主執行緒

import time, threading


def show_info():
    for i in range(5):
        print("test:", i)
        time.sleep(1)

if __name__ == '__main__':
    # 設定成守護主執行緒,主執行緒退出後子執行緒直接銷燬不再執行子執行緒的程式碼
    sub_thread = threading.Thread(target=show_info, daemon=True)


    sub_thread.start()

    time.sleep(10)
    print("over")
執行結果:

test: 0
test: 1
test: 2
test: 3
test: 4
over

***Repl Closed***

自定義執行緒

import threading


# 自定義執行緒類
class MyThread(threading.Thread):
    # 通過構造方法取接受任務的引數
    def __init__(self, info1, info2):
        # 呼叫父類的構造方法
        super().__init__()
        self.info1 = info1
        self.info2 = info2


    # 定義自定義執行緒相關的任務
    def test1(self):
        print(self.info1)


    def test2(self):
        print(self.info2)

    # 通過run方法執行相關任務
    def run(self):
        self.test1()
        self.test2()


# 建立自定義執行緒
my_thread = MyThread("測試1", "測試2")

# 啟動
my_thread.start()
執行結果:

測試1
測試2

***Repl Closed***

總結:

多執行緒共享全域性變數

import time, threading


# 定義全域性變數
my_list = list()

# 寫入資料任務
def write_data():
    for i in range(5):
        my_list.append(i)
        time.sleep(1)
    print("write_data:", my_list)

# 讀取資料任務
def read_data():
    print("read_data:", my_list)

if __name__ == '__main__':
    # 建立寫入資料的執行緒
    write_thread = threading.Thread(target=write_data)
    # 建立讀取資料的執行緒
    read_thread = threading.Thread(target=read_data)

    write_thread.start()

    # 主執行緒等待寫入執行緒執行完成以後程式碼再繼續往下執行
    write_thread.join()
    print("開始讀取資料...")
    read_thread.start()
執行結果:

write_data: [0, 1, 2, 3, 4]
開始讀取資料...
read_data: [0, 1, 2, 3, 4]

***Repl Closed***

多執行緒同時對全域性變數進行操作,導致資料可能出現錯誤

import threading


# 定義全域性變數
g_num = 0

# 迴圈一次給全域性變數加1
def sum_num1():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum1:", g_num)

# 迴圈一次給全域性變數加1
def sum_num2():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum2:", g_num)

if __name__ == '__main__':
    # 建立兩個執行緒
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)

    first_thread.start()
    second_thread.start()
執行結果:

sum1: 1491056
sum2: 1528560

***Repl Closed***

通過上面執行結果,得出:多執行緒同時對全域性變數運算元據發生了錯誤

原因分析:兩個執行緒first_thread和second_thread都要對全域性變數g_num(預設是0)進行加1運算,但是由於是多執行緒同時操作,,有可能出現下面的情況:

1.在g_num=0時,first_thread取得g_num=0.此時系統把first_thread排程為」sleeping」狀態,把second_thread轉換為」running」狀態,t2也獲得g_num=0

2.然後second_thread對得到的值進行加1並賦給g_num,使得g_num=1

3.然後系統又把second_thread排程為」sleeping」,把first_thread轉為」running」.執行緒t1又把之前得到的0加1後賦值給g_num.

4.這樣導致雖然first_thread和second_thread都對g_num加1,但結果仍然是g_num=1。

全域性變數資料錯誤的解決辦法

執行緒同步:保證同一時刻只能有一個執行緒去操作全域性變數同步,就是協同步調,按預定的先後次序進行執行

執行緒同步的方式:

1.執行緒等待(join)

2.互斥鎖

執行緒等待實現方式:

import threading


# 定義全域性變數
g_num = 0

# 迴圈一次給全域性變數加1
def sum_num1():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum1:", g_num)

# 迴圈一次給全域性變數加1
def sum_num2():
    for i in range(1000000):
        global g_num
        g_num += 1
    print("sum2:", g_num)

if __name__ == '__main__':
    # 建立兩個執行緒
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)

    first_thread.start()
    first_thread.join()

    second_thread.start()
執行結果:

sum1: 1000000
sum2: 2000000

***Repl Closed***

結論:多個執行緒同時對同一個全域性變數進行操作,會有可能出現資源競爭資料錯誤的問題

執行緒同步方式可以解決資源競爭資料錯誤問題,但是這樣有多工變成了單任務

互斥鎖

對共享資料進行鎖定,保證同一時刻只能有一個執行緒去操作

搶到鎖的執行緒先執行,沒有搶到鎖的執行緒需要等待,等鎖用完後需要釋放,然後其它等待的執行緒再去搶這個鎖,哪個執行緒搶到,那個執行緒再執行

具體哪個執行緒搶到這個鎖,我們決定不了,是由CPU排程決定的

執行緒同步能夠保證多個執行緒安全訪問競爭資源,最簡單的同步機制是引入互斥鎖

互斥鎖為資源引入的一個狀態:鎖定/非鎖定

某個執行緒要更改共享資料時,先將其鎖定,此時資源的狀態為」鎖定」,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成」非鎖定」,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。

建立鎖:

a = threading.Lock()

鎖定

a.acquire()

釋放

a.release()

注意:

1.如果這個鎖之前時沒有上鎖的,那麼acquire不會堵塞

2.如果在呼叫acquire對這個鎖上鎖之前,它已經被其它執行緒上了鎖,那麼此時acquire會堵塞,直到這個鎖被解鎖為止

# 使用互斥鎖完成2個執行緒對同一個全域性變數各加100萬次的操作
import threading


# 定義全域性變數
g_num = 0

# 建立全域性互斥鎖
lock = threading.Lock()

# 迴圈一次給全域性變數加1
def sum_num1():
    # 上鎖
    lock.acquire()
    for i in range(1000000):
        global g_num
        g_num += 1

    print("sun1:", g_num)

    # 釋放鎖
    lock.release()

# 迴圈一次給全域性變數加1
def sum_num2():
    # 上鎖
    lock.acquire()
    for i in range(1000000):
        global g_num
        g_num += 1

    print("sum2:", g_num)

    # 釋放鎖
    lock.release()

if __name__ == '__main__':
    # 建立執行緒
    first_thread = threading.Thread(target=sum_num1)
    second_thread = threading.Thread(target=sum_num2)

    # 啟動執行緒
    first_thread.start()
    second_thread.start()
執行結果:

sun1: 1000000
sum2: 2000000

***Repl Closed***

注意

加上互斥鎖,哪個執行緒搶到這個鎖我們決定不了,哪個執行緒搶到鎖哪個執行緒先執行,沒有搶到的執行緒需要等待

加上互斥鎖多工瞬間變成單任務,效能會下降,也就是說同一時刻只能有一個執行緒去執行

使用互斥鎖的目的

能夠保證多個執行緒訪問共享資料不會出現資源競爭及資料錯誤

上鎖、解鎖過程

當一個執行緒呼叫鎖的acquire()方法獲得鎖時,鎖就進去了」locked」狀態。

每次只有一個而執行緒可以獲得鎖,如果此時另一個執行緒試圖獲得這個鎖,該執行緒就會變為」blocked」狀態,稱為」阻塞」,直到擁有鎖的執行緒呼叫鎖的release()方法釋放鎖之後,鎖進入」unlocked」狀態。

執行緒排程程式從處於同步阻塞狀態的執行緒中選擇一個來獲得鎖,並使得該執行緒進入執行」running」狀態

死鎖

一直等待對方釋放鎖的情景就是死鎖

根據下標在列表中取值,但是要保證同一時刻只能有一個執行緒去取值

# 死鎖示例:
import time, threading

# 建立互斥鎖
lock = threading.Lock()

def get_value(index):
    # 上鎖
    lock.acquire()
    print(threading.current_thread().name)
    my_list = [3, 6, 8, 1]
    # 判斷下標釋放越界
    if index >= len(my_list):
        print("下標越界:", index)

        return
    value = my_list[index]
    print(value)
    time.sleep(1)
    # 釋放鎖
    lock.release()

if __name__ == '__main__':
    # 模擬大量執行緒去執行取值操作

    for i in range(30):
        sub_thread = threading.Thread(target=get_value, args=(i,))
        sub_thread.start()

避免死鎖:

# 死鎖示例:
import time, threading

# 建立互斥鎖
lock = threading.Lock()

def get_value(index):
    # 上鎖
    lock.acquire()
    print(threading.current_thread().name)
    my_list = [3, 6, 8, 1]
    # 判斷下標釋放越界
    if index >= len(my_list):
        print("下標越界:", index)
        lock.release()
        return
    value = my_list[index]
    print(value)
    time.sleep(1)
    # 釋放鎖
    lock.release()

if __name__ == '__main__':
    # 模擬大量執行緒去執行取值操作

    for i in range(30):
        sub_thread = threading.Thread(target=get_value, args=(i,))
        sub_thread.start()

小結:使用互斥鎖的時候需要注意死鎖的問題,要在合適的地方注意釋放鎖

死鎖一旦發生就會造成應用的停止響應

###個人獨立部落格:wwwlimiao.tech

未經允許不得轉載:頭條楓林網 » Python網路程式設計 —— 執行緒