ch20-03-graceful-shutdown-and-cleanup.md
commit 322899b375d071e4d96aaf29ce25c1a4b4ec65da
示例 20-20 中的代碼如期通過(guò)使用線程池異步的響應(yīng)請(qǐng)求。這里有一些警告說(shuō) workers
、id
和 thread
字段沒有直接被使用,這提醒了我們并沒有清理所有的內(nèi)容。當(dāng)使用不那么優(yōu)雅的 ctrl-c 終止主線程時(shí),所有其他線程也會(huì)立刻停止,即便它們正處于處理請(qǐng)求的過(guò)程中。
現(xiàn)在我們要為 ThreadPool
實(shí)現(xiàn) Drop
trait 對(duì)線程池中的每一個(gè)線程調(diào)用 join
,這樣這些線程將會(huì)執(zhí)行完他們的請(qǐng)求。接著會(huì)為 ThreadPool
實(shí)現(xiàn)一個(gè)告訴線程他們應(yīng)該停止接收新請(qǐng)求并結(jié)束的方式。為了實(shí)踐這些代碼,修改 server 在優(yōu)雅停機(jī)(graceful shutdown)之前只接受兩個(gè)請(qǐng)求。
現(xiàn)在開始為線程池實(shí)現(xiàn) Drop
。當(dāng)線程池被丟棄時(shí),應(yīng)該 join 所有線程以確保他們完成其操作。示例 20-22 展示了 Drop
實(shí)現(xiàn)的第一次嘗試;這些代碼還不能夠編譯:
文件名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
示例 20-22: 當(dāng)線程池離開作用域時(shí) join 每個(gè)線程
這里首先遍歷線程池中的每個(gè) workers
。這里使用了 &mut
因?yàn)?nbsp;self
本身是一個(gè)可變引用而且也需要能夠修改 worker
。對(duì)于每一個(gè)線程,會(huì)打印出說(shuō)明信息表明此特定 worker 正在關(guān)閉,接著在 worker 線程上調(diào)用 join
。如果 join
調(diào)用失敗,通過(guò) unwrap
使得 panic 并進(jìn)行不優(yōu)雅的關(guān)閉。
如下是嘗試編譯代碼時(shí)得到的錯(cuò)誤:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error
這告訴我們并不能調(diào)用 join
,因?yàn)橹挥忻恳粋€(gè) worker
的可變借用,而 join
獲取其參數(shù)的所有權(quán)。為了解決這個(gè)問(wèn)題,需要一個(gè)方法將 thread
移動(dòng)出擁有其所有權(quán)的 Worker
實(shí)例以便 join
可以消費(fèi)這個(gè)線程。示例 17-15 中我們?cè)娺^(guò)這么做的方法:如果 Worker
存放的是 Option<thread::JoinHandle<()>
,就可以在 Option
上調(diào)用 take
方法將值從 Some
成員中移動(dòng)出來(lái)而對(duì) None
成員不做處理。換句話說(shuō),正在運(yùn)行的 Worker
的 thread
將是 Some
成員值,而當(dāng)需要清理 worker 時(shí),將 Some
替換為 None
,這樣 worker 就沒有可以運(yùn)行的線程了。
為此需要更新 Worker
的定義為如下:
文件名: src/lib.rs
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
現(xiàn)在依靠編譯器來(lái)找出其他需要修改的地方。check 代碼會(huì)得到兩個(gè)錯(cuò)誤:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in `Option<JoinHandle<()>>`
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct `JoinHandle`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, Some(thread) }
| +++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors
讓我們修復(fù)第二個(gè)錯(cuò)誤,它指向 Worker::new
結(jié)尾的代碼;當(dāng)新建 Worker
時(shí)需要將 thread
值封裝進(jìn) Some
。做出如下改變以修復(fù)問(wèn)題:
文件名: src/lib.rs
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
Worker {
id,
thread: Some(thread),
}
}
}
第一個(gè)錯(cuò)誤位于 Drop
實(shí)現(xiàn)中。之前提到過(guò)要調(diào)用 Option
上的 take
將 thread
移動(dòng)出 worker
。如下改變會(huì)修復(fù)問(wèn)題:
文件名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
如第十七章我們見過(guò)的,Option
上的 take
方法會(huì)取出 Some
而留下 None
。使用 if let
解構(gòu) Some
并得到線程,接著在線程上調(diào)用 join
。如果 worker 的線程已然是 None
,就知道此時(shí)這個(gè) worker 已經(jīng)清理了其線程所以無(wú)需做任何操作。
有了所有這些修改,代碼就能編譯且沒有任何警告。不過(guò)也有壞消息,這些代碼還不能以我們期望的方式運(yùn)行。問(wèn)題的關(guān)鍵在于 Worker
中分配的線程所運(yùn)行的閉包中的邏輯:調(diào)用 join
并不會(huì)關(guān)閉線程,因?yàn)樗麄円恢?nbsp;loop
來(lái)尋找任務(wù)。如果采用這個(gè)實(shí)現(xiàn)來(lái)嘗試丟棄 ThreadPool
,則主線程會(huì)永遠(yuǎn)阻塞在等待第一個(gè)線程結(jié)束上。
為了修復(fù)這個(gè)問(wèn)題,修改線程既監(jiān)聽是否有 Job
運(yùn)行也要監(jiān)聽一個(gè)應(yīng)該停止監(jiān)聽并退出無(wú)限循環(huán)的信號(hào)。所以信道將發(fā)送這個(gè)枚舉的兩個(gè)成員之一而不是 Job
實(shí)例:
文件名: src/lib.rs
enum Message {
NewJob(Job),
Terminate,
}
Message
枚舉要么是存放了線程需要運(yùn)行的 Job
的 NewJob
成員,要么是會(huì)導(dǎo)致線程退出循環(huán)并終止的 Terminate
成員。
同時(shí)需要修改信道來(lái)使用 Message
類型值而不是 Job
,如示例 20-23 所示:
文件名: src/lib.rs
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
// --snip--
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
示例 20-23: 收發(fā) Message
值并在 Worker
收到 Message::Terminate
時(shí)退出循環(huán)
為了適用 Message
枚舉需要將兩個(gè)地方的 Job
修改為 Message
:ThreadPool
的定義和 Worker::new
的簽名。ThreadPool
的 execute
方法需要發(fā)送封裝進(jìn) Message::NewJob
成員的任務(wù)。然后,在 Worker::new
中當(dāng)從信道接收 Message
時(shí),當(dāng)獲取到 NewJob
成員會(huì)處理任務(wù)而收到 Terminate
成員則會(huì)退出循環(huán)。
通過(guò)這些修改,代碼再次能夠編譯并繼續(xù)按照示例 20-20 之后相同的行為運(yùn)行。不過(guò)還是會(huì)得到一個(gè)警告,因?yàn)椴]有創(chuàng)建任何 Terminate
成員的消息。如示例 20-24 所示修改 Drop
實(shí)現(xiàn)來(lái)修復(fù)此問(wèn)題:
文件名: src/lib.rs
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
示例 20-24:在對(duì)每個(gè) worker 線程調(diào)用 join
之前向 worker 發(fā)送 Message::Terminate
現(xiàn)在遍歷了 worker 兩次,一次向每個(gè) worker 發(fā)送一個(gè) Terminate
消息,一個(gè)調(diào)用每個(gè) worker 線程上的 join
。如果嘗試在同一循環(huán)中發(fā)送消息并立即 join 線程,則無(wú)法保證當(dāng)前迭代的 worker 是從信道收到終止消息的 worker。
為了更好的理解為什么需要兩個(gè)分開的循環(huán),想象一下只有兩個(gè) worker 的場(chǎng)景。如果在一個(gè)單獨(dú)的循環(huán)中遍歷每個(gè) worker,在第一次迭代中向信道發(fā)出終止消息并對(duì)第一個(gè) worker 線程調(diào)用 join
。如果此時(shí)第一個(gè) worker 正忙于處理請(qǐng)求,那么第二個(gè) worker 會(huì)收到終止消息并停止。我們會(huì)一直等待第一個(gè) worker 結(jié)束,不過(guò)它永遠(yuǎn)也不會(huì)結(jié)束因?yàn)榈诙€(gè)線程接收了終止消息。死鎖!
為了避免此情況,首先在一個(gè)循環(huán)中向信道發(fā)出所有的 Terminate
消息,接著在另一個(gè)循環(huán)中 join 所有的線程。每個(gè) worker 一旦收到終止消息即會(huì)停止從信道接收消息,意味著可以確保如果發(fā)送同 worker 數(shù)相同的終止消息,在 join 之前每個(gè)線程都會(huì)收到一個(gè)終止消息。
為了實(shí)踐這些代碼,如示例 20-25 所示修改 main
在優(yōu)雅停機(jī) server 之前只接受兩個(gè)請(qǐng)求:
文件名: src/bin/main.rs
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
示例 20-25: 在處理兩個(gè)請(qǐng)求之后通過(guò)退出循環(huán)來(lái)停止 server
你不會(huì)希望真實(shí)世界的 web server 只處理兩次請(qǐng)求就停機(jī)了,這只是為了展示優(yōu)雅停機(jī)和清理處于正常工作狀態(tài)。
take
方法定義于 Iterator
trait,這里限制循環(huán)最多頭 2 次。ThreadPool
會(huì)在 main
的結(jié)尾離開作用域,而且還會(huì)看到 drop
實(shí)現(xiàn)的運(yùn)行。
使用 cargo run
啟動(dòng) server,并發(fā)起三個(gè)請(qǐng)求。第三個(gè)請(qǐng)求應(yīng)該會(huì)失敗,而終端的輸出應(yīng)該看起來(lái)像這樣:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
可能會(huì)出現(xiàn)不同順序的 worker 和信息輸出??梢詮男畔⒅锌吹椒?wù)是如何運(yùn)行的: worker 0 和 worker 3 獲取了頭兩個(gè)請(qǐng)求,接著在第三個(gè)請(qǐng)求時(shí),我們停止接收連接。當(dāng) ThreadPool
在 main
的結(jié)尾離開作用域時(shí),其 Drop
實(shí)現(xiàn)開始工作,線程池通知所有線程終止。每個(gè) worker 在收到終止消息時(shí)會(huì)打印出一個(gè)信息,接著線程池調(diào)用 join
來(lái)終止每一個(gè) worker 線程。
這個(gè)特定的運(yùn)行過(guò)程中一個(gè)有趣的地方在于:注意我們向信道中發(fā)出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結(jié)束。與此同時(shí),每一個(gè)線程都收到了終止消息。一旦 worker 0 結(jié)束,主線程就等待其他 worker 結(jié)束,此時(shí)他們都已經(jīng)收到終止消息并能夠停止了。
恭喜!現(xiàn)在我們完成了這個(gè)項(xiàng)目,也有了一個(gè)使用線程池異步響應(yīng)請(qǐng)求的基礎(chǔ) web server。我們能對(duì) server 執(zhí)行優(yōu)雅停機(jī),它會(huì)清理線程池中的所有線程。
如下是完整的代碼參考:
文件名: src/bin/main.rs
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
status_line,
contents.len(),
contents
);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
文件名: src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
這里還有很多可以做的事!如果你希望繼續(xù)增強(qiáng)這個(gè)項(xiàng)目,如下是一些點(diǎn)子:
ThreadPool
? 和其公有方法增加更多文檔unwrap
? 調(diào)用改為更健壯的錯(cuò)誤處理ThreadPool
? 進(jìn)行其他不同于處理網(wǎng)絡(luò)請(qǐng)求的任務(wù)好極了!你結(jié)束了本書的學(xué)習(xí)!由衷感謝你同我們一道加入這次 Rust 之旅?,F(xiàn)在你已經(jīng)準(zhǔn)備好出發(fā)并實(shí)現(xiàn)自己的 Rust 項(xiàng)目并幫助他人了。請(qǐng)不要忘記我們的社區(qū),這里有其他 Rustaceans 正樂于幫助你迎接 Rust 之路上的任何挑戰(zhàn)。
更多建議: