| 浪院長(zhǎng) 浪尖聊大數(shù)據(jù) hashjoinjoin是作為業(yè)務(wù)開(kāi)發(fā)繞不開(kāi)的SQL話題,無(wú)論是傳統(tǒng)的數(shù)據(jù)庫(kù)join,還是大數(shù)據(jù)里的join。 Spark BroadCastHashJoin翻過(guò)源碼之后你就會(huì)發(fā)現(xiàn),Spark 1.6之前實(shí)現(xiàn)BroadCastHashJoin就是利用的Java的HashMap來(lái)實(shí)現(xiàn)的。大家感興趣可以去Spark 1.6的源碼里搜索BroadCastHashJoin,HashedRelation,探查一下源碼。 ShuffledHashJoinBroadCastHashJoin適合的是大表和小表的join策略,將整個(gè)小表廣播。很多時(shí)候,參與join的表本身都不適合廣播,也不適合放入內(nèi)存,但是按照一定分區(qū)拆開(kāi)后就可以放入內(nèi)存構(gòu)建為HashRelation。這個(gè)就是分治思想了,將兩張表按照相同的hash分區(qū)器及分區(qū)數(shù)進(jìn)行,對(duì)join條件進(jìn)行分區(qū),那么需要join的key就會(huì)落入相同的分區(qū)里,然后就可以利用本地join的策略來(lái)進(jìn)行join了。 
 SortMergeJoin上面兩張情況都是小表本身適合放入內(nèi)存或者中表經(jīng)過(guò)分區(qū)治理后適合放入內(nèi)存,來(lái)完成本地化hashedjoin,小表數(shù)據(jù)放在內(nèi)存中,很奢侈的,所以經(jīng)常會(huì)遇到j(luò)oin,就oom。小表,中表都是依據(jù)內(nèi)存說(shuō)的,你內(nèi)存無(wú)限,那是最好。 
 Spark SQL的join方式選擇假如用戶使用Spark SQL的適合用了hints,那Spark會(huì)先采用Hints提示的join方式。 -- 支持 BROADCAST, BROADCASTJOIN and MAPJOIN 來(lái)表達(dá) broadcast hint SELECT /* BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key ShuffledHashJoin,hints的sql寫(xiě)法如下: -- 僅支持 SHUFFLE_HASH 來(lái)表達(dá) ShuffledHashJoin hint SELECT /* SHUFFLE_HASH(r) */ * FROM records r JOIN src s ON r.key = s.key SortMergeJoin,hints的SQL寫(xiě)法如下: -- 支持 SHUFFLE_MERGE, MERGE and MERGEJOIN 來(lái)表達(dá) SortMergeJoin hintSELECT /* MERGEJOIN(r) */ * FROM records r JOIN src s ON r.key = s.key 假設(shè)用戶沒(méi)有使用hints,默認(rèn)順序是: plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold 參數(shù):spark.sql.autoBroadcastJoinThreshold 假設(shè)兩張表都滿足廣播需求,選最小的。 spark.sql.join.preferSortMergeJoin=true, 還有兩個(gè)條件,根據(jù)統(tǒng)計(jì)信息,表的bytes是廣播的閾值*總并行度: plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions 并且該表bytes乘以3要小于等于另一張表的bytes: a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes 那么這張表就適合分治之后,作為每個(gè)分區(qū)構(gòu)建本地hashtable的表。 def createSortMergeJoin() = {
if (RowOrdering.isOrderable(leftKeys)) {
    Some(Seq(joins.SortMergeJoinExec(
      leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right))))
  } else {
    None
  }
}這段代碼是在SparkStrageties類,JoinSelection單例類內(nèi)部。 createBroadcastHashJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
  .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
  .orElse(createShuffleHashJoin(hintToShuffleHashLeft(hint), hintToShuffleHashRight(hint)))
  .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
  .getOrElse(createJoinWithoutHint())當(dāng)然,這三種join都是等值join,之前的版本Spark僅僅支持等值join但是不支持非等值join,常見(jiàn)的業(yè)務(wù)開(kāi)發(fā)中確實(shí)存在非等值join的情況,spark目前支持非等值join的實(shí)現(xiàn)有以下兩種,由于實(shí)現(xiàn)問(wèn)題,確實(shí)很容易o(hù)om。 Broadcast nested loop joinShuffle-and-replicate nested loop join。 | 
|  | 
來(lái)自: 印度阿三17 > 《開(kāi)發(fā)》