小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark join種類(>3種)及join選擇依據(jù)

 印度阿三17 2021-03-15

浪院長(zhǎng) 浪尖聊大數(shù)據(jù)

hashjoin


join是作為業(yè)務(wù)開(kāi)發(fā)繞不開(kāi)的SQL話題,無(wú)論是傳統(tǒng)的數(shù)據(jù)庫(kù)join,還是大數(shù)據(jù)里的join。
做過(guò)Spark/flink流處理的應(yīng)該都用過(guò)一種流表和維表的join,維表對(duì)于Spark來(lái)說(shuō)可以是driver端獲取后廣播到每個(gè)Executor,然后在executor端執(zhí)行流表task的時(shí)候join,其實(shí)大多數(shù)是個(gè)hashmap,而很多時(shí)候這個(gè)維表比較大會(huì)存儲(chǔ)于redis/hbase。Flink進(jìn)行維表join可以用的方式比較多了,比如直接open方法里從外部加載的靜態(tài)hashmap,這種就無(wú)法更新,因?yàn)镕link不像Spark可以每個(gè)批次或者若干批次加載一次維表。也可以使用LRU 異步IO 外部存儲(chǔ)來(lái)實(shí)現(xiàn)join,這樣就實(shí)現(xiàn)了對(duì)外部更新的感知。甚至也可以使用Flink的廣播功能實(shí)現(xiàn)join操作。
上面所說(shuō)的就是比較常見(jiàn)的hashjoin的簡(jiǎn)單表達(dá),將維表通過(guò)join的條件key構(gòu)建為一個(gè)hashtable,就拿java 8的HashMap來(lái)說(shuō)吧,就是一個(gè)數(shù)組 鏈表(鏈表過(guò)長(zhǎng)會(huì)變?yōu)榧t黑樹(shù)),數(shù)組下標(biāo)就是key,數(shù)組存儲(chǔ)的是value的指針。
Spark join種類(>3種)及join選擇依據(jù)
join的時(shí)候主表通過(guò)join條件構(gòu)建key去,hashmap里查找。

Spark BroadCastHashJoin


翻過(guò)源碼之后你就會(huì)發(fā)現(xiàn),Spark 1.6之前實(shí)現(xiàn)BroadCastHashJoin就是利用的Java的HashMap來(lái)實(shí)現(xiàn)的。大家感興趣可以去Spark 1.6的源碼里搜索BroadCastHashJoin,HashedRelation,探查一下源碼。
具體實(shí)現(xiàn)就是driver端根據(jù)表的統(tǒng)計(jì)信息,當(dāng)發(fā)現(xiàn)一張小表達(dá)到廣播條件的時(shí)候,就會(huì)將小表collect到driver端,然后構(gòu)建一個(gè)HashedRelation,然后廣播。
Spark join種類(>3種)及join選擇依據(jù)
其實(shí),就跟我們?cè)谑褂肧park Streaming的時(shí)候廣播hashmap一樣。
重點(diǎn)強(qiáng)調(diào)里面最大行數(shù)限制和最大bytes限制并不是我們?cè)O(shè)置的自動(dòng)廣播參數(shù)限制,而是內(nèi)部存儲(chǔ)結(jié)構(gòu)的限制。
Spark join種類(>3種)及join選擇依據(jù)
還有在Spark后期版本主要就是使用了TaskMemoryManager而不是HashMap進(jìn)行了背書(shū)。

ShuffledHashJoin


BroadCastHashJoin適合的是大表和小表的join策略,將整個(gè)小表廣播。很多時(shí)候,參與join的表本身都不適合廣播,也不適合放入內(nèi)存,但是按照一定分區(qū)拆開(kāi)后就可以放入內(nèi)存構(gòu)建為HashRelation。這個(gè)就是分治思想了,將兩張表按照相同的hash分區(qū)器及分區(qū)數(shù)進(jìn)行,對(duì)join條件進(jìn)行分區(qū),那么需要join的key就會(huì)落入相同的分區(qū)里,然后就可以利用本地join的策略來(lái)進(jìn)行join了。
Spark join種類(>3種)及join選擇依據(jù)
也即是ShuffledHashJoin有兩個(gè)重要步驟:

  1. join的兩張表有一張是相對(duì)小表,經(jīng)過(guò)拆分后可以實(shí)現(xiàn)本地join。

  2. 相同的分區(qū)器及分區(qū)數(shù),按照joinkey進(jìn)行分區(qū),這樣約束后joinkey范圍就限制在相同的分區(qū)中,不依賴其他分區(qū)完成join。

  3. 對(duì)小表分區(qū)構(gòu)建一個(gè)HashRelation。然后就可以完成本地hashedjoin了,參考ShuffleHashJoinExec代碼,這個(gè)如下圖:
    Spark join種類(>3種)及join選擇依據(jù)

SortMergeJoin


上面兩張情況都是小表本身適合放入內(nèi)存或者中表經(jīng)過(guò)分區(qū)治理后適合放入內(nèi)存,來(lái)完成本地化hashedjoin,小表數(shù)據(jù)放在內(nèi)存中,很奢侈的,所以經(jīng)常會(huì)遇到j(luò)oin,就oom。小表,中表都是依據(jù)內(nèi)存說(shuō)的,你內(nèi)存無(wú)限,那是最好。
那么,大表和大表join怎么辦?這時(shí)候就可以利用SortMergeJoin來(lái)完成。
SortMergeJoin基本過(guò)程如下:

  1. 首先采取相同的分區(qū)器及分區(qū)數(shù)對(duì)兩張表進(jìn)行重分區(qū)操作,保證兩張表相同的key落到相同的分區(qū)。

  2. 對(duì)于單個(gè)分區(qū)節(jié)點(diǎn)兩個(gè)表的數(shù)據(jù),分別進(jìn)行按照key排序。

  3. 對(duì)排好序的兩張分區(qū)表數(shù)據(jù)執(zhí)行join操作。join操作很簡(jiǎn)單,分別遍歷兩個(gè)有序序列,碰到相同join key就merge輸出,否則取更小一邊。'
    Spark join種類(>3種)及join選擇依據(jù)
    Spark 3.1以后的spark版本對(duì)sortmergejoin又進(jìn)一步優(yōu)化了。

Spark SQL的join方式選擇


假如用戶使用Spark SQL的適合用了hints,那Spark會(huì)先采用Hints提示的join方式。
broadcastHashJoin,hints寫(xiě)法如下:

-- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 來(lái)表達(dá) broadcast hint
SELECT /*  BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

ShuffledHashJoin,hints的sql寫(xiě)法如下:

-- 僅支持 SHUFFLE_HASH 來(lái)表達(dá) ShuffledHashJoin hint
SELECT /*  SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key

SortMergeJoin,hints的SQL寫(xiě)法如下:

-- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 來(lái)表達(dá) SortMergeJoin hintSELECT /*  MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key

假設(shè)用戶沒(méi)有使用hints,默認(rèn)順序是:
1.先判斷,假設(shè)join的表統(tǒng)計(jì)信息現(xiàn)實(shí),一張表大小大于0,且小于等于用戶配置的自動(dòng)廣播閾值則,采用廣播。

plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
參數(shù):spark.sql.autoBroadcastJoinThreshold

假設(shè)兩張表都滿足廣播需求,選最小的。
2.不滿足廣播就判斷是否滿足ShuffledHashJoin,首先下面參數(shù)要設(shè)置為false,默認(rèn)為true。

spark.sql.join.preferSortMergeJoin=true,

還有兩個(gè)條件,根據(jù)統(tǒng)計(jì)信息,表的bytes是廣播的閾值*總并行度:

plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

并且該表bytes乘以3要小于等于另一張表的bytes:

a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes

那么這張表就適合分治之后,作為每個(gè)分區(qū)構(gòu)建本地hashtable的表。
3.不滿足廣播,也不滿足ShuffledHashJoin,就判斷是否滿足SortMergeJoin。條件很簡(jiǎn)單,那就是key要支持可排序。

def createSortMergeJoin() = {
if (RowOrdering.isOrderable(leftKeys)) {
    Some(Seq(joins.SortMergeJoinExec(
      leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right))))
  } else {
    None
  }
}

這段代碼是在SparkStrageties類,JoinSelection單例類內(nèi)部。

createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
  .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
  .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint)))
  .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
  .getOrElse(createJoinWithoutHint())

當(dāng)然,這三種join都是等值join,之前的版本Spark僅僅支持等值join但是不支持非等值join,常見(jiàn)的業(yè)務(wù)開(kāi)發(fā)中確實(shí)存在非等值join的情況,spark目前支持非等值join的實(shí)現(xiàn)有以下兩種,由于實(shí)現(xiàn)問(wèn)題,確實(shí)很容易o(hù)om。

Broadcast nested loop joinShuffle-and-replicate nested loop join。

Spark join種類(>3種)及join選擇依據(jù)

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多