沈雅
Apache Flink持續保持高速發展,是Apache最活躍的社區之一。Flink 1.16共有240多個Contributor熱情參與,共完成了19個FLIP和1 100多個issue,給社區帶來非常多振奮人心的功能。
Flink已經是流計算領域的領跑者,流批一體的概念逐漸得到大家的認可,并在越來越多的公司成功落地。之前的流批一體更強調統一的API和統一的計算框架。2022年,在此基礎上,Flink推出了Streaming Warehouse,進一步升級了流批一體的概念:真正完成了流批一體的計算和流批一體的存儲的融合,從而實現流批一體的實時化分析。
在1.16版本里,Flink社區對流、批都完成了眾多改進:在批處理方面,完成了易用性、穩定性、性能提升全方位的改進,1.16是Fink批處理的里程碑式的版本,是走向成熟的重要一步。
易用性,引入SQL Gateway并完全兼容HiveServer2,用戶可以非常方便的提交Flink SQL作業和Hive SQL作業,同時也很容易連接到原有的Hive生態。
功能:Flink SQL用戶支持通過Join Hint指定Join策略,避免不合理的執行計劃;Hive SQL的兼容性已經達到94 %,用戶可以以極低的成本完成Hive到Flink的遷移。
穩定性,通過預測執行減少作業長尾以提高作業整體運行穩定性;支持自適應Hash Join,通過失敗回滾機制避免作業失敗。
性能提升,對多分區表進行動態分區裁剪以提高處理效率,TPC-DS在10 TB規模數據集下性能提升了30 %;支持混合Shuffle模式,提高資源使用率和處理性能。
在流處理方面,也完成了很多重大改進:
Changelog State Backend可以為用戶提供秒級甚至毫秒級Checkpoint,從而大幅提升容錯體驗,同時為事務性Sink作業提供更小的端到端延遲體驗。
維表關聯在流處理中被廣泛被使用,引入了通用的緩存機制加快維表查詢速度,引入了可配置的異步模式提升維表查詢吞吐,引入可重試查詢機制解決維表延遲更新問題。這些功能都非常實用,解決了用戶經常抱怨的痛點,支持了更豐富的場景。
從Flink SQL誕生第一天就存在一些非確定性操作,可能導致用戶作業出現錯誤結果或作業運行異常,這給用戶帶來了極大的困擾。
隨著流批一體的進一步完善和Flink Table Store的不斷迭代,Flink社區正一步一步推動Streaming Warehouse從概念變為現實并走向成熟。
流式數倉(Streaming Warehouse)更準確地說,其實是make data warehouse streaming,就是讓整個數倉所有分層的數據全部實時地流動起來,從而實現一個具備端到端實時性的純流服務(Streaming Service),并且用一套統一API和計算框架來處理和分析所有流動中的數據。
得益于在流處理的長期投資,流處理已經成為流計算領域的領導者,在批處理上也投入了更多的精力,使其成為一個優秀的批處理引擎。流批處理統一的整體體驗也將會更加順暢。
SQL Gateway
從各個渠道反饋中了解到,SQL Gateway一直是用戶非常期待的功能,尤其是對批用戶。1.16里,該功能終于完成。SQL Gateway是對SQL Client的擴展和增強,支持多租戶和插件式API協議(Endpoint),解決了SQL Client只能服務單用戶并且不能對接外部服務或組件的問題。當前SQL Gateway已支持REST API和HiveServer2協議,用戶可以通過cURL、Postman以及各種編程語言的HTTP客戶端鏈接到SQL Gateway提交流作業、批作業,甚至OLAP作業。
為了降低從Hive到Flink的遷移成本,這個版本里引入了HiveServer2協議并繼續改進Hive語法的兼容性。
HiveServer2協議允許用戶使用Hive JDBC/Beeline和SQL Gateway進行交互,Hive生態(DBeaver,Apache Superset,Apache DolphinScheduler,and Apache Zeppelin)也因此很容易遷移到Flink。當用戶使用HiveServer2協議連接SQLGateway,SQL Gateway會自動注冊Hive Catalog,自動切換到Hive方言,自動使用批處理模式提交作業,用戶可以得到和直接使用HiveServer2一樣的體驗。
Hive語法已經是大數據處理的事實標準,Flink完善了對Hive語法的兼容,增加了對Hive若干生產中常用語法的支持。通過對Hive語法的兼容,可以幫助用戶將已有的Hive SQL任務遷移到Flink,并且方便熟悉Hive語法的用戶使用Hive語法編寫SQL以查詢注冊進Flink中的表。到目前為止,基于Hive qtest測試集(包含12 000個SQL案例),Hive 2.3版本的查詢兼容性已達到94.1 %,如果排除ACID的查詢語句,則已達到97.3 %。
Join Hint
Hint一直是業界用來干預執行計劃以改善優化器缺點的通用解決方案。Join作為批作業中最廣泛使用的算子,Flink支持多種Join策略。統計信息缺失或優化器的代價模型不完善都會導致選出錯誤Join策略,從而導致作業運行慢甚至有運行失敗的風險。用戶通過指定Join Hint,讓優化器盡可能選擇用戶指定的Join策略,從而避免優化器的各種不足,以確保批作業的生產可用性。
對于批作業而言,數據傾斜是非常常見的,而此時使用Hash Join可能運行失敗,這是非常糟糕的體驗。為了解決該問題,引入了自適應的Hash Join:Join算子運行時一旦Hash Join運行失敗,可以自動回退到Sort Merge Join,并且是Task粒度。通過該機制可確保Hash Join算子始終成功,從而提高了作業的穩定性。
為了解決問題機器導致批作業處理慢的問題,Flink 1.16引入了預測執行。問題機器是指存在硬件問題、突發I/O繁忙或CPU負載高等問題的機器,這些問題可能會使得運行在該機器上的任務比其他機器上的任務要慢得多,從而影響批處理作業的整體執行時間。
當啟用預測執行時,Flink將持續檢測慢任務。一旦檢測到慢任務,該任務所在的機器將被識別為問題機器,并通過黑名單機制被加黑。調度器將為慢任務創建新的執行實例并將它們部署到未被加黑的節點,同時現有執行實例也將繼續運行。新的執行實例和老的執行實例將處理相同的輸入數據并產出相同的結果數據。一旦任何執行實例率先完成,它將被視為該任務的唯一完成執行實例,并且該任務的其余執行實例都將被取消。
大多數現有Source都可以使用預測執行。只有當一個Source使用了SourceEvent時,它必須額外實現Supports Handle Execution Attempt Source Event接口以支持預測執行。目前Sink尚不支持預測執行,因此預測執行不會在Sink上發生。
Web UI和REST API也有了改進,以顯示任務的多個執行實例和被加黑的TaskManager。
為批處理引入了一種新的混合Shuffle模式,它結合了Blocking Shuffle和Pipeline Shuffle(主要用于流式處理)的優點。與Blocking Shuffle一樣,它不要求上下游任務同時運行,這允許使用很少的資源執行作業;與Pipeline Shuffle一樣,它不要求上游任務完成后才執行下游任務,這在給定足夠資源情況下減少了作業的整體執行時間。
用戶可以選擇不同的落盤策略,以滿足減少數據落盤或是降低任務重啟代價的不同需求。
注意:該功能為實驗性的,并且默認關閉。
在這個版本中進一步改進了Blocking Shuffle的可用性和性能,包括自適應網絡緩沖區分配、順序IO優化和結果分區重用,允許多個消費者節點重用同一個物理結果分區,以減少磁盤IO和存儲空間。在TPC-DS 10 TB規模的測試中,這些優化可以實現7 %的整體性能提升。此外,還引入了2種壓縮率更高的壓縮算法(LZO和ZSTD)。與默認的LZ4壓縮算法相比,可以進一步減少存儲空間,但要付出一些CPU成本。
對于批作業,生產環境中分區表比非分區表使用更為廣泛。當前Flink已經支持靜態分區裁剪,即在優化階段,優化器將Filter中的Partition相關的過濾條件下推到Source Connector中從而減少不必要的分區讀取。星形模型是數據集市模式中最簡單且使用最廣泛的模式,很多用戶的作業沒法使用靜態分區裁剪,因為分區裁剪信息在執行時才能確定,這就需要動態分區裁剪技術,即運行時根據其他相關表的數據確定分區裁剪信息,從而減少對分區表中無效分區的讀取。通過TPC-DS 10 TB規模數據集的驗證,該功能可提升30 %的性能。
在1.16中Checkpoint、SQL、Connector和其他領域都進行了改進,從而確保Flink在流計算領域繼續領先。
Changelog State Backend旨在令Checkpoint的間隔更短、更加可預測。這個版本在自身易用性上和與其他State Backend兼容性上做了諸多改進,使其達到生產可用。
對于使用Flink構建的云服務應用來說,Rescaling是一種非常頻繁的操作。這個版本使用了RocksDB的區間刪除來優化增量RocksDB State Backend的Rescaling性能。區間刪除被用來避免在Rescaling過程中大量的掃描和單點刪除操作,對有大量的狀態需要刪除的擴并發來說,單個并發上的恢復速度可以提高2~10倍。
這個版本還改善了狀態后臺的監控體驗和可用性。之前,RocksDB的日志位于它自己的DB目錄中,這使得調試RocksDB沒那么容易。這個版本讓RocksDB的日志默認留在Flink的日志目錄中,新增了RocksDB相關的統計指標,以幫助調試DB級別的性能,例如,在DB內的總塊緩存命中/失敗計數。
透支緩沖區(Overdraft Buffers)旨在緩解反壓情況下Subtask被阻塞的概率,可以通過設置taskmanager.network. memory.max-overdraft-buffers-per-gate開啟。
從1.16開始,一個Flink的Subtask可以申請5個(默認)額外的透支緩沖區。透支緩沖區會輕微地增加作業的內存使用量,但可以極大地減少Checkpoint的間隔,特別是在開啟Unaligned Checkpoint情況下。只有當前Subtask被下游Subtasks反壓且當前Subtask需要請求超過1個網絡緩沖區(Network Buffer)才能完成當前的操作時,透支緩沖區才會被使用。
這個版本更新了從Aligned Checkpoint(AC)切換到Unaligned Checkpoint(UC)的時間點。在開啟UC的情況下,如果配置了execution.checkpointing.aligned-checkpointtimeout,在啟動時每個Checkpoint仍然是AC,但當全局Checkpoint持續時間超過aligned-checkpoint-timeout時,如果AC還沒完成,那么Checkpoint將會轉換為UC。
以前,對一個Substask來說,AC到UC的切換需要等所有上游的Barriers到達后才能開始,在反壓嚴重的情況下,在checkpointing-timeout過期之前,下游的Substask可能無法完全地收到所有Barriers,從而導致Checkpoint失敗。
在這個版本中,如果上游Subtask中的Barrier無法在execution.checkpointing.aligned-checkpoint-timeout內發送到下游,Flink會讓上游的Subtask先切換成UC,以把Barrier發送到下游,從而減少反壓情況下Checkpoint超時的概率。
Flink SQL用戶經常抱怨理解流處理的成本太高,其中一個痛點是流處理中的非確定性(而且通常不直觀),它可能會導致錯誤的結果或異常,而這些痛點在Flink SQL的早期就已經存在了。
對于復雜的流作業,現在可以在運行前檢測并解決潛在的正確性問題。如果問題不能完全解決,一個詳細的消息可以提示用戶如何調整SQL,以避免引入非確定性問題。
維表關聯在流處理中被廣泛使用,在1.16中為此加入了多項優化和增強:
支持通用的緩存機制和相關指標,可以加速維表查詢;
通過作業配置或查詢提示支持可配置的異步模式(ALLOW_UNORDERED),在不影響正確性的前提下大大提升查詢吞吐;
可重試的查詢機制讓用戶解決維表數據更新延遲問題有了更多的手段。
為異步I/O引入了內置的重試機制,它對用戶現有代碼是透明的,可以靈活地滿足用戶的重試和異常處理需求。
PyFlink
在Flink 1.15中引入了一種新的執行模式:“線程”模式。在該模式下,用戶自定義的Python函數將通過JNI在JVM中執行,而不是在獨立的Python進程中執行。但是,在Flink 1.15中,僅在Table API和SQL上的Python標量函數的執行上支持了該功能。在新版本中對該功能提供了更全面的支持,在Python DataStream API中以及在Table API和SQL的Python表值函數中,也支持了該功能。
除此之外,還補全Python API所缺失的最后幾處功能。在這個版本中,對Python DataStream API提供了更全面的支持,支持了旁路輸出、Broadcast State等功能,并完善了對于窗口功能的支持。在Python DataStream API中,添加了對于更多的Connector以及Format的支持,例如添加了對于Elasticsearch,Kinesis,Pulsar,Hybrid Source等Connector的支持以及對于Orc,Parquet等Format的支持。有了這些功能之后,Python API已經基本對齊了Java和Scala API中絕大部分的重要功能,用戶已經可以使用Python語言完成大多數類型Flink作業的開發。
支持通過DataStream#cache緩存Transformation的執行結果。緩存的中間結果在首次計算中間結果時才生成,以便以后的作業可以重用該結果。如果緩存丟失,原始的Transformation將會被重新計算以得到結果。目前該功能只在批處理模式下支持。這個功能對于Python中的ML和交互式編程非常有用。
在這個版本中加強了查看已完成作業的信息的體驗。
JobManager / HistoryServer WebUI提供了詳細的執行時間指標,包括任務在每個執行狀態下的耗時,以及在運行過程中繁忙/空閑/反壓總時間。
Flink現在支持Protocol Buffers (Protobuf)格式,這允許直接在Table API或SQL應用程序中使用這種格式。
1.15中實現了異步Sink,允許用戶輕松實現自定義異步 Sink,新版本里此進行擴展以支持可配置的RateLimiting Strategy。這意味著Sink的實現者現在可以自定義其異步Sink在請求失敗時的行為方式,具體行為取決于特定的Sink。如果沒有指定RateLimitingStrategy,它將默認使用AIMDScalingStrategy。