小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark Streaming實(shí)踐和優(yōu)化

 melon1024 2016-08-16







一、Spark Streaming概述

Spark是美國(guó)加州伯克利大學(xué)AMP實(shí)驗(yàn)室推出的新一代分布式計(jì)算框架,其核心概念是RDD,一個(gè)只讀的、有容錯(cuò)機(jī)制的分布式數(shù)據(jù)集,RDD可以全部緩存在內(nèi)存中,并在多次計(jì)算中重復(fù)使用。相比于MapReduce編程模型,Spark具有以下幾個(gè)優(yōu)點(diǎn):

  1. 更大的靈活性和更高的抽象層次,使得用戶(hù)用更少的代碼即可實(shí)現(xiàn)同樣的功能;

  2. 適合迭代算法,在MapReduce編程模型中,每一輪迭代都需要讀寫(xiě)一次HDFS,磁盤(pán)IO負(fù)載很大,相比之下,Spark中的RDD可以緩存在內(nèi)存中,只需第一次讀入HDFS文件,之后迭代的數(shù)據(jù)全都存儲(chǔ)在內(nèi)存中,這使得程序的計(jì)算速度可提升約10-100倍。

SparkStreamingSpark生態(tài)系統(tǒng)中的重要組成部分,在實(shí)現(xiàn)上復(fù)用Spark計(jì)算引擎。如圖1所示,Spark Streaming支持數(shù)據(jù)源有很多,如Kafka、Flume、TCP等。SparkStreaming內(nèi)部的數(shù)據(jù)表示形式為DStreamDiscretizedStream離散的數(shù)據(jù)流,接口設(shè)計(jì)與RDD非常相似,這使得它對(duì)Spark用戶(hù)非常友好。SparkStreaming的核心思想是把流式處理轉(zhuǎn)化為“微批處理”,即以時(shí)間為單位切分?jǐn)?shù)據(jù)流,每個(gè)切片內(nèi)的數(shù)據(jù)對(duì)應(yīng)一個(gè)RDD,進(jìn)而可以采用Spark引擎進(jìn)行快速計(jì)算。由于SparkStreaming采用了微批處理方式,是近實(shí)時(shí)的處理系統(tǒng),而不是嚴(yán)格意義上的流式處理系統(tǒng)。

1Spark Streaming數(shù)據(jù)流

        另一個(gè)著名的開(kāi)源流式計(jì)算引擎是Storm,這是一個(gè)真正的流式處理系統(tǒng),它每次從數(shù)據(jù)源讀一條數(shù)據(jù),然后單獨(dú)處理。相比于SparkStreamingStorm有更快速的響應(yīng)時(shí)間(小于一秒),更適合低延遲的應(yīng)用場(chǎng)景,比如信用卡欺詐系統(tǒng),廣告系統(tǒng)等。相比之下,SparkStreaming的優(yōu)勢(shì)是吞吐量大,響應(yīng)時(shí)間也可以接受(秒級(jí)),并且兼容Spark系統(tǒng)中的其他工具庫(kù)如MLlibGraphX。對(duì)于時(shí)間不敏感且流量很大的系統(tǒng),Spark Streaming是更優(yōu)的選擇。

 

二、Spark StreamingHulu應(yīng)用

Hulu是美國(guó)的專(zhuān)業(yè)在線視頻網(wǎng)站,每天會(huì)有大量用戶(hù)在線觀看視頻,進(jìn)而產(chǎn)生大量用戶(hù)觀看行為數(shù)據(jù),這些數(shù)據(jù)通過(guò)收集系統(tǒng)進(jìn)入Hulu的大數(shù)據(jù)平臺(tái),從而進(jìn)行存儲(chǔ)和進(jìn)一步處理。在大數(shù)據(jù)平臺(tái)之上,各個(gè)團(tuán)隊(duì)會(huì)根據(jù)需要設(shè)計(jì)相應(yīng)的算法對(duì)數(shù)據(jù)進(jìn)行分析和挖掘以便產(chǎn)生商業(yè)價(jià)值:推薦團(tuán)隊(duì)從這些數(shù)據(jù)里挖掘出用戶(hù)感興趣的內(nèi)容并做精準(zhǔn)推薦,廣告團(tuán)隊(duì)根據(jù)用戶(hù)的歷史行為推送最合適的廣告,數(shù)據(jù)團(tuán)隊(duì)從數(shù)據(jù)的各個(gè)維度進(jìn)行分析從而為公司的策略制定提供可靠依據(jù)。

