[Android] Coroutines Hot Flow:關於冷熱的一切~尤其是熱的那些(1/3)

Connie Lin
9 min readMay 12, 2022

Coroutines Flow 可異步發送多筆資料,將源源不絕的資料以「流」的形式提供給訂閱對象作使用。同樣是異步處理的 suspend function,只能 return 單一個回傳值。因為 Flow 具有異步處理、持續更新的特性,還有便於資料處理的強大 opertator,使用 Flow 可以讓資料的處理與傳遞上更加順暢。

你知道嗎? Flow 還有分成冷流、熱流。如標題所示,這個系列文一共有三篇,主要著重在熱流的介紹。這篇作為開場,會先聊一下冷流與熱流的基本差異,如此一來更能掌握什麼情境適用哪一種 Flow 。

Cold flow v.s. hot flow

Flow 有分成冷流(cold flow)與熱流(hot flow)。這兩者的差別在於,cold flow 是由單一對象透過 Flow.collect() 來驅動資料流,並將資料 emit 出來,每一個訂閱者都會從頭驅動資料變化。Hot flow 則是只要被創建出來後,不管有沒有被訂閱都可以持續 emit 資料,也可以提供給多個對象訂閱。相較起來,cold flow 像是一條私人管線,hot flow 則更像是個公共財。

一般常見的 Flow<T> 就是 cold flow,而本篇文章主要要介紹的 hot flow,有 StateFlow<T>SharedFlow<T> 兩種型別。

🌟 Cold flow 像是一條私人管線,因為只有一個人可以從管線中拿到東西,因此需要資料的人都會取得新的 instance,只有這個 flow 本身可以 emit 資料,而 flow { } block 內的事情做完,這條 flow 也會終止。

// 每個 collector 拿到的 flow 都會是不同的 instance
fun getFlow(): Flow<T> = flow {
emit(1)
delay(100)
emit(2)
}
getFlow.collect() // collector A
getFlow.collect() // collector B

🌟 Hot flow 就像一條公有管線,被創建出來之後,會一直保留在記憶體中,只要能取得 reference ,就可以從中訂閱資料,也可以向 Mutable 的 instance emit 資料。不像是 cold flow 事情做完就會結束,有句描述是 “Hot flow never ends”

private val _sharedFlow = MutableSharedFlow<T>() // 可以 emit data
val sharedFlow = _sharedFlow.asSharedFlow() // 開放給別人訂閱
// 訂閱者都會訂閱同一個 SharedFlow
sharedFlow.collect()
// 生產者可以對 MutableSharedFlow emit 資料
_sharedFlow.emit(T)

接下來,就進入這個系列文的重點 — hot flow。可以想像 SharedFlow 是熱流的鼻祖,StateFlow 則屬於 SharedFlow 的延伸,那我們就從祖先開始聊起吧。接下來會以「管線」為例子,先透過具象化的情境來理解 SharedFlow 的基本特性。

SharedFlow

我們可以把 SharedFlow 當成是一條專門運送材料的水管,只要接上這條水管的工人(subscriber),都能夠拿到材料(data)。工人透過 .collect() 接上水管,從水管收集材料,並且在收到材料之後做任何想做的事情。

SharedFlowreplayextraBufferCapacity 以及 onBufferOverflow 等設定,讓資料分享端能夠依需求情境,彈性設定資料分享行為。

  • replay:新訂閱者可以拿到多少舊的資料?
  • extraBufferCapacity:有多少額外容量,來放置未處理的新資料?
  • onBufferOverflow:管線實際容量是 replay + extraBufferCapacity。容量爆掉之後,要如何處理裝不下的資料? 可以把機制設定為 SUSPENDDROP_LATESTDROP_OLDEST

來點例子吧。

#1 SharedFlow 的預設值:講究的工頭

(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)

這是 MutableSharedFlow 的預設值。 replay = 0,代表「錯過不再」,新的工人拿不到先前釋出的材料。如果你希望為新工人留下舊材料,可以設定 replay = N ,可以想像是你多放了 N 個儲物桶在管線尾端。

extraBufferCapacity 代表想在「管線的額外空間」擱置多少沒被處理的材料。因為 collect 的行為是異步的,所以會有「無法及時處理的狀況」。預設值 extraBufferCapacity = 0 ,代表管線沒有額外空間,因此流進管線內的材料,都得即時地被工人處理。

