在撰寫此書時(shí),CPU 架構(gòu)的景觀正以幾十年來最快的速度發(fā)生變化。
一個(gè)并發(fā)程序需要同時(shí)處理多個(gè)互不相關(guān)的任務(wù)。考慮一下游戲服務(wù)器的例子:典型做法是將數(shù)十個(gè)組件組合起來,其中的每一個(gè)都與外部有復(fù)雜交互。可能其中某個(gè)組件負(fù)責(zé)多個(gè)用戶間聊天;另外一些負(fù)責(zé)處理玩家的輸入,并且將更新后的狀態(tài)返回給客戶端;同時(shí)還有其他程序執(zhí)行物理計(jì)算。
并發(fā)程序的正確運(yùn)轉(zhuǎn)并不需要多核,盡管多核可以提高執(zhí)行效率和響應(yīng)速度。
相比之下,一個(gè)并行程序僅解決一個(gè)單獨(dú)的問題。假設(shè)一個(gè)金融模型嘗試計(jì)算并預(yù)測下一分鐘某支股票的價(jià)格波動。如果想在某個(gè)交易所列出的所有股票上執(zhí)行這個(gè)模型,例如計(jì)算一下那些股票應(yīng)該買入或賣出,我們希望在五百個(gè)核上可以比僅有一個(gè)核的時(shí)候跑得更快。這表明,并行程序通常不需要通過多核來保證正確性。
另一個(gè)有效區(qū)分并行和并發(fā)的點(diǎn)在于他們?nèi)绾闻c外部世界交互。由定義,并發(fā)程序連續(xù)不斷的處理網(wǎng)絡(luò)協(xié)議和數(shù)據(jù)庫之類的東西。典型的并行程序可能更專注:其接收流入的數(shù)據(jù),咀嚼一會兒(間或有點(diǎn) I/O),然后將需要返回的數(shù)據(jù)流吐出來。
許多傳統(tǒng)編程語言進(jìn)一步模糊了并發(fā)和并行之間已經(jīng)難以辨認(rèn)的邊界,這些語言強(qiáng)制程序員使用相同的基礎(chǔ)設(shè)施投監(jiān)這兩種程序。
本章將涉及在單個(gè)操作系統(tǒng)進(jìn)程內(nèi)進(jìn)行并發(fā)和并行編程。
作為并發(fā)編程的基礎(chǔ),大多數(shù)語言提供了創(chuàng)建多個(gè)多線程的方法。 Haskell
也不例外,盡管使用 Haskell
進(jìn)行線程編程看起來和其他語言有些不同。
In Haskell, a thread is an IO action that executes independently from other threads. To create a thread, we import the Control.Concurrent module and use the forkIO functionHaskell
中,線程是互相獨(dú)立的 IO
動作。為創(chuàng)建線程,需要導(dǎo)入 Control.Concurrent
模塊并使用其中的 forkIO
函數(shù)
ghci> :m +Control.Concurrent
ghci> :t forkIO
forkIO :: IO () -> IO ThreadId
ghci> :m +System.Directory
ghci> forkIO (writeFile "xyzzy" "seo craic nua!") >> doesFileExist "xyzzy"
True
新線程幾乎立即開始執(zhí)行,創(chuàng)建它的線程同時(shí)繼續(xù)向下執(zhí)行。新線程將在它的 IO
動作結(jié)束后停止執(zhí)行。
GHC 的運(yùn)行時(shí)組件并不按特定順序執(zhí)行多個(gè)線程。所以,上面的例子中,文件 xyzzy 的創(chuàng)建時(shí)間在初始線程檢查其是否存在之前或之后都有可能。如果刪除 xyzzy 并且再執(zhí)行一次,我們可能得到完全相反的結(jié)果。
假設(shè)我們要將一個(gè)大文件壓縮并寫入磁盤,但是希望快速處理用戶輸入以使他們感覺程序是立即響應(yīng)的。如果使用 forkIO
來開啟一個(gè)單獨(dú)的線程去寫文件,這樣就可以同時(shí)做這兩件事。
-- file: ch24/Compressor.hs
import Control.Concurrent (forkIO)
import Control.Exception (handle)
import Control.Monad (forever)
import qualified Data.ByteString.Lazy as L
import System.Console.Readline (readline)
-- http://hackage.haskell.org/ 上的 zlib 包提供了壓縮功能
import Codec.Compression.GZip (compress)
main = do
maybeLine <- readline "Enter a file to compress> "
case maybeLine of
Nothing -> return () -- 用戶輸入了 EOF
Just "" -> return () -- 不輸入名字按 “想要退出” 處理
Just name -> do
handle
(print :: (SomeException->IO ()))
$ do
content <- L.readFile name
forkIO (compressFile name content)
return ()
main
where compressFile path = L.writeFile (path ++ ".gz") . compress
因?yàn)槭褂昧硕栊缘?ByteString
I/O ,主線程中做僅僅是打開文件。真正讀取文件內(nèi)容發(fā)生在子線程中。
當(dāng)用戶輸入的文件名并不存在時(shí)將發(fā)生異常, handle (print :: (SomeException-> IO ()))
是一個(gè)低成本的打印錯(cuò)誤信息的方式。
在兩個(gè)線程之間共享信息最簡單的方法是,讓它們使用同一個(gè)變量。上面文件壓縮的例子中, main
線程與子線程共享了文件名和文件內(nèi)容。 Haskell
的數(shù)據(jù)默認(rèn)是不可變的,所以這樣共享不會有問題,兩個(gè)線程都無法修改另一個(gè)線程中的文件名和文件內(nèi)容。
線程經(jīng)常需要和其他線程進(jìn)行活躍的通信。例如, GHC
沒有提供查看其他線程是否還在執(zhí)行、執(zhí)行完畢、或者崩潰的方法 [54] ??墒?,其提供了同步變量類型, MVar
,我們可以通過它自己實(shí)現(xiàn)上述功能。
MVar
的行為類似一個(gè)單元素的箱子:其可以為滿或空。將一些東西扔進(jìn)箱子,使其填滿,或者從中拿出一些東西,使其變空。
ghci> :t putMVar
putMVar :: MVar a -> a -> IO ()
ghci> :t takeMVar
takeMVar :: MVar a -> IO a
嘗試將一個(gè)值放入非空的 MVar
,將會導(dǎo)致線程休眠直到其他線程從其中拿走一個(gè)值使其變空。類似的,如果嘗試從一個(gè)空的 MVar
取出一個(gè)值,線程也將休眠,直到其他線程向其中放入一個(gè)值。
-- file: ch24/MVarExample.hs
import Control.Concurrent
communicate = do
m <- newEmptyMVar
forkIO $ do
v <- takeMVar m
putStrLn ("received " ++ show v)
putStrLn "sending"
putMVar m "wake up!"
newEmptyMVar
函數(shù)的作用從其名字一目了然。要創(chuàng)建一個(gè)初始狀態(tài)非空的 MVar
,需要使用 newMVar
。
ghci> :t newEmptyMVar
newEmptyMVar :: IO (MVar a)
ghci> :t newMVar
newMVar :: a -> IO (MVar a)
在 ghci
運(yùn)行一下上面例子。
ghci> :load MVarExample
[1 of 1] Compiling Main ( MVarExample.hs, interpreted )
Ok, modules loaded: Main.
ghci> communicate
sending
rece
如果有使用傳統(tǒng)編程語言編寫并發(fā)程序的經(jīng)驗(yàn),你會想到 MVar
有助于實(shí)現(xiàn)兩個(gè)熟悉的效果。
- 從一個(gè)線程向另一個(gè)線程發(fā)送消息,例如:一個(gè)提醒。
- 對線程間共享的可變數(shù)據(jù)提供互斥。在數(shù)據(jù)沒有被任何線程使用時(shí),將其放入
MVar
,某線程需要讀取或改變它時(shí),將其臨時(shí)從中取出。
GHC 的運(yùn)行時(shí)系統(tǒng)對主線程的控制與其他線程不同。主線程結(jié)束時(shí),運(yùn)行時(shí)系統(tǒng)認(rèn)為整個(gè)程序已經(jīng)跑完了。其他沒有執(zhí)行完畢的線程,會被強(qiáng)制終止。
所以,如果線程執(zhí)行時(shí)間非常長,且必須不被殺死,必須對主線程做特殊安排,以使得主線程在其他線程完成前都不退出。讓我們來開發(fā)一個(gè)小庫實(shí)現(xiàn)這一點(diǎn)。
-- file: ch24/NiceFork.hs
import Control.Concurrent
import Control.Exception (Exception, try)
import qualified Data.Map as M
data ThreadStatus = Running
| Finished -- 正常退出
| Threw Exception -- 被未捕獲的異常終結(jié)
deriving (Eq, Show)
-- | 創(chuàng)建一個(gè)新線程管理器
newManager :: IO ThreadManager
-- | 創(chuàng)建一個(gè)被管理的線程
forkManaged :: ThreadManager -> IO () -> IO ThreadId
-- | 立即返回一個(gè)被管理線程的狀態(tài)
getStatus :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
-- | 阻塞,直到某個(gè)特定的被管理線程終結(jié)
waitFor :: ThreadManager -> ThreadId -> IO (Maybe ThreadStatus)
-- | 阻塞,直到所有被管理線程終結(jié)
waitAll :: ThreadManager -> IO ()
我們使用一個(gè)常見的方法來實(shí)現(xiàn) ThreadManager
的類型抽象:將其包裹進(jìn)一個(gè) newtype
,并防止使用者直接創(chuàng)建這個(gè)類型的值。在模塊的導(dǎo)出聲明中,我們列出了一個(gè)創(chuàng)建線程管理器的 IO 動作,但是并不直接導(dǎo)出類型構(gòu)造器。
-- file: ch24/NiceFork.hs
module NiceFork
(
ThreadManager
, newManager
, forkManaged
, getStatus
, waitFor
, waitAll
) where
ThreadManager
的實(shí)現(xiàn)中維護(hù)了一個(gè)線程 ID 到線程狀態(tài)的 map 。我們將此作為線程 map 。
-- file: ch24/NiceFork.hs
newtype ThreadManager =
Mgr (MVar (M.Map ThreadId (MVar ThreadStatus)))
deriving (Eq)
newManager = Mgr `fmap` newMVar M.empty
此處使用了兩層 MVar
。首先將 Map
保存在 MVar 中。這將允許通過使用新版本替換來“改變” map 中的值。同樣確保了每個(gè)使用這個(gè) Map
的線程可以看到一致的內(nèi)容。
對每個(gè)被管理的線程,都維護(hù)一個(gè)對應(yīng)的 MVar
。這種 MVar
從空狀態(tài)開始,表示這個(gè)線程正在執(zhí)行。當(dāng)線程被殺死或者發(fā)生未處理異常導(dǎo)致退出時(shí),我們將此類信息寫入這個(gè) MVar
。
為了創(chuàng)建一個(gè)線程并觀察它的狀態(tài),必須做一點(diǎn)簿記。
-- file: ch24/NiceFork.hs
forkManaged (Mgr mgr) body =
modifyMVar mgr $ \m -> do
state <- newEmptyMVar
tid <- forkIO $ do
result <- try body
putMVar state (either Threw (const Finished) result)
return (M.insert tid state m, tid)
forkManaged
中使用的 modifyMVar
函數(shù)很實(shí)用:它將 takeMVar
和 putMVar
安全的組合在一起。
ghci> :t modifyMVar
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b
其從一個(gè) MVar
中取出一個(gè)值,并傳入一個(gè)函數(shù)。這個(gè)函數(shù)生成一個(gè)新的值,且返回一個(gè)結(jié)果。如果函數(shù)拋出一個(gè)異常, modifyMVar
會將初始值重新放回 MVar
,否則其會寫入新值。它還會返回另一個(gè)返回值。
使用 modifyMVar
而非手動使用 takeMVar
和 putMVar
管理 MVar
, 可以避免兩類并發(fā)場景下的問題。
- 忘記將一個(gè)值放回
MVar
。有的線程會一直等待MVar
中被放回一個(gè)值,如果一致沒有等到,就將導(dǎo)致死鎖。- 沒有考慮可能出現(xiàn)的異常,擾亂了某端代碼的控制流。這可能導(dǎo)致一個(gè)本應(yīng)執(zhí)行的
putMVar
沒有執(zhí)行,進(jìn)而導(dǎo)致死鎖。
因?yàn)檫@些美妙的安全特性,盡可能的使用 modifyMVar
是明智的選擇。
modifyMVar
遵循的模式適用很多場景。下面是這些模式:
- 獲得一份資源。
- 將資源傳入一個(gè)將處理它函數(shù)。
- 始終釋放資源,即使函數(shù)拋出異常。如果發(fā)生異常,重新拋出異常,以便使其被程序捕獲。
除了安全性,這個(gè)方法還有其他好處:可以是代碼更簡短且容易理解。正如前面的 forkManaged
, Hakell
的簡潔語法和匿名函數(shù)使得這種風(fēng)格的代碼看起來一點(diǎn)都不刺眼。
下面是 modifyMVar
的定義,從中可以了解這個(gè)模式的細(xì)節(jié):
-- file: ch24/ModifyMVar.hs
import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- use Control.Exception's version
modifyMVar :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar m io =
block $ do
a <- takeMVar m
(b,r) <- unblock (io a) `catch` \e ->
putMVar m a >> throw e
putMVar m b
return r
這種模式很容易用于你的特定需求,無論是處理網(wǎng)絡(luò)連接,數(shù)據(jù)庫句柄,或者被 C
庫函數(shù)管理的數(shù)據(jù)。
我們編寫的 getStatus
函數(shù)用于獲取某個(gè)線程的當(dāng)前狀態(tài)。若某線程已經(jīng)不被管理(或者未被管理),它返回 Nothing
。
-- file: ch24/NiceFork.hs
getStatus (Mgr mgr) tid =
modifyMVar mgr $ \m ->
case M.lookup tid m of
Nothing -> return (m, Nothing)
Just st -> tryTakeMVar st >>= \mst -> case mst of
Nothing -> return (m, Just Running)
Just sth -> return (M.delete tid m, Just sth)
若線程仍在運(yùn)行,它返回 Just Running
。 否則,它指出將線程為何被終止,并停止管理這個(gè)線程。
若 tryTakeMVar
函數(shù)發(fā)現(xiàn) MVar 為空,它將立即返回 Nothing
而非阻塞等待。
ghci> :t tryTakeMVar
tryTakeMVar :: MVar a -> IO (Maybe a)
否則,它將從 MVar 取到一個(gè)值。
waitFor
函數(shù)的行為較簡單,其會阻塞等待給定線程終止,而非立即返回。
-- file: ch24/NiceFork.hs
waitFor (Mgr mgr) tid = do
maybeDone <- modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
(Nothing, _) -> (m, Nothing)
(done, m') -> (m', done)
case maybeDone of
Nothing -> return Nothing
Just st -> Just `fmap` takeMVar st
首先讀取保存線程狀態(tài)的 MVar
,若其存在。 Map
類型的 updateLookupWithKey
函數(shù)很有用:它將查找某個(gè)值與更新或移除組合起來。
ghci> :m +Data.Map
ghci> :t updateLookupWithKey
updateLookupWithKey :: (Ord k) =>
(k -> a -> Maybe a) -> k -> Map k a -> (Maybe a, Map k a)
在此處,我們希望若保存線程狀態(tài)的 MVar
存在,則將其從 Map 中移除,這樣線線程管理器將不在管理這個(gè)線程。若從其中取到了值,則從中取出線程的退出狀態(tài),并將其返回。
我們的最后一個(gè)實(shí)用函數(shù)簡單的等待所有當(dāng)前被管理的線程完成,且忽略他們的退出狀態(tài)。
-- file: ch24/NiceFork.hs
waitAll (Mgr mgr) = modifyMVar mgr elems >>= mapM_ takeMVar
where elems m = return (M.empty, M.elems m)
我們在上面定義的 waitFor
函數(shù)有點(diǎn)不完善,因?yàn)榛蚨嗷蛏賵?zhí)行了重復(fù)的模式分析:在 modifyMVar
內(nèi)部的回調(diào)函數(shù),以及處理其返回值時(shí)。
當(dāng)然,我們可以用一個(gè)函數(shù)消除這種重復(fù)。這是 Control.Monad
模塊中的 join 函數(shù)。
ghci> :m +Control.Monad
ghci> :t join
join :: (Monad m) => m (m a) -> m a
這是個(gè)有趣的主意:可以創(chuàng)建一個(gè) monadic 函數(shù)或純代碼中的 action ,然后一直帶著它直到最終某處有個(gè) monad 可以使用它。一旦我們了解這種寫法適用的場景,就可以更靈活的編寫代碼。
-- file: ch24/NiceFork.hs
waitFor2 (Mgr mgr) tid =
join . modifyMVar mgr $ \m ->
return $ case M.updateLookupWithKey (\_ _ -> Nothing) tid m of
(Nothing, _) -> (m, return Nothing)
(Just st, m') -> (m', Just `fmap` takeMVar st)
對于線程間的一次性通信, MVar
已經(jīng)足夠好了。另一個(gè)類型, Chan
提供了單向通信頻道。此處有一個(gè)使用它的簡單例子。
-- file: ch24/Chan.hs
import Control.Concurrent
import Control.Concurrent.Chan
chanExample = do
ch <- newChan
forkIO $ do
writeChan ch "hello world"
writeChan ch "now i quit"
readChan ch >>= print
readChan ch >>= print
若一個(gè) Chan
未空, readChan
將一直阻塞,直到讀到一個(gè)值。 writeChan
函數(shù)從不阻塞:它會立即將一個(gè)值寫入 Chan
。
正如大多數(shù) Haskell
容器類型, MVar
和 Char
都是非嚴(yán)格的:從不對其內(nèi)容求值。我們提到它,并非因?yàn)檫@是一個(gè)問題,而是因?yàn)檫@通常是一個(gè)盲點(diǎn):人們傾向于假設(shè)這些類型是嚴(yán)格的,這大概是因?yàn)樗鼈儽挥迷?IO monad
中。
正如其他容器類型,誤認(rèn)為 MVar
和 Chan
是嚴(yán)格的會導(dǎo)致空間和性能的泄漏。考慮一下這個(gè)很可能發(fā)生的情況:
我們分離一個(gè)線程以在另一個(gè)核上執(zhí)行一些開銷較大的計(jì)算
-- file: ch24/Expensive.hs
import Control.Concurrent
notQuiteRight = do
mv <- newEmptyMVar
forkIO $ expensiveComputation_stricter mv
someOtherActivity
result <- takeMVar mv
print result
它看上去做了一些事情并將結(jié)果存入 MVar
。
-- file: ch24/Expensive.hs
expensiveComputation mv = do
let a = "this is "
b = "not really "
c = "all that expensive"
putMVar mv (a ++ b ++ c)
當(dāng)我們在父線程中從 MVar
獲取結(jié)果并嘗試用它做些事情時(shí),我們的線程開始瘋狂的計(jì)算,因?yàn)槲覀儚奈磸?qiáng)制指定在其他線程中的計(jì)算真正發(fā)生。
照舊,一旦我們知道了有個(gè)潛在問題,解決方案就很簡單:未分離的線程添加嚴(yán)格性,以確保計(jì)算確實(shí)發(fā)生。這個(gè)嚴(yán)格性最好加在一個(gè)位置,以避免我們忘記添加過它。
-- file: ch24/ModifyMVarStrict.hs
{-# LANGUAGE BangPatterns #-}
import Control.Concurrent (MVar, putMVar, takeMVar)
import Control.Exception (block, catch, throw, unblock)
import Prelude hiding (catch) -- 使用 Control.Exception's 中的 catch 而非 Prelude 中的。
modifyMVar_strict :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_strict m io = block $ do
a <- takeMVar m
!b <- unblock (io a) `catch` \e ->
putMVar m a >> throw e
putMVar m b
Note
查看 Hackage
始終是值得的。
在 Hackage
包數(shù)據(jù)庫,你將發(fā)現(xiàn)一個(gè)庫,strict-concurrency
,它提供了嚴(yán)格版本的 MVar
和 Chan
類型
上面代碼中的 !
模式用起來很簡單,但是并不總是足以確保我們的數(shù)據(jù)已經(jīng)被求值。更完整的方法,請查看下面的段落“從求值中分離算法”。
因?yàn)?writeChan
總是立即成功,所以在使用 Chan
時(shí)有潛在風(fēng)險(xiǎn)。若對某個(gè) Chan
的寫入多于其讀取, Chan
將用不檢查的方法增長:對未讀消息的讀取將遠(yuǎn)遠(yuǎn)落后于其增長。
盡管 Haskell 擁有與其他語言不同的基礎(chǔ)設(shè)施用于線程間共享數(shù)據(jù),它仍需克服相同的基本問題:編寫正確的并發(fā)程序極端困難。真的,一些其他語言中的并發(fā)編程陷阱也會在 Haskell
中出現(xiàn)。其中為人熟知的兩個(gè)是死鎖和饑餓。
死鎖的情況下,兩個(gè)或多個(gè)線程永遠(yuǎn)卡在爭搶共享資源的訪問權(quán)上。制造多線程程序死鎖的一個(gè)經(jīng)典方法是不按順序加鎖。這種類型的 bug 很常見,它有個(gè)名字:鎖順序倒置。 Haskell
沒有提供鎖, 但 MVar
類型可能會有順序倒置問題。這有一個(gè)簡單例子:
-- file: ch24/LockHierarchy.hs
import Control.Concurrent
nestedModification outer inner = do
modifyMVar_ outer $ \x -> do
yield -- 強(qiáng)制當(dāng)前線程讓出 CPU
modifyMVar_ inner $ \y -> return (y + 1)
return (x + 1)
putStrLn "done"
main = do
a <- newMVar 1
b <- newMVar 2
forkIO $ nestedModification a b
forkIO $ nestedModification b a
在 ghci 中運(yùn)行這段程序,它通常會(但不總是)不打印任何信息,表明兩個(gè)線程已經(jīng)卡住了。
容易看出 nestedModification
函數(shù)的問題。在第一個(gè)線程中,我們先取出 MVar a
,接著取出 b
。在第二個(gè)線程中,先取出 b
然后取出 a
,若第一個(gè)線程成功取出了 a
然后要取出 b
,這是兩個(gè)線程都會阻塞:每個(gè)線程都嘗試獲取一個(gè) MVar
,而這個(gè) MVar
已經(jīng)被另一個(gè)線程取空了,所以二者都不能完成整個(gè)流程。
無論何種語言,通常解決倒序問題的方法是申請資源時(shí)一直遵循一致的順序。因?yàn)檫@需要人工遵循編碼規(guī)范,在實(shí)踐中很容易遺忘。
更麻煩的是,這種倒序問題在實(shí)際代碼中很難被發(fā)現(xiàn)。獲取 MVar
的動作經(jīng)常跨越不同文件中的不同函數(shù),這使得通過觀察源碼檢查時(shí)更加棘手。更糟糕的是,這類問題通常是間歇性的,這使得它們難于重現(xiàn),更不要說隔離和修復(fù)了。
并發(fā)軟件通常可能會導(dǎo)致饑餓問題,某個(gè)線程霸占了共享資源,阻止其他線程使用。很容易想象這是如何發(fā)生的:一個(gè)線程調(diào)用 modifyMVar
執(zhí)行一個(gè) 100 毫秒的代碼段,稍后另外一個(gè)線程對同一個(gè) MVar
調(diào)用 modifyMVar
執(zhí)行一個(gè) 1 毫秒的代碼段。第二個(gè)線程在第一個(gè)線程完成前將無法執(zhí)行。
MVar
類型的非嚴(yán)格性質(zhì)使會導(dǎo)致或惡化饑餓的問題。若我們將一個(gè)求值開銷很大的 thunk
寫入一個(gè) MVar
,在一個(gè)看上去開銷較小的線程中取出并求值,這個(gè)線程的執(zhí)行開銷馬上會變大。所以我們在 “MVar 和 Chan 是非嚴(yán)格的” 一章中特地給出了一些建議。
幸運(yùn)的是,我們已經(jīng)提及的并發(fā) API
并不是故事的全部。最近加入 Haskell 中的一個(gè)設(shè)施,軟件事務(wù)內(nèi)存,使用起來更加容易和安全。我們將在第 28 章,軟件事務(wù)內(nèi)存中介紹。
Chan
類型是使用 MVar
實(shí)現(xiàn)的。使用 MVar
來開發(fā)一個(gè)有邊界的 Chan
庫。Int
參數(shù),限制單獨(dú) BoundedChan
中的未讀消息數(shù)量。writeBoundedChanfunction
要被阻塞,知道某個(gè)讀取者使用 readBoundedChan
函數(shù)消費(fèi)掉隊(duì)列中的一個(gè)值。strict-concurrency
包,試著自己開發(fā)一個(gè),作為內(nèi)置 MVar
類型的包裝。按照經(jīng)典的 Haskell
實(shí)踐,使你的庫類型安全,讓用戶不會混淆嚴(yán)格和非嚴(yán)格的 MVar
。默認(rèn)情況下, GHC
生成的程序只使用一個(gè)核,甚至在編寫并發(fā)代碼時(shí)也是如此。要使用多核,我們必須明確指定。當(dāng)生成可執(zhí)行程序時(shí),要在鏈接階段指定這一點(diǎn)。
- “non-threaded” 運(yùn)行時(shí)庫在一個(gè)操作系統(tǒng)線程中運(yùn)行所有
Haskell
線程。這個(gè)運(yùn)行時(shí)在創(chuàng)建線程和通過 MVar 傳遞數(shù)據(jù)時(shí)很高效。- “threaded” 庫使用多個(gè)操作系統(tǒng)線程運(yùn)行
Haskell
線程。它在創(chuàng)建線程和使用MVar
時(shí)具有更高的開銷。
若我們在向編譯器傳遞 -threadedoption
參數(shù),它將使用 threaded
運(yùn)行時(shí)庫鏈接我們的程序。在編譯庫和源碼文件時(shí)無需指定 -threaded
,只是在最終生成可執(zhí)行文件時(shí)需要指定。
即使為程序指定了 threaded
運(yùn)行時(shí),默認(rèn)情況下它仍將只使用一個(gè)核運(yùn)行。必須明確告訴運(yùn)行時(shí)使用多少個(gè)核。
運(yùn)行程序時(shí)可以向 GHC 的運(yùn)行時(shí)系統(tǒng)傳遞命令行參數(shù)。在將控制權(quán)交給我們的代碼前,運(yùn)行時(shí)掃描程序的參數(shù),看是否有命令行選項(xiàng) +RTS
。其后跟隨的所有選項(xiàng)都被運(yùn)行時(shí)解釋,直到特殊的選項(xiàng) -RTS
,這些選項(xiàng)都是提供給運(yùn)行時(shí)系統(tǒng)的,不為我們的程序。運(yùn)行時(shí)會對我們的代碼隱藏所有這些選項(xiàng)。當(dāng)我們使用 System.Environment
模塊的 getArgsfunction
來獲得我們的命令行參數(shù)是,我們不會在其中獲得運(yùn)行時(shí)選項(xiàng)。
threaded
運(yùn)行時(shí)接受參數(shù) -N
[55] 。 其接受一個(gè)參數(shù),指定了 GHC
的運(yùn)行時(shí)系統(tǒng)將使用的核數(shù)。這個(gè)選項(xiàng)對輸入很挑剔: -N
和參數(shù)之間必須沒有空格。 -N4
可被接受, -N 4
則不被接受。
GHC.Conc
模塊輸出一個(gè)變量, numCapabilities
,它會告訴我們運(yùn)行時(shí)系統(tǒng)被 -NRTS
選項(xiàng)指定了多少核。
-- file: ch24/NumCapabilities.hs
import GHC.Conc (numCapabilities)
import System.Environment (getArgs)
main = do
args <- getArgs
putStrLn $ "command line arguments: " ++ show args
putStrLn $ "number of cores: " ++ show numCapabilitie
若編譯上面的程序,我們可以看到運(yùn)行時(shí)系統(tǒng)的選項(xiàng)對于程序來說是不可見的,但是它可以看其運(yùn)行在多少核上。
$ ghc -c NumCapabilities.hs
$ ghc -threaded -o NumCapabilities NumCapabilities.o $ ./NumCapabilities +RTS -N4 -RTS foo
command line arguments: ["foo"]
number of cores: 4
選擇正確的運(yùn)行時(shí)需要花點(diǎn)心思。 threaded
運(yùn)行時(shí)可以使用多核,但是也有相應(yīng)的代價(jià):線程間共享數(shù)據(jù)的成本比 non-threaded
運(yùn)行時(shí)更大。
目前為止, GHC 的 6.8.3 版本使用的垃圾收集器是單線程的:它執(zhí)行時(shí)暫停其他所有線程,而且它是在單核上執(zhí)行。這限制了我們在使用多核的時(shí)候希望看到的性能改進(jìn)[56]_。
很多真實(shí)世界中的并發(fā)程序中,一個(gè)單獨(dú)的線程多數(shù)時(shí)間實(shí)在等待一個(gè)網(wǎng)絡(luò)請求或響應(yīng)。這些情況下,若以一個(gè)單獨(dú)的 Haskell
程序?yàn)閿?shù)萬并發(fā)客戶端提供服務(wù),使用低開銷的 non-threaded
運(yùn)行時(shí)很可能是合適的。例如,與其用 4 個(gè)核跑 threaded 運(yùn)行時(shí)的單個(gè)服務(wù)器程序,可能同時(shí)跑 4 個(gè) non-threaded 運(yùn)行時(shí)的相同服務(wù)器程序性能更好。
我們的目的并不是阻止你使用 threaded
運(yùn)行時(shí)。相對于 non-threaded
運(yùn)行時(shí)它并沒有特別大的開銷:相對于其他編程語言,線程依舊驚人的輕量。我們僅是希望說明 threaded
運(yùn)行時(shí)并不是在所有場景都是最佳選擇。
現(xiàn)在讓我們來關(guān)注一下并行編程。對很多計(jì)算密集型問題,可以通過分解問題,并在多個(gè)核上求值來更快的計(jì)算出結(jié)果。多核計(jì)算機(jī)已經(jīng)普及,甚至在最新的筆記本上都有,但是很少有程序可以利用這一優(yōu)勢。
大部分原因是因?yàn)閭鹘y(tǒng)觀念認(rèn)為并行編程非常困難。在一門典型的編程語言中,我們將用處理并發(fā)程序相同的庫和設(shè)施處理并發(fā)程序。這是我們的注意力集中在處理一些熟悉的問題比如死鎖、競爭條件、饑餓和陡峭的復(fù)雜性。
但是我們可以確定,使用 Haskell
的并發(fā)特性開發(fā)并行代碼時(shí),有許多更簡單的方法。在一個(gè)普通的 Haskell 函數(shù)上稍加變化,就可以并行求值。
更多建議: