流計算也能像數(shù)據(jù)庫那樣保證數(shù)據(jù)不丟失嗎?
為何要保證流計算中的數(shù)據(jù)不丟失?
在實時計算領(lǐng)域,有些應(yīng)用的對系統(tǒng)的可用性要求不那么苛刻,允許在系統(tǒng)異常時丟失一些數(shù)據(jù)。比如,實時推薦系統(tǒng)或業(yè)務(wù)質(zhì)量實時監(jiān)測系統(tǒng)因系統(tǒng)原因或應(yīng)用程序故障而不能提供實時服務(wù),在故障過程中流數(shù)據(jù)未進(jìn)行處理就丟失了。這類場景對少量數(shù)據(jù)丟失是可容忍的,一是因為它們處理不是關(guān)鍵交易(不直接影響賬戶、交易、訂單等核心數(shù)據(jù));二是因為系統(tǒng)恢復(fù)后再處理“過時”數(shù)據(jù)意義已經(jīng)不大,如客戶已經(jīng)離開特定的商圈,系統(tǒng)還拿“過時”的位置信息向他推薦商品會讓客戶體驗很差。
然而,很多實時處理系統(tǒng)對數(shù)據(jù)的丟失是不容忍的,要求對所有數(shù)據(jù)至少處理一次或準(zhǔn)確地處理一次。如CDR話單實時處理,實時計費等應(yīng)用要求所有數(shù)據(jù)至少且至多處理一次,因為一旦出錯就會造成損失。
如何做到“至少處理一次”?
傳統(tǒng)的數(shù)據(jù)庫依賴日志、事務(wù)控制、Checkpoint等技術(shù)實現(xiàn)數(shù)據(jù)的ACID。而Streams流計算采用了“一致區(qū)域(Consistent Region)”機(jī)制實現(xiàn)流數(shù)據(jù)“至少處理一次”。
什么是一致區(qū)域
在Streams流應(yīng)用程序中定義的sub-graph(或叫Region),一旦有tuple流入該區(qū)域就要保證該tuple經(jīng)過了所有Operator的至少一次處理,那么該區(qū)域就是“一致區(qū)域”。
什么是一致狀態(tài)
在一致區(qū)域里的所有流的所有元組tuples全部經(jīng)過一致區(qū)域里的operators完全處理的當(dāng)前狀態(tài)。
下圖中的左邊是由3個Operator組成的應(yīng)用,tuple經(jīng)過op1處理后分布輸出到op2和op3,右邊表示tuple提交的時間軸。根據(jù)“一致狀態(tài)”的定義,第一條虛線所捕獲的狀態(tài)是“一致的”,即當(dāng)前時刻的tuples都在3個Operators處理并提交。第二條和第三條虛線所捕獲的狀態(tài)是不一致的,因為在捕獲每個Operator狀態(tài)那個時間點,同一個tuple并未在一致區(qū)域內(nèi)的完全處理完成。
一致區(qū)域的特點
- 一致區(qū)域的第一個operator必須能重發(fā)數(shù)據(jù)。
- 一致區(qū)域保證每一條數(shù)據(jù)至少被處理一次(at-least-once)。
- 一致區(qū)域里的所有operators要滿足如下條件之一,則保證每個元組僅被處理一次(Exactly Once):
- 能夠?qū)⒆陨淼臓顟B(tài)和交互的外部系統(tǒng)的狀態(tài)重置為最后一次檢查點一致狀態(tài)
- 能夠檢查重復(fù)的元組tuples,并且不予處理
- 不管處理幾次,都和第一次處理結(jié)果一樣
- 支持多線程的Operator。
- 允許用戶自定義一致區(qū)域的開始和結(jié)束(默認(rèn)沒有輸出端口的Operator是一致區(qū)域的結(jié)束)。
實現(xiàn)原理
實現(xiàn)一致區(qū)域關(guān)鍵是定時建立該區(qū)域的一致狀態(tài)。從一致區(qū)域內(nèi)的operator角度來看,建立一致狀態(tài)包含2個階段:(1)排干和(2)檢查點。當(dāng)所有Operators完成這兩個階段,則一致區(qū)域的一致狀態(tài)被成功建立。
排干/Drain - 在這個階段,Operator排干它的內(nèi)部狀態(tài)和輸出流。這意味著Operator已經(jīng)完成處理之前收到的任何元組,排出其內(nèi)部狀態(tài)并提交任何Pending的元組到其內(nèi)部緩沖區(qū)輸出流。用通俗的話講,這個Operator完成排干后,對之前收到的任何元組來說它已經(jīng)完全解放了。下游的Operator在排干完成之前,會完全處理前面Operator發(fā)送過來的元組,直到確定一致狀態(tài)。如果一個具有輸入流的Operator在一致區(qū)域,它的排干階段一定是發(fā)生在上游Operators排干完成之后和處理完所有上游排干時發(fā)過來的所有元組之后。
檢查點/checkpointing - 在這個階段,Operater序列化它的狀態(tài)到檢查點后端(后端可以是Redis或文件系統(tǒng)),這一階段總是發(fā)生排干階段之后。Streams runtime自動管理Operator的檢查點,這意味著runtime維護(hù)多個檢查點版本和自動刪除無需用于失敗恢復(fù)的檢查點。
恢復(fù)一致狀態(tài)
一旦發(fā)生異常,Streams runtime自動將一致區(qū)域恢復(fù)到最新的一致狀態(tài)。任何一個Operator的狀態(tài)重置一定是發(fā)生在其上游Operators已完成狀態(tài)重置之后。當(dāng)一致區(qū)域內(nèi)的全部Operator完成重置,這時就恢復(fù)到了一致狀態(tài)。之后就可以重放元組(最后確定一致狀態(tài)之后的的元組),這點類似數(shù)據(jù)庫的事務(wù)在rollback之后重新從第一條語句開始執(zhí)行事務(wù)。
恢復(fù)一致狀態(tài)的操作在這幾個場景下發(fā)生:1)PE crash。2)主機(jī)crash。3)PE之間的連接中斷。4)Operator發(fā)現(xiàn)并報告錯誤。5)通過人工管理介入。
關(guān)鍵技術(shù) - Chandy-Lamport算法
Chandy-Lamport算法是一個經(jīng)典的分布式快照記錄算法。
為達(dá)到一致性狀態(tài),并且減少建立一致狀態(tài)而對性能帶來的影響,Streams使用Chandy-Lamport算法的變體來建立一個分布式的快照。該算法會考慮流連接的渠道狀態(tài)channel state和運算符的進(jìn)程狀態(tài)process state永久性存儲。Streams對一致性有著嚴(yán)格的定義:每一個提交的tuple必須被處理。因此,SPL系統(tǒng)并不需要保存渠道狀態(tài)。這是因為它有效地強(qiáng)制任何渠道狀態(tài)在進(jìn)程狀態(tài)中得到映射。
如何使用?
Streams通過操作符Operator和注釋annotations得到增強(qiáng),這些操作程序和注釋允許定義在流處理期間不會丟失元組的區(qū)域,確保元組在一致區(qū)域至少被處理一次。
可在允許的操作符上使用 @consistent 注釋定義一致區(qū)域的開頭。Streams自動確定一致區(qū)域的作用域,但是您可使用 @autonomous 注釋更改該區(qū)域的結(jié)束操作符。所定義的一致區(qū)域?qū)⒍ㄆ诮⒁恢聽顟B(tài)。您的應(yīng)用程序還必須具有新的Job Control Plane操作符,該操作符將協(xié)調(diào)一致區(qū)域的排干 (draining) 和重置。具體語法如下:
@consistent( //一致區(qū)域定義
trigger={periodic|operatorDriven}, //如何觸發(fā)開始建立一個一致狀態(tài)
period=3.0, //周期
drainTimeout=30.0, //排干超時
resetTimeout=30.0, //重置超時
maxConsecutiveResetAttempts=5) //重置重試次數(shù)
更多大數(shù)據(jù)與分析相關(guān)行業(yè)資訊、解決方案、案例、教程等請點擊查看>>>
詳情請咨詢在線客服!
客服熱線:023-66090381