小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark學(xué)習(xí)筆記總結(jié)

 看風(fēng)景D人 2016-10-07

Spark簡介

     spark 可以很容易和yarn結(jié)合,直接調(diào)用HDFS、Hbase上面的數(shù)據(jù),和hadoop結(jié)合。配置很容易。

     spark發(fā)展迅猛,框架比hadoop更加靈活實用。減少了延時處理,提高性能效率實用靈活性。也可以與hadoop切實相互結(jié)合。 

     spark核心部分分為RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了很多的大數(shù)據(jù)問題,其完美的框架日受歡迎。其相應(yīng)的生態(tài)環(huán)境包括zepplin等可視化方面,正日益壯大。大型公司爭相實用spark來代替原有hadoop上相應(yīng)的功能模塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基于內(nèi)存,因此速度很快。另外DAG作業(yè)調(diào)度系統(tǒng)的寬窄依賴讓Spark速度提高。

 


 

Spark核心組成

1、RDD

     是彈性分布式數(shù)據(jù)集,完全彈性的,如果數(shù)據(jù)丟失一部分還可以重建。有自動容錯、位置感知調(diào)度和可伸縮性,通過數(shù)據(jù)檢查點和記錄數(shù)據(jù)更新金象容錯性檢查。通過SparkContext.textFile()加載文件變成RDD,然后通過transformation構(gòu)建新的RDD,通過action將RDD存儲到外部系統(tǒng)。

     RDD使用延遲加載,也就是懶加載,只有當(dāng)用到的時候才加載數(shù)據(jù)。如果加載存儲所有的中間過程會浪費空間。因此要延遲加載。一旦spark看到整個變換鏈,他可以計算僅需的結(jié)果數(shù)據(jù),如果下面的函數(shù)不需要數(shù)據(jù)那么數(shù)據(jù)也不會再加載。轉(zhuǎn)換RDD是惰性的,只有在動作中才可以使用它們。

     Spark分為driver和executor,driver提交作業(yè),executor是application早worknode上的進(jìn)程,運行task,driver對應(yīng)為sparkcontext。Spark的RDD操作有transformation、action。Transformation對RDD進(jìn)行依賴包裝,RDD所對應(yīng)的依賴都進(jìn)行DAG的構(gòu)建并保存,在worknode掛掉之后除了通過備份恢復(fù)還可以通過元數(shù)據(jù)對其保存的依賴再計算一次得到。當(dāng)作業(yè)提交也就是調(diào)用runJob時,spark會根據(jù)RDD構(gòu)建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext創(chuàng)建時一同初始化的,他會對作業(yè)進(jìn)行調(diào)度處理。當(dāng)依賴圖構(gòu)建好以后,從action開始進(jìn)行解析,每一個操作作為一個task,每遇到shuffle就切割成為一個taskSet,并把數(shù)據(jù)輸出到磁盤,如果不是shuffle數(shù)據(jù)還在內(nèi)存中存儲。就這樣再往前推進(jìn),直到?jīng)]有算子,然后運行從前面開始,如果沒有action的算子在這里不會執(zhí)行,直到遇到action為止才開始運行,這就形成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager并且提交給Executor運行,運行結(jié)束后反饋給DAGScheduler完成一個taskSet,之后再提交下一個,當(dāng)TaskSet運行失敗時就返回DAGScheduler并重新再次創(chuàng)建。一個job里面可能有多個TaskSet,一個application可能包含多個job。

 


 

2、Spark Streaming 

     通過對kafka數(shù)據(jù)讀取,將Stream數(shù)據(jù)分成小的時間片段(幾秒),以類似batch批處理的方式來處理這一部分小數(shù)據(jù),每個時間片生成一個RDD,有高效的容錯性,對小批量數(shù)據(jù)可以兼容批量實時數(shù)據(jù)處理的邏輯算法,用一些歷史數(shù)據(jù)和實時數(shù)據(jù)聯(lián)合進(jìn)行分析,比如分類算法等。也可以對小批量的stream進(jìn)行mapreduce、join等操作,而保證其實時性。針對數(shù)據(jù)流時間要求不到毫秒級的工程性問題都可以。

     Spark Streaming也有一個StreamingContext,其核心是DStream,是通過以組時間序列上的連續(xù)RDD來組成的,包含一個有Time作為key、RDD作為value的結(jié)構(gòu)體,每一個RDD都包含特定時間間隔的數(shù)據(jù)流,可以通過persist將其持久化。在接受不斷的數(shù)據(jù)流后,在blockGenerator中維護(hù)一個隊列,將流數(shù)據(jù)放到隊列中,等處理時間間隔到來后將其中的所有數(shù)據(jù)合并成為一個RDD(這一間隔中的數(shù)據(jù))。其作業(yè)提交和spark相似,只不過在提交時拿到DStream內(nèi)部的RDD并產(chǎn)生Job提交,RDD在action觸發(fā)之后,將job提交給jobManager中的JobQueue,又jobScheduler調(diào)度,JobScheduler將job提交到spark的job調(diào)度器,然后將job轉(zhuǎn)換成為大量的任務(wù)分發(fā)給spark集群執(zhí)行。Job從outputStream中生成的,然后觸發(fā)反向回溯執(zhí)行DStreamDAG。在流數(shù)據(jù)處理的過程中,一般節(jié)點失效的處理比離線數(shù)據(jù)要復(fù)雜。Spark streamin在1.3之后可以周期性的將DStream寫入HDFS,同時將offset也進(jìn)行存儲,避免寫到zk。一旦主節(jié)點失效,會通過checkpoint的方式讀取之前的數(shù)據(jù)。當(dāng)worknode節(jié)點失效,如果HDFS或文件作為輸入源那Spark會根據(jù)依賴關(guān)系重新計算數(shù)據(jù),如果是基于Kafka、Flume等網(wǎng)絡(luò)數(shù)據(jù)源spark會將手機(jī)的數(shù)據(jù)源在集群中的不同節(jié)點進(jìn)行備份,一旦有一個工作節(jié)點失效,系統(tǒng)能夠根據(jù)另一份還存在的數(shù)據(jù)重新計算,但是如果接受節(jié)點失效會丟失一部分?jǐn)?shù)據(jù),同時接受線程會在其他的節(jié)點上重新啟動并接受數(shù)據(jù)。

 


 

3、Graphx

     主要用于圖的計算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。

 


 

4、Spark SQL

    是Spark新推出的交互式大數(shù)據(jù)SQL技術(shù)。把sql語句翻譯成Spark上的RDD操作可以支持Hive、Json等類型的數(shù)據(jù)。

 


 

5、Spark R

     通過R語言調(diào)用spark,目前不會擁有像Scala或者java那樣廣泛的API,Spark通過RDD類提供Spark API,并且允許用戶使用R交互式方式在集群中運行任務(wù)。同時集成了MLlib機(jī)器學(xué)習(xí)類庫。

 


 

6、MLBase

     從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發(fā)者)、Spark。也可以直接使用MLlib。ML Optimizer,一個優(yōu)化機(jī)器學(xué)習(xí)選擇更合適的算法和相關(guān)參數(shù)的模塊,還有MLI進(jìn)行特征抽取和高級ML編程 抽象算法實現(xiàn)API平臺,MLlib分布式機(jī)器學(xué)習(xí)庫,可以不斷擴(kuò)充算法。MLRuntime基于spark計算框架,將Spark的分布式計算應(yīng)用到機(jī)器學(xué)習(xí)領(lǐng)域。MLBase提供了一個簡單的聲明方法指定機(jī)器學(xué)習(xí)任務(wù),并且動態(tài)地選擇最優(yōu)的學(xué)習(xí)算法。

 


 

7、Tachyon

      高容錯的分布式文件系統(tǒng)。宣稱其性能是HDFS的3000多倍。有類似java的接口,也實現(xiàn)了HDFS接口,所以Spark和MR程序不需要任何的修改就可以運行。目前支持HDFS、S3等。


 

8、Spark算子

1、Map。對原數(shù)據(jù)進(jìn)行處理,類似于遍歷操作,轉(zhuǎn)換成MappedRDD,原分區(qū)不變。

2、flatMap。將原來的RDD中的每一個元素通過函數(shù)轉(zhuǎn)換成新的元素,將RDD的每個集合中的元素合并成一個集合。比如一個元素里面多個list,通過這個函數(shù)都合并成一個大的list,最經(jīng)典的就是wordcount中將每一行元素進(jìn)行分詞以后成為,通過flapMap變成一個個的單詞,line.flapMap(_.split(“ ”)).map((_,1))如果通過map就會將一行的單詞變成一個list。

3、mapPartitions。對每個分區(qū)進(jìn)行迭代,生成MapPartitionsRDD。

4、Union。是將兩個RDD合并成一個。使用這個函數(shù)要保證兩個RDD元素的數(shù)據(jù)類型相同,返回的RDD的數(shù)據(jù)類型和被合并的RDD數(shù)據(jù)類型相同。

5、Filter。其功能是對元素進(jìn)行過濾,對每個元素調(diào)用f函數(shù),返回值為true的元素就保留在RDD中。

6、Distinct。對RDD中元素進(jìn)行去重操作。

7、Subtract。對RDD1中取出RDD1與RDD2交集中的所有元素。

8、Sample。對RDD中的集合內(nèi)元素進(jìn)行采樣,第一個參數(shù)withReplacement是true表示有放回取樣,false表示無放回。第二個參數(shù)表示比例,第三個參數(shù)是隨機(jī)種子。如data.sample(true, 0.3,new Random().nextInt())。

9、takeSample。和sample用法相同,只不第二個參數(shù)換成了個數(shù)。返回也不是RDD,而是collect。

10、Cache。將RDD緩存到內(nèi)存中。相當(dāng)于persist(MEMORY_ONLY)??梢酝ㄟ^參數(shù)設(shè)置緩存和運行內(nèi)存之間的比例,如果數(shù)據(jù)量大于cache內(nèi)存則會丟失。

11、Persist。里面參數(shù)可以選擇DISK_ONLY/MEMORY_ONLY/MEMORY_AND_DISK等,其中的MEMORY_AND_DISK當(dāng)緩存空間滿了后自動溢出到磁盤。

12、MapValues。針對KV數(shù)據(jù),對數(shù)據(jù)中的value進(jìn)行map操作,而不對key進(jìn)行處理。

13、reduceByKey。針對KV數(shù)據(jù)將相同key的value聚合到一起。與groupByKey不同,會進(jìn)行一個類似mapreduce中的combine操作,減少相應(yīng)的數(shù)據(jù)IO操作,加快效率。如果想進(jìn)行一些非疊加操作,我們可以將value組合成字符串或其他格式將相同key的value組合在一起,再通過迭代,組合的數(shù)據(jù)拆開操作。

14、partitionBy。可以將RDD進(jìn)行分區(qū),重新生成一個ShuffleRDD,進(jìn)行一個shuffle操作,對后面進(jìn)行頻繁的shuffle操作可以加快效率。

15、randomSplit。對RDD進(jìn)行隨機(jī)切分。如data.randomSplit(new double[]{0.7, 0.3})返回一個RDD的數(shù)組。

16、Cogroup。對兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不同的是針對兩個RDD中相同的key的元素進(jìn)行合并。

17、Join。相當(dāng)于inner join。對兩個需要連接的RDD進(jìn)行cogroup,然后對每個key下面的list進(jìn)行笛卡爾積的操作,輸出兩兩相交的兩個集合作為value。 相當(dāng)于sql中where a.key=b.key。

18、leftOutJoin,rightOutJoin。在數(shù)據(jù)庫中左連接以左表為坐標(biāo)將表中所有的數(shù)據(jù)列出來,右面不存在的用null填充。在這里面對join的基礎(chǔ)上判斷左側(cè)的RDD元素是否是空,如果是空則填充。右連接則相反。

19、saveAsTestFile。將數(shù)據(jù)輸出到HDFS的指定目錄。

20、saveAsObjectFile。寫入HDFS為SequenceFile格式。

21、Collect、collectAsMap。將RDD轉(zhuǎn)換成list或者M(jìn)ap。結(jié)果以List或者HashMap的方式輸出。

22、Count。對RDD的元素進(jìn)行統(tǒng)計,返回個數(shù)。

23、Top(k)。返回最大的k個元素,返回List的形式。

24、Take返回數(shù)據(jù)的前k個元素。

25、takeOrdered。返回數(shù)據(jù)的最小的k個元素,并在返回中保持元素的順序。


 

9、Tips

1、RDD.repartition(n)可以在最初對RDD進(jìn)行分區(qū)操作,這個操作實際上是一個shuffle,可能比較耗時,但是如果之后的action比較多的話,可以減少下面操作的時間。其中的n值看cpu的個數(shù),一般大于2倍cpu,小于1000。

2、Action不能夠太多,每一次的action都會將以上的taskset劃分一個job,這樣當(dāng)job增多,而其中task并不釋放,會占用更多的內(nèi)存,使得gc拉低效率。

3、在shuffle前面進(jìn)行一個過濾,減少shuffle數(shù)據(jù),并且過濾掉null值,以及空值。

4、groupBy盡量通過reduceBy替代。reduceBy會在work節(jié)點做一次reduce,在整體進(jìn)行reduce,相當(dāng)于做了一次hadoop中的combine操作,而combine操作和reduceBy邏輯一致,這個groupBy不能保證。

5、做join的時候,盡量用小RDD去join大RDD,用大RDD去join超大的RDD。

6、避免collect的使用。因為collect如果數(shù)據(jù)集超大的時候,會通過各個work進(jìn)行收集,io增多,拉低性能,因此當(dāng)數(shù)據(jù)集很大時要save到HDFS。

7、RDD如果后面使用迭代,建議cache,但是一定要估計好數(shù)據(jù)的大小,避免比cache設(shè)定的內(nèi)存還要大,如果大過內(nèi)存就會刪除之前存儲的cache,可能導(dǎo)致計算錯誤,如果想要完全的存儲可以使用persist(MEMORY_AND_DISK),因為cache就是persist(MEMORY_ONLY)。

8、設(shè)置spark.cleaner.ttl,定時清除task,因為job的原因可能會緩存很多執(zhí)行過去的task,所以定時回收可能避免集中g(shù)c操作拉低性能。

9、適當(dāng)pre-partition,通過partitionBy()設(shè)定,每次partiti

 

轉(zhuǎn):http://www.cnblogs.com/hellochennan/p/5372946.html

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多