Hulu大數(shù)據(jù)平臺(tái)的實(shí)現(xiàn)依循Lambda架構(gòu)。Lambda架構(gòu)是一個(gè)通用的大數(shù)據(jù)處理框架,包含離線的批處理層、在線的加速層和服務(wù)層三部分,具體如圖2所示。服務(wù)層一般使用HTTP服務(wù)或自定制的客戶(hù)端對(duì)外提供數(shù)據(jù)訪問(wèn),離線的批處理層一般使用批處理計(jì)算框架SparkMapReduce進(jìn)行數(shù)據(jù)分析,在線的加速層一般使用流式實(shí)時(shí)計(jì)算框架SparkStreamingStorm進(jìn)行數(shù)據(jù)分析。



2lambda架構(gòu)原理圖

對(duì)于實(shí)時(shí)計(jì)算部分,Hulu內(nèi)部使用了Kafka、CodisSparkStreaming。面按照數(shù)據(jù)流的過(guò)程,介紹我們的項(xiàng)目。

1.     收集數(shù)據(jù) - 從服務(wù)器日志中收集數(shù)據(jù),流程如圖3所示:

  1. 來(lái)自網(wǎng)頁(yè)、手機(jī)App、機(jī)頂盒等設(shè)備的用戶(hù)產(chǎn)生視頻觀看、廣告點(diǎn)擊等行為,這些行為數(shù)據(jù)記錄在各自的Nginx服務(wù)的日志中;

  2. 使用Flume將用戶(hù)行為數(shù)據(jù)同時(shí)導(dǎo)入HDFSKafka,其中HDFS中的數(shù)據(jù)用于離線分析,而Kafka中數(shù)據(jù)則用于流式實(shí)時(shí)分析。

3Hulu數(shù)據(jù)收集流程

2.     存儲(chǔ)標(biāo)簽數(shù)據(jù) - Hulu使用HBase存儲(chǔ)用戶(hù)標(biāo)簽數(shù)據(jù),包括基本信息如性別、年齡、是否付費(fèi),以及其他模型推測(cè)出來(lái)的偏好屬性。這些屬性需要作為計(jì)算模型的輸入,同時(shí)HBase隨機(jī)讀取的速度比較慢,所以需要將數(shù)據(jù)同步到緩存服務(wù)器中以加快數(shù)據(jù)讀取速度。Redis是一個(gè)應(yīng)用廣泛的開(kāi)源緩存服務(wù)器,一個(gè)免費(fèi)開(kāi)源的高性能Key-Value數(shù)據(jù)庫(kù),但其本身是個(gè)單機(jī)系統(tǒng),不能很好地支持大量數(shù)據(jù)的緩存。為解決Redis擴(kuò)展性差的問(wèn)題,豌豆莢開(kāi)源了Codis,一個(gè)分布式Redis解決方案。HuluCodis打成Docker鏡像,并實(shí)現(xiàn)一鍵式構(gòu)建緩存系統(tǒng),附帶自動(dòng)監(jiān)控和修復(fù)功能。為了更精細(xì)的監(jiān)控,我們構(gòu)建了多個(gè)Codis緩存,分別是:

  1.   codis-profile,同步HBase中的用戶(hù)屬性;

  2.   codis-action,緩存來(lái)自Kafka的用戶(hù)行為;

  3.   codis-result,記錄計(jì)算結(jié)果。

3.     實(shí)時(shí)處理數(shù)據(jù) - 準(zhǔn)備就緒,啟動(dòng)Spark Streaming程序

1)      SparkStreaming啟動(dòng)Kafka Receiver,持續(xù)Kafka服務(wù)器拉數(shù)據(jù);

2)      每隔兩秒,Kafka的數(shù)據(jù)被整理成一個(gè)RDD,交給Spark引擎處理;

3)      對(duì)一條用戶(hù)行為,Spark會(huì)從codis-action緩存中拿到該用戶(hù)的行為記錄,然后把新的行為追加進(jìn)去;

4)      Sparkcodis-actioncodis-profile中獲得該用戶(hù)的所有相關(guān)屬性,然后執(zhí)行廣告推薦的計(jì)算模型,最后把結(jié)果寫(xiě)入codis-result,進(jìn)而供服務(wù)層實(shí)時(shí)讀取這些結(jié)果。

 

三、Spark Streaming優(yōu)化經(jīng)驗(yàn)

        實(shí)踐中,業(yè)務(wù)邏輯首先保證完成,使得在Kafka輸入數(shù)據(jù)量較小的情況下系統(tǒng)穩(wěn)定運(yùn)行,且輸入輸出滿(mǎn)足項(xiàng)目需求。然后開(kāi)始調(diào)優(yōu),修改SparkStreaming的參數(shù),比如Executor的數(shù)量,Core的數(shù)量,Receiver的流量等。最后發(fā)現(xiàn)僅調(diào)參數(shù)無(wú)法完全滿(mǎn)足本項(xiàng)目的業(yè)務(wù)場(chǎng)景,所以有更進(jìn)一步的優(yōu)化方案,總結(jié)如下:

1.    Executor初始化

很多機(jī)器學(xué)習(xí)的模型在第一次運(yùn)行時(shí),需要執(zhí)行初始化方法,還會(huì)連接外部的數(shù)據(jù)庫(kù),常常需要5-10分鐘,這會(huì)成為潛在的不穩(wěn)定因素。在Spark Streaming應(yīng)用中,當(dāng)Receiver完成初始化,它就開(kāi)始源源不斷地接收數(shù)據(jù),并且由Driver定期調(diào)度任務(wù)消耗這些數(shù)據(jù)。如果剛啟動(dòng)時(shí)Executor需要幾分鐘做準(zhǔn)備,會(huì)導(dǎo)致第一個(gè)作業(yè)一直沒(méi)有完成,這段時(shí)間內(nèi)Driver不會(huì)調(diào)度新的作業(yè)。這時(shí)候在Kafka Receiver端會(huì)有數(shù)據(jù)積壓,隨著積壓的數(shù)據(jù)量越來(lái)越大,大部分?jǐn)?shù)據(jù)會(huì)撐過(guò)新生代進(jìn)入老年代,進(jìn)而給Java GC帶來(lái)嚴(yán)重的壓力,容易引發(fā)應(yīng)用程序崩潰。

本項(xiàng)目的解決方案是,修改Spark內(nèi)核,在每個(gè)Executor接收任務(wù)之前先執(zhí)行一個(gè)用戶(hù)自定義的初始化函數(shù),初始化函數(shù)中可以執(zhí)行一些獨(dú)立的用戶(hù)邏輯。示例代碼如下:

  // sc:SparkContext, setupEnvironmentHulu擴(kuò)展的API

  sc.setupEnvironment(() => {

    application.initialize() // 用戶(hù)應(yīng)用程序初始化,需執(zhí)行幾分鐘

    println(“Invoke executor  setup method successfully.”)

  })

該方案需要更改Spark的任務(wù)調(diào)度器,首先將每個(gè)Executor設(shè)置為未初始化狀態(tài)。此時(shí),調(diào)度器只會(huì)給未初始化狀態(tài)的Executor分配初始化任務(wù)(執(zhí)行前面提到的初始化函數(shù))。等初始化任務(wù)完畢,調(diào)度器更新Executor的狀態(tài)為已初始化,這樣的Executor才可以分配正常的計(jì)算任務(wù)。

2.    異步處理Task中的業(yè)務(wù)邏輯

本項(xiàng)目中,模型的輸入?yún)?shù)均來(lái)自Codis,甚至模型內(nèi)部也可能訪問(wèn)外部存儲(chǔ),直接導(dǎo)致模型計(jì)算時(shí)長(zhǎng)不穩(wěn)定,很多時(shí)間消耗在網(wǎng)絡(luò)等待上。

