作為一個(gè)對(duì) Scala 充滿熱情的開發(fā)者,你應(yīng)該已經(jīng)聽說過 Scala 處理并發(fā)的能力,或許你就是被這個(gè)吸引來的。相較于大多數(shù)編程語言低級(jí)的并發(fā) API,Scala 提供的方法可以讓人們更好的理解并發(fā)以及編寫良構(gòu)的并發(fā)程序。
本章的主題- Future 就是這種方法的兩大基石之一。(另一個(gè)是 actor)我會(huì)解釋 Future 的優(yōu)點(diǎn),以及它的函數(shù)式特征。
如果你想動(dòng)手試試接下來的例子,請(qǐng)確保 Scala 版本不低于 2.9.3,F(xiàn)uture 在 2.10.0 版本中引入,并向后兼容到 2.9.3,最初,它是 Akka 庫的一部分(API略有不同)。
假設(shè)你想準(zhǔn)備一杯卡布奇諾,你可以一個(gè)接一個(gè)的執(zhí)行以下步驟:
轉(zhuǎn)換成 Scala 代碼,可能會(huì)是這樣:
import scala.util.Try
// Some type aliases, just for getting more meaningful method signatures:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String
// dummy implementations of the individual steps:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water ` water.copy(temperature ` 85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"
// some exceptions for things that might go wrong in the individual steps
// (we'll need some of them later, use the others when experimenting with the code):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)
// going through these steps sequentially:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
這樣做有幾個(gè)優(yōu)點(diǎn):可以很輕易的弄清楚事情的步驟,一目了然,而且不會(huì)混淆。(畢竟沒有上下文切換)不好的一面是,大部分時(shí)間,你的大腦和身體都處于等待的狀態(tài):在等待研磨咖啡豆時(shí),你完全不能做任何事情,只有當(dāng)這一步完成后,你才能開始燒水。這顯然是在浪費(fèi)時(shí)間,所以你可能想一次開始多個(gè)步驟,讓它們同時(shí)執(zhí)行,一旦水燒開,咖啡豆也磨好了,你可以制做咖啡了,這期間,打奶泡也可以開始了。
這和編寫軟件沒什么不同。一個(gè) Web 服務(wù)器可以用來處理和響應(yīng)請(qǐng)求的線程只有那么多,不能因?yàn)橐却龜?shù)據(jù)庫查詢或其他 HTTP 服務(wù)調(diào)用的結(jié)果而阻塞了這些可貴的線程。相反,一個(gè)異步編程模型和非阻塞 IO 會(huì)更合適,這樣的話,當(dāng)一個(gè)請(qǐng)求處理在等待數(shù)據(jù)庫查詢結(jié)果時(shí),處理這個(gè)請(qǐng)求的線程也能夠?yàn)槠渌?qǐng)求服務(wù)。
"I heard you like callbacks, so I put a callback in your callback!"
在并發(fā)家族里,你應(yīng)該已經(jīng)知道 nodejs 這個(gè)很酷的家伙,nodejs 完全通過回調(diào)來通信,不幸的是,這很容易導(dǎo)致回調(diào)中包含回調(diào)的回調(diào),這簡(jiǎn)直是一團(tuán)糟,代碼難以閱讀和調(diào)試。
Scala 的 Future 也允許回調(diào),但它提供了更好的選擇,所以你不怎么需要它。
"I know Futures, and they are completely useless!"
也許你知道些其他的 Future 實(shí)現(xiàn),最引人注目的是 Java 提供的那個(gè)。但是對(duì)于 Java 的 Future,你只能去查看它是否已經(jīng)完成,或者阻塞線程直到其結(jié)束。簡(jiǎn)而言之,Java 的 Future 幾乎沒有用,而且用起來絕對(duì)不會(huì)讓人開心。
如果你認(rèn)為 Scala 的 Future 也是這樣,那大錯(cuò)特錯(cuò)了!
scala.concurrent 包里的 Future[T]
是一個(gè)容器類型,代表一種返回值類型為 T
的計(jì)算。計(jì)算可能會(huì)出錯(cuò),也可能會(huì)超時(shí);從而,當(dāng)一個(gè) future 完成時(shí),它可能會(huì)包含異常,而不是你期望的那個(gè)值。
Future 只能寫一次: 當(dāng)一個(gè) future 完成后,它就不能再被改變了。同時(shí),F(xiàn)uture 只提供了讀取計(jì)算值的接口,寫入計(jì)算值的任務(wù)交給了 Promise,這樣,API 層面上會(huì)有一個(gè)清晰的界限。這篇文章里,我們主要關(guān)注前者,下一章會(huì)介紹 Promise 的使用。
Future 有多種使用方式,我將通過重寫 “卡布奇諾” 這個(gè)例子來說明。
首先,所有可以并行執(zhí)行的函數(shù),應(yīng)該返回一個(gè) Future:
import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
println("start grinding...")
Thread.sleep(Random.nextInt(2000))
if (beans == "baked beans") throw GrindingException("are you joking?")
println("finished grinding...")
s"ground coffee of $beans"
}
def heatWater(water: Water): Future[Water] = Future {
println("heating the water now")
Thread.sleep(Random.nextInt(2000))
println("hot, it's hot!")
water.copy(temperature = 85)
}
def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
println("milk frothing system engaged!")
Thread.sleep(Random.nextInt(2000))
println("shutting down milk frothing system")
s"frothed $milk"
}
def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
println("happy brewing :)")
Thread.sleep(Random.nextInt(2000))
println("it's brewed!")
"espresso"
}
上面的代碼有幾處需要解釋。
首先是 Future 伴生對(duì)象里的 apply
方法需要兩個(gè)參數(shù):
object Future {
def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}
要異步執(zhí)行的計(jì)算通過傳名參數(shù) body
傳入。第二個(gè)參數(shù)是一個(gè)隱式參數(shù),隱式參數(shù)是說,函數(shù)調(diào)用時(shí),如果作用域中存在一個(gè)匹配的隱式值,就無需顯示指定這個(gè)參數(shù)。ExecutionContext
可以執(zhí)行一個(gè) Future,可以把它看作是一個(gè)線程池,是絕大部分 Future API 的隱式參數(shù)。
import scala.concurrent.ExecutionContext.Implicits.global
語句引入了一個(gè)全局的執(zhí)行上下文,確保了隱式值的存在。這時(shí)候,只需要一個(gè)單元素列表,可以用大括號(hào)來代替小括號(hào)。調(diào)用 future
方法時(shí),經(jīng)常使用這種形式,使得它看起來像是一種語言特性,而不是一個(gè)普通方法的調(diào)用。
這個(gè)例子沒有大量計(jì)算,所以用隨機(jī)休眠來模擬以說明問題,而且,為了更清晰的說明并發(fā)代碼的執(zhí)行順序,還在“計(jì)算”之前和之后打印了些東西。
計(jì)算會(huì)在 Future 創(chuàng)建后的某個(gè)不確定時(shí)間點(diǎn)上由 ExecutionContext
給其分配的某個(gè)線程中執(zhí)行。
對(duì)于一些簡(jiǎn)單的問題,使用回調(diào)就能很好解決。Future 的回調(diào)是偏函數(shù),你可以把回調(diào)傳遞給 Future 的 onSuccess
方法,如果這個(gè) Future 成功完成,這個(gè)回調(diào)就會(huì)執(zhí)行,并把 Future 的返回值作為參數(shù)輸入:
grind("arabica beans").onSuccess { case ground =>
println("okay, got my ground coffee")
}
類似的,也可以在 onFailure
上注冊(cè)回調(diào),只不過它是在 Future 失敗時(shí)調(diào)用,其輸入是一個(gè) Throwable
。
通常的做法是將兩個(gè)回調(diào)結(jié)合在一起以更好的處理 Future:在 onComplete
方法上注冊(cè)回調(diào),回調(diào)的輸入是一個(gè) Try。
import scala.util.{Success, Failure}
grind("baked beans").onComplete {
case Success(ground) => println(s"got my $ground")
case Failure(ex) => println("This grinder needs a replacement, seriously!")
}
傳遞給 grind
的是 “baked beans”,因此 grind
方法會(huì)產(chǎn)生異常,進(jìn)而導(dǎo)致 Future 中的計(jì)算失敗。
當(dāng)嵌套使用 Future 時(shí),回調(diào)就變得比較煩人。不過,你也沒必要這么做,因?yàn)?Future 是可組合的,這是它真正發(fā)揮威力的時(shí)候!
你一定已經(jīng)注意到,之前討論過的所有容器類型都可以進(jìn)行 map
、 flatMap
操作,也可以用在 for 語句中。作為一種容器類型,F(xiàn)uture 支持這些操作也不足為奇!
真正的問題是,在還沒有完成的計(jì)算上執(zhí)行這些操作意味這什么,如何去理解它們?
Scala 讓 “時(shí)間旅行” 成為可能!假設(shè)想在水加熱后就去檢查它的溫度,可以通過將 Future[Water]
映射到 Future[Boolean]
來完成這件事情:
val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
println("we're in the future!")
(80 to 85) contains (water.temperature)
}
tempreatureOkay
最終會(huì)包含水溫的結(jié)果。你可以去改變 heatWater
的實(shí)現(xiàn)來讓它拋出異常(比如說,加熱器爆炸了),然后等待 “we're in the future!” 出現(xiàn)在顯示屏上,不過你永遠(yuǎn)等不到。
寫傳遞給 map
的函數(shù)時(shí),你就處在未來(或者說可能的未來)。一旦 Future[Water]
實(shí)例成功完成,這個(gè)函數(shù)就會(huì)執(zhí)行,只不過,該函數(shù)所在的時(shí)間線可能不是你現(xiàn)在所處的這個(gè)。如果 Future[Water
失敗,傳遞給 map
的函數(shù)中的事情永遠(yuǎn)不會(huì)發(fā)生,調(diào)用 map
的結(jié)果將是一個(gè)失敗的 Future[Boolean]
。
如果一個(gè) Future 的計(jì)算依賴于另一個(gè) Future 的結(jié)果,那需要求救于 flatMap
以避免 Future 的嵌套。
假設(shè),測(cè)量水溫的線程需要一些時(shí)間,那你可能想異步的去檢查水溫是否 OK。比如,有一個(gè)函數(shù),接受一個(gè) Water
,并返回 Future[Boolean]
:
def temperatureOkay(water: Water): Future[Boolean] = future {
(80 to 85) contains (water.temperature)
}
使用 flatMap
(而不是 map
)得到一個(gè) Future[Boolean]
,而不是 Future[Future[Boolean]]
:
val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
water => temperatureOkay(water)
}
val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
water => temperatureOkay(water)
}
同樣,映射只會(huì)發(fā)生在 Future[Water]
成功完成情況下。
除了調(diào)用 flatMap
,也可以寫成 for 語句。上面的例子可以重寫成:
val acceptable: Future[Boolean] = for {
heatedWater <- heatWater(Water(25))
okay <- temperatureOkay(heatedWater)
} yield okay
如果有多個(gè)可以并行執(zhí)行的計(jì)算,則需要特別注意,要先在 for 語句外面創(chuàng)建好對(duì)應(yīng)的 Futures。
def prepareCappuccinoSequentially(): Future[Cappuccino] =
for {
ground <- grind("arabica beans")
water <- heatWater(Water(25))
foam <- frothMilk("milk")
espresso <- brew(ground, water)
} yield combine(espresso, foam)
這看起來很漂亮,但要知道,for 語句只不過是 flatMap
嵌套調(diào)用的語法糖。這意味著,只有當(dāng) Future[GroundCoffee]
成功完成后, heatWater
才會(huì)創(chuàng)建 Future[Water]
。你可以查看函數(shù)運(yùn)行時(shí)打印出來的東西來驗(yàn)證這個(gè)說法。
因此,要確保在 for 語句之前實(shí)例化所有相互獨(dú)立的 Futures:
def prepareCappuccino(): Future[Cappuccino] = {
val groundCoffee = grind("arabica beans")
val heatedWater = heatWater(Water(20))
val frothedMilk = frothMilk("milk")
for {
ground <- groundCoffee
water <- heatedWater
foam <- frothedMilk
espresso <- brew(ground, water)
} yield combine(espresso, foam)
}
在 for 語句之前,三個(gè) Future 在創(chuàng)建之后就開始各自獨(dú)立的運(yùn)行,顯示屏的輸出是不確定的。唯一能確定的是 “happy brewing” 總是出現(xiàn)在后面,因?yàn)樵撦敵鏊诘暮瘮?shù) brew
是在其他兩個(gè)函數(shù)執(zhí)行完畢后才開始執(zhí)行的。也因?yàn)榇?,可以?for 語句里面直接調(diào)用它,當(dāng)然,前提是前面的 Future 都成功完成。
你可能會(huì)發(fā)現(xiàn) Future[T]
是成功偏向的,允許你使用 map
、flatMap
、filter
等。
但是,有時(shí)候可能處理事情出錯(cuò)的情況。調(diào)用 Future[T]
上的 failed
方法,會(huì)得到一個(gè)失敗偏向的 Future,類型是 Future[Throwable]
。之后就可以映射這個(gè) Future[Throwable]
,在失敗的情況下執(zhí)行 mapping 函數(shù)。
你已經(jīng)見過 Future 了,而且它的前途看起來很光明!因?yàn)樗且粋€(gè)可組合、可函數(shù)式使用的容器類型,這讓我們的工作變得異常舒服。
調(diào)用 future
方法可以輕易將阻塞執(zhí)行的代碼變成并發(fā)執(zhí)行,但是,代碼最好原本就是非阻塞的。為了實(shí)現(xiàn)它,我們還需要 Promise
來完成 Future
,這就是下一章的主題。
更多建議: