国产gaysexchina男同gay,japanrcep老熟妇乱子伦视频,吃奶呻吟打开双腿做受动态图,成人色网站,国产av一区二区三区最新精品

Rust 優(yōu)雅停機(jī)與清理

2023-03-22 15:16 更新
ch20-03-graceful-shutdown-and-cleanup.md
commit 322899b375d071e4d96aaf29ce25c1a4b4ec65da

示例 20-20 中的代碼如期通過(guò)使用線程池異步的響應(yīng)請(qǐng)求。這里有一些警告說(shuō) workers、id 和 thread 字段沒有直接被使用,這提醒了我們并沒有清理所有的內(nèi)容。當(dāng)使用不那么優(yōu)雅的 ctrl-c 終止主線程時(shí),所有其他線程也會(huì)立刻停止,即便它們正處于處理請(qǐng)求的過(guò)程中。

現(xiàn)在我們要為 ThreadPool 實(shí)現(xiàn) Drop trait 對(duì)線程池中的每一個(gè)線程調(diào)用 join,這樣這些線程將會(huì)執(zhí)行完他們的請(qǐng)求。接著會(huì)為 ThreadPool 實(shí)現(xiàn)一個(gè)告訴線程他們應(yīng)該停止接收新請(qǐng)求并結(jié)束的方式。為了實(shí)踐這些代碼,修改 server 在優(yōu)雅停機(jī)(graceful shutdown)之前只接受兩個(gè)請(qǐng)求。

為 ThreadPool 實(shí)現(xiàn) Drop Trait

現(xiàn)在開始為線程池實(shí)現(xiàn) Drop。當(dāng)線程池被丟棄時(shí),應(yīng)該 join 所有線程以確保他們完成其操作。示例 20-22 展示了 Drop 實(shí)現(xiàn)的第一次嘗試;這些代碼還不能夠編譯:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

示例 20-22: 當(dāng)線程池離開作用域時(shí) join 每個(gè)線程

這里首先遍歷線程池中的每個(gè) workers。這里使用了 &mut 因?yàn)?nbsp;self 本身是一個(gè)可變引用而且也需要能夠修改 worker。對(duì)于每一個(gè)線程,會(huì)打印出說(shuō)明信息表明此特定 worker 正在關(guān)閉,接著在 worker 線程上調(diào)用 join。如果 join 調(diào)用失敗,通過(guò) unwrap 使得 panic 并進(jìn)行不優(yōu)雅的關(guān)閉。

如下是嘗試編譯代碼時(shí)得到的錯(cuò)誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

這告訴我們并不能調(diào)用 join,因?yàn)橹挥忻恳粋€(gè) worker 的可變借用,而 join 獲取其參數(shù)的所有權(quán)。為了解決這個(gè)問(wèn)題,需要一個(gè)方法將 thread 移動(dòng)出擁有其所有權(quán)的 Worker 實(shí)例以便 join 可以消費(fèi)這個(gè)線程。示例 17-15 中我們?cè)娺^(guò)這么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上調(diào)用 take 方法將值從 Some 成員中移動(dòng)出來(lái)而對(duì) None 成員不做處理。換句話說(shuō),正在運(yùn)行的 Worker 的 thread 將是 Some 成員值,而當(dāng)需要清理 worker 時(shí),將 Some 替換為 None,這樣 worker 就沒有可以運(yùn)行的線程了。

為此需要更新 Worker 的定義為如下:

文件名: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

現(xiàn)在依靠編譯器來(lái)找出其他需要修改的地方。check 代碼會(huì)得到兩個(gè)錯(cuò)誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, Some(thread) }
   |                      +++++      +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

讓我們修復(fù)第二個(gè)錯(cuò)誤,它指向 Worker::new 結(jié)尾的代碼;當(dāng)新建 Worker 時(shí)需要將 thread 值封裝進(jìn) Some。做出如下改變以修復(fù)問(wèn)題:

文件名: src/lib.rs

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

第一個(gè)錯(cuò)誤位于 Drop 實(shí)現(xiàn)中。之前提到過(guò)要調(diào)用 Option 上的 take 將 thread 移動(dòng)出 worker。如下改變會(huì)修復(fù)問(wèn)題:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

如第十七章我們見過(guò)的,Option 上的 take 方法會(huì)取出 Some 而留下 None。使用 if let 解構(gòu) Some 并得到線程,接著在線程上調(diào)用 join。如果 worker 的線程已然是 None,就知道此時(shí)這個(gè) worker 已經(jīng)清理了其線程所以無(wú)需做任何操作。

向線程發(fā)送信號(hào)使其停止接收任務(wù)

有了所有這些修改,代碼就能編譯且沒有任何警告。不過(guò)也有壞消息,這些代碼還不能以我們期望的方式運(yùn)行。問(wèn)題的關(guān)鍵在于 Worker 中分配的線程所運(yùn)行的閉包中的邏輯:調(diào)用 join 并不會(huì)關(guān)閉線程,因?yàn)樗麄円恢?nbsp;loop 來(lái)尋找任務(wù)。如果采用這個(gè)實(shí)現(xiàn)來(lái)嘗試丟棄 ThreadPool ,則主線程會(huì)永遠(yuǎn)阻塞在等待第一個(gè)線程結(jié)束上。

為了修復(fù)這個(gè)問(wèn)題,修改線程既監(jiān)聽是否有 Job 運(yùn)行也要監(jiān)聽一個(gè)應(yīng)該停止監(jiān)聽并退出無(wú)限循環(huán)的信號(hào)。所以信道將發(fā)送這個(gè)枚舉的兩個(gè)成員之一而不是 Job 實(shí)例:

文件名: src/lib.rs

enum Message {
    NewJob(Job),
    Terminate,
}

Message 枚舉要么是存放了線程需要運(yùn)行的 Job 的 NewJob 成員,要么是會(huì)導(dǎo)致線程退出循環(huán)并終止的 Terminate 成員。

同時(shí)需要修改信道來(lái)使用 Message 類型值而不是 Job,如示例 20-23 所示:

文件名: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

示例 20-23: 收發(fā) Message 值并在 Worker 收到 Message::Terminate 時(shí)退出循環(huán)

為了適用 Message 枚舉需要將兩個(gè)地方的 Job 修改為 MessageThreadPool 的定義和 Worker::new 的簽名。ThreadPool 的 execute 方法需要發(fā)送封裝進(jìn) Message::NewJob 成員的任務(wù)。然后,在 Worker::new 中當(dāng)從信道接收 Message 時(shí),當(dāng)獲取到 NewJob成員會(huì)處理任務(wù)而收到 Terminate 成員則會(huì)退出循環(huán)。

通過(guò)這些修改,代碼再次能夠編譯并繼續(xù)按照示例 20-20 之后相同的行為運(yùn)行。不過(guò)還是會(huì)得到一個(gè)警告,因?yàn)椴]有創(chuàng)建任何 Terminate 成員的消息。如示例 20-24 所示修改 Drop 實(shí)現(xiàn)來(lái)修復(fù)此問(wèn)題:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

示例 20-24:在對(duì)每個(gè) worker 線程調(diào)用 join 之前向 worker 發(fā)送 Message::Terminate

現(xiàn)在遍歷了 worker 兩次,一次向每個(gè) worker 發(fā)送一個(gè) Terminate 消息,一個(gè)調(diào)用每個(gè) worker 線程上的 join。如果嘗試在同一循環(huán)中發(fā)送消息并立即 join 線程,則無(wú)法保證當(dāng)前迭代的 worker 是從信道收到終止消息的 worker。

為了更好的理解為什么需要兩個(gè)分開的循環(huán),想象一下只有兩個(gè) worker 的場(chǎng)景。如果在一個(gè)單獨(dú)的循環(huán)中遍歷每個(gè) worker,在第一次迭代中向信道發(fā)出終止消息并對(duì)第一個(gè) worker 線程調(diào)用 join。如果此時(shí)第一個(gè) worker 正忙于處理請(qǐng)求,那么第二個(gè) worker 會(huì)收到終止消息并停止。我們會(huì)一直等待第一個(gè) worker 結(jié)束,不過(guò)它永遠(yuǎn)也不會(huì)結(jié)束因?yàn)榈诙€(gè)線程接收了終止消息。死鎖!

為了避免此情況,首先在一個(gè)循環(huán)中向信道發(fā)出所有的 Terminate 消息,接著在另一個(gè)循環(huán)中 join 所有的線程。每個(gè) worker 一旦收到終止消息即會(huì)停止從信道接收消息,意味著可以確保如果發(fā)送同 worker 數(shù)相同的終止消息,在 join 之前每個(gè)線程都會(huì)收到一個(gè)終止消息。

為了實(shí)踐這些代碼,如示例 20-25 所示修改 main 在優(yōu)雅停機(jī) server 之前只接受兩個(gè)請(qǐng)求:

文件名: src/bin/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

示例 20-25: 在處理兩個(gè)請(qǐng)求之后通過(guò)退出循環(huán)來(lái)停止 server

你不會(huì)希望真實(shí)世界的 web server 只處理兩次請(qǐng)求就停機(jī)了,這只是為了展示優(yōu)雅停機(jī)和清理處于正常工作狀態(tài)。

take 方法定義于 Iterator trait,這里限制循環(huán)最多頭 2 次。ThreadPool 會(huì)在 main 的結(jié)尾離開作用域,而且還會(huì)看到 drop 實(shí)現(xiàn)的運(yùn)行。

使用 cargo run 啟動(dòng) server,并發(fā)起三個(gè)請(qǐng)求。第三個(gè)請(qǐng)求應(yīng)該會(huì)失敗,而終端的輸出應(yīng)該看起來(lái)像這樣:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/main`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

可能會(huì)出現(xiàn)不同順序的 worker 和信息輸出??梢詮男畔⒅锌吹椒?wù)是如何運(yùn)行的: worker 0 和 worker 3 獲取了頭兩個(gè)請(qǐng)求,接著在第三個(gè)請(qǐng)求時(shí),我們停止接收連接。當(dāng) ThreadPool 在 main 的結(jié)尾離開作用域時(shí),其 Drop 實(shí)現(xiàn)開始工作,線程池通知所有線程終止。每個(gè) worker 在收到終止消息時(shí)會(huì)打印出一個(gè)信息,接著線程池調(diào)用 join 來(lái)終止每一個(gè) worker 線程。

這個(gè)特定的運(yùn)行過(guò)程中一個(gè)有趣的地方在于:注意我們向信道中發(fā)出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結(jié)束。與此同時(shí),每一個(gè)線程都收到了終止消息。一旦 worker 0 結(jié)束,主線程就等待其他 worker 結(jié)束,此時(shí)他們都已經(jīng)收到終止消息并能夠停止了。

恭喜!現(xiàn)在我們完成了這個(gè)項(xiàng)目,也有了一個(gè)使用線程池異步響應(yīng)請(qǐng)求的基礎(chǔ) web server。我們能對(duì) server 執(zhí)行優(yōu)雅停機(jī),它會(huì)清理線程池中的所有線程。

如下是完整的代碼參考:

文件名: src/bin/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

文件名: src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

這里還有很多可以做的事!如果你希望繼續(xù)增強(qiáng)這個(gè)項(xiàng)目,如下是一些點(diǎn)子:

  • 為 ?ThreadPool? 和其公有方法增加更多文檔
  • 為庫(kù)的功能增加測(cè)試
  • 將 ?unwrap? 調(diào)用改為更健壯的錯(cuò)誤處理
  • 使用 ?ThreadPool? 進(jìn)行其他不同于處理網(wǎng)絡(luò)請(qǐng)求的任務(wù)
  • 在 crates.io 上尋找一個(gè)線程池 crate 并使用它實(shí)現(xiàn)一個(gè)類似的 web server,將其 API 和魯棒性與我們的實(shí)現(xiàn)做對(duì)比

總結(jié)

好極了!你結(jié)束了本書的學(xué)習(xí)!由衷感謝你同我們一道加入這次 Rust 之旅?,F(xiàn)在你已經(jīng)準(zhǔn)備好出發(fā)并實(shí)現(xiàn)自己的 Rust 項(xiàng)目并幫助他人了。請(qǐng)不要忘記我們的社區(qū),這里有其他 Rustaceans 正樂于幫助你迎接 Rust 之路上的任何挑戰(zhàn)。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)