為提高系統(tǒng)吞吐量,增大并行度是比較通用的優(yōu)化方案,但在本項(xiàng)目的場(chǎng)景中并不適用。因?yàn)?/span>Spark作業(yè)的調(diào)度策略是,等待上一個(gè)作業(yè)的所有Task執(zhí)行完畢,然后調(diào)度下一個(gè)作業(yè)。如果單個(gè)Task的運(yùn)行時(shí)間不穩(wěn)定,易發(fā)生個(gè)別Task拖慢整個(gè)作業(yè)的情況,以至于資源利用率不高,系統(tǒng)吞吐量上不去;甚至并行度越大,該問(wèn)題越嚴(yán)重。一種常用的解決Task不穩(wěn)定的方案是增大Spark Streamingmicro batch的時(shí)間間隔,該方案會(huì)使整個(gè)實(shí)時(shí)系統(tǒng)的延遲變長(zhǎng),并不推薦。

該問(wèn)題的解決方案是異步處理Task中的業(yè)務(wù)邏輯。如下文的代碼所示,同步方案中,Task內(nèi)執(zhí)行業(yè)務(wù)邏輯,處理時(shí)間不定;異步方案中,Task把業(yè)務(wù)邏輯嵌入線程,交給線程池執(zhí)行,Task立刻結(jié)束,ExecutorDriver報(bào)告執(zhí)行完畢,異步處理的時(shí)間非常短,在100ms以?xún)?nèi)。另外,當(dāng)線程池中積壓的線程數(shù)量太大時(shí)(代碼中qsize>100的情況),會(huì)暫時(shí)使用同步處理,配合反壓機(jī)制(見(jiàn)下文的參數(shù)spark.streaming.backpressure.enabled),可以保證不會(huì)因?yàn)閿?shù)據(jù)積壓過(guò)多而導(dǎo)致系統(tǒng)崩潰。為設(shè)置合適的線程池大小,我們借助JVisualVM工具監(jiān)控ExecutorCPU使用率,通過(guò)調(diào)整參數(shù)找到最優(yōu)并發(fā)線程數(shù)。經(jīng)實(shí)驗(yàn)驗(yàn)證,該方案大大提高了系統(tǒng)的吞吐量。

  // 同步處理

  // 函數(shù)runBusinessLogic Task 中的業(yè)務(wù)邏輯,執(zhí)行時(shí)間不定

  rdd.foreachPartition(partition  => runBusinessLogic (partition))

 

  // 異步處理,threadPool是線程池

   rdd.foreachPartition(partition => {

    val  qsize = threadPool.getQueue.size // 線程池中積壓的線程數(shù)

    if  (qsize > 100) {

      runBusinessLogic(partition)  // 暫時(shí)同步處理

    }

     threadPool.execute(new Runnable {

       override def run() = runBusinessLogic(partition)

    })

  })

異步化Task也存在缺點(diǎn):如果Executor發(fā)生異常,存放在線程池中的業(yè)務(wù)邏輯無(wú)法重新計(jì)算,會(huì)導(dǎo)致部分?jǐn)?shù)據(jù)丟失。經(jīng)實(shí)驗(yàn)驗(yàn)證,僅當(dāng)Executor異常崩潰時(shí)有數(shù)據(jù)丟失,且不常見(jiàn),在本項(xiàng)目的場(chǎng)景中可以接受。

3.     Kafka Receiver的穩(wěn)定性

本項(xiàng)目使用SparkStreaming中的Kafka Receiver,本質(zhì)上調(diào)用Kafka官方的客戶(hù)端ZookeeperConsumerConnector。其策略是每個(gè)客戶(hù)端在Zookeeper的固定路徑下把自己注冊(cè)為臨時(shí)節(jié)點(diǎn),于是所有客戶(hù)端都知道其他客戶(hù)端的存在,然后自動(dòng)協(xié)調(diào)和分配Kafka的數(shù)據(jù)資源。該策略存在一個(gè)弊端,當(dāng)一個(gè)客戶(hù)端與Zookeeper的連接狀態(tài)發(fā)生改變(斷開(kāi)或者連上),所有的客戶(hù)端都會(huì)通過(guò)Zookeeper協(xié)調(diào),重新分配Kafka的數(shù)據(jù)資源;在此期間所有客戶(hù)端都斷開(kāi)與Kafka的連接,系統(tǒng)接收不到Kafka的數(shù)據(jù),直到重新分配成功。如果網(wǎng)絡(luò)質(zhì)量不佳,并且Receiver的個(gè)數(shù)較多,這種策略會(huì)造成數(shù)據(jù)輸入不穩(wěn)定,很多SparkStreaming用戶(hù)遇到這樣的問(wèn)題。在我們的系統(tǒng)中,該策略并沒(méi)有產(chǎn)生明顯的負(fù)面影響。值得注意的是,Kafka 客戶(hù)端與Zookeeper有個(gè)默認(rèn)的參數(shù)zookeeper.session.timeout.ms=6000,表示客戶(hù)端與Zookeeper連接的session有效時(shí)間為6秒,我們的客戶(hù)端多次出現(xiàn)因?yàn)?/span>Full GC超過(guò)6秒而與Zookeeper斷開(kāi)連接,之后再次連接上,期間所有客戶(hù)端都受到影響,系統(tǒng)表現(xiàn)不穩(wěn)定。所以項(xiàng)目設(shè)置參數(shù)zookeeper.session.timeout.ms=30000。

4.     YARN資源搶占問(wèn)題

       Hulu內(nèi)部,Spark Streaming這樣的長(zhǎng)時(shí)服務(wù)與MapRedueSpark,Hive等批處理應(yīng)用共享YARN集群資源。在共享環(huán)境中,經(jīng)常因一個(gè)批處理應(yīng)用占用大量網(wǎng)絡(luò)資源或者CPU資源,導(dǎo)致Spark Streaming服務(wù)不穩(wěn)定(盡管我們采用了CGroup進(jìn)行資源隔離,但效果不佳)。更嚴(yán)重的問(wèn)題是,如果個(gè)別Container崩潰Driver需要向YARN申請(qǐng)新的Container,或者如果整個(gè)應(yīng)用崩潰需要重啟,SparkStreaming不能保證很快申請(qǐng)到足夠的資源,也就無(wú)法保證線上服務(wù)的質(zhì)量。為解決該問(wèn)題,Hulu使用label-based scheduling的調(diào)度策略,從YARN集群中隔離出若干節(jié)點(diǎn)專(zhuān)門(mén)運(yùn)行SparkStreaming和其他長(zhǎng)時(shí)服務(wù),避免與批處理程序競(jìng)爭(zhēng)資源。

5.     完善監(jiān)控信息

監(jiān)控反映系統(tǒng)運(yùn)行的性能狀態(tài),也是一切優(yōu)化的基礎(chǔ)。SparkStreaming Web界面提供了比較豐富的監(jiān)控信息,同時(shí)本項(xiàng)目依據(jù)業(yè)務(wù)邏輯的特點(diǎn)增加了更多監(jiān)控。Hulu使用GraphiteGrafana作為第三方監(jiān)控系統(tǒng),本項(xiàng)目把系統(tǒng)中關(guān)鍵的性能參數(shù)(如計(jì)算時(shí)長(zhǎng)和次數(shù))發(fā)送給Graphite服務(wù)器,就能夠在Grafana網(wǎng)頁(yè)上看到直觀的統(tǒng)計(jì)圖。


4Graphite監(jiān)控信息,展示了Kafka中日志的剩余數(shù)量,一條線對(duì)應(yīng)于一個(gè)partition的歷史余量

4是統(tǒng)計(jì)Kafka中日志的剩余數(shù)量,一條線對(duì)應(yīng)于一個(gè)partition的歷史余量,大部分情況下余量接近零,符合預(yù)期。圖中09:55左右日志余量開(kāi)始出現(xiàn)很明顯的尖峰,之后又迅速逼近零。事后經(jīng)過(guò)多種數(shù)據(jù)核對(duì),證實(shí)Kafka的數(shù)據(jù)一直穩(wěn)定,而當(dāng)時(shí)Spark Streaming執(zhí)行作業(yè)突然變慢,反壓機(jī)制生效,于是Kafka Receiver減小讀取日志的速率,造成Kafka數(shù)據(jù)積壓;一段時(shí)間之后SparkStreaming又恢復(fù)正常,快速消耗了Kafka中的數(shù)據(jù)余量。

直觀的監(jiān)控系統(tǒng)能有效地暴露問(wèn)題,進(jìn)而理解和強(qiáng)化系統(tǒng)。對(duì)于不同的業(yè)務(wù)邏輯,需要監(jiān)控的信息也不相同。在我們的實(shí)踐中,主要的監(jiān)控指標(biāo)有:

  1.    Kafka的剩余數(shù)據(jù)量

  2.   Spark的作業(yè)運(yùn)行時(shí)間和調(diào)度時(shí)間

  3.   每個(gè)Task的計(jì)算時(shí)間

  4.   Codis的訪問(wèn)次數(shù)、時(shí)間、命中率

另外,有腳本定期分析這些統(tǒng)計(jì)數(shù)據(jù),出現(xiàn)異常則發(fā)郵件報(bào)警。比如圖4 Kafka 的日志余量過(guò)大時(shí),會(huì)有連續(xù)的報(bào)警郵件。我們的經(jīng)驗(yàn)是,監(jiān)控越細(xì)致,之后的優(yōu)化工作越輕松。同時(shí),優(yōu)秀的監(jiān)控也需要對(duì)系統(tǒng)深刻的理解。

6.    參數(shù)優(yōu)化

下表列出本項(xiàng)目中比較關(guān)鍵的幾個(gè)參數(shù):

spark.yarn.max.executor.failures

Executor允許的失敗上限如果超過(guò)該上限,整個(gè)Spark Streaming會(huì)失敗,需要設(shè)置比較大

spark.yarn.executor.memoryOverhead

ExecutorJVM的開(kāi)銷(xiāo),與堆內(nèi)存不一樣,設(shè)置太小會(huì)導(dǎo)致內(nèi)存溢出異常

spark.receivers.num

Kafka Receiver的個(gè)數(shù)

spark.streaming.receiver.maxRate

每個(gè)Receiver能夠接受數(shù)據(jù)的最大速率;這個(gè)值超過(guò)峰值約50%

spark.streaming.backpressure.enabled

反壓機(jī)制;如果目前系統(tǒng)的延遲較長(zhǎng),Receiver端會(huì)自動(dòng)減小接受數(shù)據(jù)的速率,避免系統(tǒng)因數(shù)據(jù)積壓過(guò)多而崩潰

spark.locality.wait

系統(tǒng)調(diào)度Task會(huì)盡量考慮數(shù)據(jù)的局部性,如果超過(guò)spark.locality.wait設(shè)置時(shí)間的上限,就放棄局部性;該參數(shù)直接影響Task的調(diào)度時(shí)間

spark.cleaner.ttl

Spark系統(tǒng)內(nèi)部的元信息的超時(shí)時(shí)間;Streaming長(zhǎng)期運(yùn)行,元信息累積太多會(huì)影響性能

四、總結(jié)

        Spark Streaming的產(chǎn)品上線運(yùn)行一年多,期間進(jìn)行了多次Spark版本升級(jí),從最早期的0.8版本到最近1.5.x版本。總體上Spark Streaming是一款優(yōu)秀的實(shí)時(shí)計(jì)算框架,可以在線上使用。但仍然存在一些不足,包括:

1.     Spark同時(shí)使用堆內(nèi)和堆外的內(nèi)存,缺乏一些有效的監(jiān)控信息,遇到OOM時(shí)分析和調(diào)試比較困難;

2.     缺少Executor初始化接口;

3.     Spark采用函數(shù)式編程方式,抽象層次高,好處是使用方便,壞處是理解和優(yōu)化困難;

4.     新版本的Spark有一些異常,如Shuffle過(guò)程中Block丟失、內(nèi)存溢出。


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多