更新時間:2017-09-01 來源:黑馬程序員云計算大數(shù)據(jù)培訓(xùn)學(xué)院 瀏覽量:
1,盡量避免使用會發(fā)生shuffle的算子
2,在數(shù)據(jù)源頭,對我們的數(shù)據(jù)提前進行聚合
七,優(yōu)化數(shù)據(jù)結(jié)構(gòu)
1、避免對象套對象
2、減少數(shù)據(jù)集合的使用,盡量使用數(shù)組
3、因為對象需要序列化,能不用對象就盡量不用,建議使用字符串拼接
4、可以使用第三方提供的占用內(nèi)存小,序列化速度快的數(shù)據(jù)結(jié)構(gòu)類庫,例如 fastUtil類庫
八,避免數(shù)據(jù)傾斜
1,數(shù)據(jù)傾斜的概念
有的時候,我們可能會遇到大數(shù)據(jù)計算中一個最棘手的問題——數(shù)據(jù)傾斜,此時spark作業(yè)性能會比期望查很多。數(shù)據(jù)傾斜調(diào)優(yōu),就是使用各種技術(shù)方案解決不同類型的數(shù)據(jù)傾斜問題,一保證spark作業(yè)的性能
2,數(shù)據(jù)傾斜發(fā)生的現(xiàn)象:
A、絕大多數(shù)task執(zhí)行的都非??欤珎€別task執(zhí)行極慢。比如,總共有1000個task,997個task都在一分鐘內(nèi)執(zhí)行完了,但是剩余兩三個task卻要一兩個小時,這種情況很常見
B、原本能夠正常執(zhí)行的spark作業(yè),某天突然爆出 OOM(內(nèi)存溢出)異常,官場異常棧,是我們寫的業(yè)務(wù)代碼造成的,這種情況比較少見
3,數(shù)據(jù)傾斜發(fā)生的原理
數(shù)據(jù)傾斜的原理很簡單:在進行shuffle的時候,必須將各個節(jié)點上相同的key拉取到某個節(jié)點上的一個task來進行處理,比如按照key進行聚合或者join等操作,此時,如果某個key對應(yīng)的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜,比如大部分key對應(yīng)10 條數(shù)據(jù),但是個別的key卻對應(yīng)了100萬調(diào)數(shù)據(jù),那么大部分task可能就只會分配到10-條數(shù)據(jù),然后1秒就運行完了,但是個別task可能分配到了100萬數(shù)據(jù),要運行一兩個小時。因此,整個spark作業(yè)的運行進度是由運行時間最長的那個task決定的
4,然后定位導(dǎo)致數(shù)據(jù)傾斜的代碼
數(shù)據(jù)傾斜只會發(fā)生在shuffle過程中,這里給大家羅列了一些常用的并且可能會觸發(fā)shuffle操作的算子:groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition等。出現(xiàn)數(shù)據(jù)傾斜時,可能就是你的代碼中使用了這些算子的某一個導(dǎo)致的
查看數(shù)據(jù)傾斜的數(shù)據(jù):eventLogRDD.sample(false,0.1).countByKey().foreach(println(_))
5,數(shù)據(jù)傾斜的解決方案
解決方案一:使用HIve ETL預(yù)處理數(shù)據(jù)
方案使用場景:導(dǎo)致數(shù)據(jù)傾斜的是Hive表,如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個key對應(yīng)了100萬數(shù)據(jù),其他的key才對應(yīng)了10條數(shù)據(jù)),而且業(yè)務(wù)場景需要頻繁使用spark對Hive表執(zhí)行某個分析操作,那么比較適合使用這種方案
方案實現(xiàn)思路:此時可以評估一下,是否可以通過Hive來進行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對數(shù)據(jù)按照key進行聚合,或者是預(yù)先和其他表進行join),然后在spark作業(yè)中針對的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后Hive表。此時由于數(shù)據(jù)已經(jīng)預(yù)先進行了聚合或join操作,那么spark作業(yè)中也就不需要原先的shuffle類算子執(zhí)行的這類操作了
方案實現(xiàn)原理:這種方案從根本上解決了數(shù)據(jù)傾斜,因為徹底避免了在spark中執(zhí)行shuffle類算子,那么就不會有數(shù)據(jù)傾斜的問題了,但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本,因為畢竟數(shù)據(jù)本身就存在不均勻的問題,所以Hive ETL中進行g(shù)roupBy 或者join等shuffle操作時,還是會出現(xiàn)數(shù)據(jù)傾斜,導(dǎo)致Hive ETL的速度很慢,我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免了spark程序發(fā)生數(shù)據(jù)傾斜而已
方案優(yōu)點:實現(xiàn)起來簡單便捷,效果還非常好,完全規(guī)避了數(shù)據(jù)傾斜,spark作業(yè)的性能會大幅提升
方案缺點:治標(biāo)不治本,Hive ETL 中還是會發(fā)生數(shù)據(jù)傾斜
解決方案二:過濾少數(shù)導(dǎo)致傾斜的key
方案使用的場景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個,而且對計算本身的影響并不是很大的話,那么很適合使用這種方案,比如99% 的key就對應(yīng)10條數(shù)據(jù),但是只有一個key對應(yīng)了100萬數(shù)據(jù),從而導(dǎo)致了數(shù)據(jù)傾斜
方案實現(xiàn)思路:如果我們判斷那少數(shù)的幾個數(shù)據(jù)量特別多的key,對作業(yè)的執(zhí)行和就算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)的幾個key,比如,在spark SQL中可以使用where 子句過濾掉這些key或者在sparkRDD執(zhí)行filter算子過濾掉這些key,如果每次作業(yè)執(zhí)行時,動態(tài)判定那些key的數(shù)據(jù)量最多然后再進行過濾,那么可以直接使用sample算子對RDD進行采樣,然后計算出每個key的數(shù)量,取數(shù)據(jù)量最多的key過濾掉即可
方案實現(xiàn)原理:將導(dǎo)致數(shù)據(jù)傾斜的key過濾掉之后,這些key就不參與計算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜
方案優(yōu)點:實現(xiàn)簡單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。
方案缺點:使用場景不多,大多數(shù)情況下,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個
解決方案三:提高shuffle操作的并行度
方案使用場景:如果我們必須要對數(shù)據(jù)傾斜迎難而上,那么就建議優(yōu)先使用這種方案,因為這是處理數(shù)據(jù)傾斜最簡單的一種方案
方案實現(xiàn)思路:在對RDD執(zhí)行shuffle算子時,給shuffle算子傳入一個參數(shù),比如:reduceByKey(+,1000),該參數(shù)就設(shè)置了這個shuffle算子執(zhí)行時shuffle read task的數(shù)量,對于sparkSQL中的shuffle類語句,比如GroupByKeyDemo,join,等,需要設(shè)置一個參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task 的并行度,該值默認(rèn)是200,對于很多場景來說都有點過小
方案實現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。舉例來說,如果有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的。那么這個task就要處理50條數(shù)據(jù),而增加了shuffle read task 以后。每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。
方案優(yōu)點:實現(xiàn)起來比較簡單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響
方案缺點:只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)時間來看,其效果有限
解決方案四:兩階段聚合(局部集合+全局聚合)
方案使用場景:對RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在sparkSQL中使用GroupBy 語句進行分組聚合時,比較適用這種場景
方案實現(xiàn)思路:這個方案的核心實現(xiàn)思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數(shù),比如10以內(nèi)的隨機數(shù),此時原先的key就變得不一樣了,比如(hello,1),(hello,1),(hello,1),(hello,1)… 就會變成(1_hello,1),(1_hello,1),(2_hello,1),(2_hello,1)… 接著對打上隨機數(shù)后的數(shù)據(jù),執(zhí)行reduceByKey等聚合操作,進行局部聚合,那么聚合結(jié)果,就會變成(1_hello,2),(2_hello,2)… 然后將各個key的前綴給去掉,就會變成(hello,2),(hello,2)… ,再次進行全局聚合操作,就可以得到最終結(jié)果了,比如(hello,4)
方案實現(xiàn)原理:將原本相同的key通過附加隨機前綴的方式,變成多個不聽的key,就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合,進而解決單個task處理數(shù)據(jù)過多的問題,接著取出掉隨機前綴,再次進行全局聚合,就可以得到最終的結(jié)果
方案優(yōu)點:對于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜,效果是非常不錯的,通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜,將spark作業(yè)的性能提升數(shù)倍以上
方案缺點:僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案
九,開啟推測機制
推測機制后,如果集群中,某一臺機器的幾個task特別慢,推測機制會將任務(wù)分配到其他機器執(zhí)行,最后spark會選取最快的作為最終結(jié)果
在spark-default.conf 中添加:spark.speculation true
推測機制與一下幾個參數(shù)有關(guān):
1、spark.speculation.interval 100: 檢測周期,單位毫秒
2、Spark.speculation.quantile 0.75: 完成task的百分比時啟動推測
3、Spark.speculation.multiplier 1.5: 比其他的慢多少倍時啟動推測
十,算子使用技巧
1、非必要的情況下,不要使用collect
2、盡量使用,例如:foreachPartition 就不要使用foreach,在創(chuàng)建連接對象(數(shù)據(jù)連接對象等),盡量在分區(qū)上創(chuàng)建
云計算大數(shù)據(jù)培訓(xùn)之Spark調(diào)優(yōu)(3)
2017-09-01云計算大數(shù)據(jù)培訓(xùn)之Spark調(diào)優(yōu)(2)
2017-09-01云計算大數(shù)據(jù)培訓(xùn)之Spark調(diào)優(yōu)(1)
2017-09-01云計算大數(shù)據(jù)培訓(xùn)之Hadoop組件:zookeeper(3)
2017-09-01云計算大數(shù)據(jù)培訓(xùn)之Hadoop組件:zookeeper(2)
2017-09-01云計算大數(shù)據(jù)培訓(xùn)之Hadoop組件:zookeeper(1)
2017-09-01