儲物桶(replay)加上管線容量(extraBufferCapacity),等於整個管路的總容量。當總容量已滿,沒有額外空間,又有新的材料產生要怎麼辦?就是看 BufferOverflow 的設定了。以預設值 BufferOverflow.SUSPEND 來說,你會是一個講究的工頭,希望讓工人們都能摸過一遍流經水管的材料,所以你會先把新的材料擱置在一旁,直到所有的工人都有空,才將材料放進水管中。實際上,就是會先 suspend emit 下一個 data 的行為。

那其實也不難想像 BufferOverflow.DROP_LATEST 以及 DROP_OLDEST 的用處了。作為工頭,可以選擇擱置或拋棄,如果不希望老鼠屎卡住整個線路,就可以選擇當管線阻塞的時候拋棄材料,讓水管保持暢通。

藉由這個例子,應該能對三個參數有些基本概念了。想當初我看完參數定義後,以為自己懂了,但一思考操弄 replayextraBufferCapacity 的數值以後這條流會如何運作,又立馬茫了。

只好再多給幾個例子,繼續看下去。

#2 在水管末端放一個儲物桶,儲存一份舊材料

(replay = 1, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)

與上個例子相同,工頭還是相當龜毛,希望所有工人都摸過材料,所以會擱置沒辦法被處理的新材料(BufferOverflow.SUSPEND)。不過,透過 replay = 1 的設定,工頭在水管的 出口 多放了一個水桶,來裝一份最新的材料,因此初來乍到的新工人不至於閒到沒事做。

不自量力的工人甲來接這根水管後,就去忙別的事情了,因此工人甲處理材料 A 的行為先被 suspended 了。這時即便新的材料 B 已經準備好,但因爲工人甲還沒回過神來(resume),管路總容量已滿,工頭也只能先把材料 B 擱置(suspend)在一旁。

若是工人乙在這個時候來接水管,他可以立即從水桶拿到材料 A(replay cache)。不過接下來,他可要當個薪水小倫,畢竟要等工人甲回過神,講究的工頭才會願意把材料 B 丟出來。

過一會,姍姍來遲的工人甲終於回過神來,立即處理了材料 A。材料被所有工人摸過一輪,工頭終於可以安心放進擱置的材料 B,兩位工人一起領收。同時,材料 B 也可以化作歷史,取代材料 A 乖乖進桶(replay cache)了。

🔥 如果工頭不要這麼講究,可以設定為 BufferOverflow.DROP_OLDEST ,那麼每份新材料都會即時替換水桶內的舊材料,而不會阻塞水管。也許大家工作完一輪,工人甲才回神,此時水桶已經是材料 Z 了呢。

#3 複雜題:拿掉水桶,加長水管,工頭也不再講究

(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

現在我們拿掉水桶,但加長水管,拿來存放一份沒被處理完的材料。沒有水桶,代表工頭不會為後來才來的工人提供任何舊材料(replay = 0),不過水管內可以多放一份沒被處理的材料(extraBufferCapacity = 1),工頭也不再強求每個人都要做完工作,會適時放掉舊材料(BufferOverflow.DROP_OLDEST)。

有點複雜,因此我們延續上一個例子。

無能的工人甲先來接水管,工頭放行材料 A 的時候他恰巧沒空,不過因為水管還有一個位置可以放沒被處理完的材料(extraBufferCapacity = 1),材料 A 就先放在水管的額外空間,等待工人甲回神。

這時候工人乙來接水管了,但按規定,他拿不到先前的材料 A(replay = 0),所以什麼事也不會發生。

倘若這時工頭要把材料 B 放進水管,會發生什麼事呢?

因為工人甲還是沒空,而管線的額外空間只能容納一份材料,工頭索性丟掉材料 A (DROP_OLDEST),改放材料 B 在水管中。同時,有空閒的工人乙可以即時領收到材料 B 。

過一陣子,工人甲回神後領走了材料 B。如此一來,材料 B 便也船過水無痕了(replay = 0)。

透過以上三個例子,稍微畫畫圖、消化過後,應該就更能夠了解 SharedFlow 的基本特性,他就是一個可以自行客製化的水管,可以捏成自己想要的樣貌。

而 StateFlow 其實就是繼承了 SharedFlow,再針對特定情境來改良(replay = 1, extra = 0, onBufferOverflow = BufferOverflow.DROP_OLDEST),來做到類似 LiveData 的型態。

這篇文章說比較多故事,下一篇會再實際介紹用法與注意事項。

--

--