小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

貫通Spark Streaming流計算框架的運行源碼

 陳永正的圖書館 2017-07-03

本章節(jié)內(nèi)容:

一、在線動態(tài)計算分類最熱門商品案例回顧

二、基于案例貫通Spark Streaming的運行源碼

先看代碼(源碼場景:用戶、用戶的商品、商品的點擊量排名,按商品、其點擊量排名前三):

package com.dt.spark.sparkstreaming

import org.apache.spark.SparkConf

import org.apache.spark.sql.Row

import org.apache.spark.sql.hive.HiveContext

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  * 使用Spark Streaming+Spark SQL來在線動態(tài)計算電商中不同類別中最熱門的商品排名,例如手機這個類別下面最熱門的三種手機、電視這個類別

  * 下最熱門的三種電視,該實例在實際生產(chǎn)環(huán)境下具有非常重大的意義;

  * @author DT大數(shù)據(jù)夢工廠

  * 新浪微博:http://weibo.com/ilovepains/

  *   實現(xiàn)技術(shù):Spark Streaming+Spark SQL,之所以Spark Streaming能夠使用ML、sql、graphx等功能是因為有foreachRDD和Transform

  * 等接口,這些接口中其實是基于RDD進行操作,所以以RDD為基石,就可以直接使用Spark其它所有的功能,就像直接調(diào)用API一樣簡單。

  *  假設(shè)說這里的數(shù)據(jù)的格式:user item category,例如Rocky Samsung Android

  */

object OnlineTheTop3ItemForEachCategory2DB {

  def main(args: Array[String]){

    /**

      * 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序的運行時的配置信息,

      * 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置

      * 為local,則代表Spark程序在本地運行,特別適合于機器配置條件非常差(例如

      * 只有1G的內(nèi)存)的初學(xué)者

      */

    val conf = new SparkConf() //創(chuàng)建SparkConf對象

    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //設(shè)置應(yīng)用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱

  //conf.setMaster("spark://Master:7077") //此時,程序在Spark集群

    conf.setMaster("local[6]")

    //設(shè)置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口

    val ssc = new StreamingContext(conf, Seconds(5))

    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>

        (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

    val categoryUserClickLogsDStream =

//窗口的總長度是60秒,每隔20秒滑動一次,應(yīng)該在過去60秒的總長度加上新的20秒,在新的結(jié)果基礎(chǔ)上減去20秒

formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,_-_, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {

      if (rdd.isEmpty()) {//判斷RDD是否為空

        println("No data inputted!!!")

      } else {

        val categoryItemRow = rdd.map(reducedItem => {

          val category = reducedItem._1.split("_")(0)

          val item = reducedItem._1.split("_")(1)

          val click_count = reducedItem._2

          Row(category, item, click_count)

        })

        val structType = StructType(Array(

          StructField("category", StringType, true),

          StructField("item", StringType, true),

          StructField("click_count", IntegerType, true)

        ))

        //生產(chǎn)環(huán)境下注意用hiveContext,其繼承了SparkContext所有功能

        val hiveContext = new HiveContext(rdd.context)

        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

        categoryItemDF.registerTempTable("categoryItemTable")

        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +

          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +

          " WHERE rank <= 3")

        reseltDataFram.show()

        val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

          if (partitionOfRecords.isEmpty){

            println("This RDD is not null but partition is null")

          } else {

            val connection = ConnectionPool.getConnection()  // ConnectionPool is a static, lazily initialized pool of connections

            partitionOfRecords.foreach(record => {

              val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +

                record.getAs("item") + "'," + record.getAs("click_count") + ")"

              val stmt = connection.createStatement();

              stmt.executeUpdate(sql);

            })

            ConnectionPool.returnConnection(connection) // return to the pool for future reuse

          }

        }

        }

      }

    }

    }

    /**

      * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler

      * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法:

      *   1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

      *   2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到

      *   數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker

      *   內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息

      * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD

      * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個

      * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢?

      *   1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙;

      *   2,有可能設(shè)置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持;

      */

    ssc.start()

    ssc.awaitTermination()

  }

}

演示源代碼效果

啟動Hive服務(wù):

啟動應(yīng)用程序:

 webui控制臺監(jiān)控運行情況:

如果報錯的話是數(shù)據(jù)源沒有鏈接上,需要啟動nc –lk 9999即可:

查看運行結(jié)果:

以local模式運行的話刷新local的History-server webui界面:

Local的模式,所有任務(wù)都在driver上運行:

使用了reduceByKeyAndWindow窗口滑動操作,需要對窗口輸出結(jié)果聚合:

源碼分析

  1. 創(chuàng)建sparkConf的配置。
  2. 創(chuàng)建實例StreamingContext

進入StreamingContext內(nèi)部構(gòu)造函數(shù)需要傳入master、appname、batchDuration等參數(shù);關(guān)鍵點createNewSparkContext對象,這說明StreamingContext構(gòu)建是通過再其內(nèi)部會構(gòu)建SparkContext,通過sparkConf構(gòu)建SparkConcoxt,進一步說明SparkStreaming是SparkCore上的一個應(yīng)用程序。用戶編寫SparkStreaming應(yīng)用程序在StreamingContext內(nèi)部會構(gòu)建SparkConcoxt。

看下val userClickLogsDStream = ssc.socketTextStream("Master", 9999)源碼,創(chuàng)建一個DStream輸入流即socketinputDSteam:

有了DStream后可以基于Receiver接收數(shù)據(jù),Receiver內(nèi)部的onStart方法開啟run線程來調(diào)用Receiver方法連接上socket:

在Receiver方法中new Socket對象,并根據(jù)應(yīng)用程序業(yè)務(wù)代碼指定的host、port進行接收inputStream輸入數(shù)據(jù),將接收到的數(shù)據(jù)不斷一條一條循環(huán)進行Store:

再看業(yè)務(wù)邏輯代碼的一系列transformation,從SparkStreaming框架角度來講不是很重要,因為這些代碼是你自己的業(yè)務(wù)邏輯,不是框架代碼,在研究源碼時暫時放在一邊:

接下來很重要的一步就是SparkStreaming的start方法,看下這個方法的描述:

/**

      * 在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進行消息循環(huán),在JobScheduler

      * 的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker,并且調(diào)用JobGenerator和ReceiverTacker的start方法:

      *   1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job

      *   2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor,然后通過ReceiverSupervisor啟動Receiver),在Receiver收到

      *   數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker

      *   內(nèi)部會通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息

      * 每個BatchInterval會產(chǎn)生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD

      * 的DAG而已,從Java角度講,相當于Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個

      * 單獨的線程來提交Job到集群運行(其實是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運行),為什么使用線程池呢?

      *   1,作業(yè)不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task有異曲同工之妙;

      *   2,有可能設(shè)置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持;

    */

我們再深入看看InputDStream源碼,業(yè)務(wù)代碼通過StreamingContext對象的socketTextStream方法創(chuàng)建了SocketStream對象:

通過SocketStream對象創(chuàng)建了SocketInputDStream對象,返回的是ReceiverInputDStream類型:

進入ReceiverInputDStream抽象類,其繼承了InputDStream類:

在進入InputDStream抽象類,其繼續(xù)繼承了DStream(其實就是RDD的模板):

進入DStream抽象類,其只是在RDD的基礎(chǔ)上進行簡單的封裝,就是RDD的模板而已,DStream有以下特征:

  1. DStream依賴其他的DStreams
  2. 依賴DStream模板構(gòu)建出RDD之間的依賴
  3. 基于DStream它會有一個function,function被用來基于batchDuration或batchIntval時間片生成RDD,這個和定時器有關(guān)系
  4. DStream是RDD的模板,DStream上的很多操作轉(zhuǎn)過來其實是對RDD的操作

看下DStream的繼承結(jié)構(gòu)如下(剛才我們看到的InputDStream也在繼承的里面):

InputDStream注釋:所有DStream的子類,其提供start和stop方法,當SparkStreaming系統(tǒng)啟動和停止接收數(shù)據(jù)的時候會被調(diào)用,剛才看到具體子類SocketTextStream里面具體的start和stop方法提供其回調(diào):

InputDStream對應(yīng)的子類的start和stop方法被ReceiverSupervisor的start和stop方法回調(diào)。

繼續(xù)看ReceiverInputDStream類的子類,其中有SocketInputDStream類,此類只實現(xiàn)了getReceiver方法:

在看看DStream整個繼承結(jié)構(gòu)中,發(fā)現(xiàn)有一個ForeachDStream類,在ForeachDStream中會產(chǎn)生作業(yè),里面新建了一個job,DStream級別的action操作都會有foreachDStream的產(chǎn)生,會被的generatorJob調(diào)用,當然新建的job會產(chǎn)生基于function操作,實際上是你自己寫的業(yè)務(wù)邏輯function:

ForeachDStream類中關(guān)鍵代碼generatorRDD根據(jù)時間間隔(batchInterval或batchDuration)生成相應(yīng)的RDD和RDD的操作,我們對DStream的操作反過來作用于RDD的操作,DStream的本質(zhì)就是按照時間的序列存儲的一系列數(shù)據(jù)流,因為RDD中存儲的就是batchDuration時間片的數(shù)據(jù):

如果要擴展SparkStreaming源碼,注意和SparkStreaming包名一致才可以訪問其源碼。

繼續(xù)看代碼,看到getOrCompute方法,因為其作用于DStream,這個方法很關(guān)鍵,根據(jù)指定的時間獲得RDD,如果緩存中有的話可根據(jù)指定時間從緩存中獲取數(shù)據(jù):

DStream類的getOrCompute的操作是生成RDD,其實DStream是rdd的模板,對DStream的getOrCompute操作實際上觸發(fā)運行g(shù)eneratorRDDs,并將生成的RDD放在HashMap中:

所以通過socketTextStream代碼,最終獲得輸入流socketInputDStream的Receiver 在集群上抓取數(shù)據(jù):

通過socketReceiver從網(wǎng)絡(luò)上獲取流數(shù)據(jù),這樣就將網(wǎng)絡(luò)上的離散流數(shù)據(jù)以HashMap的方式存儲起來了:

源碼說的非常清楚:

  假如說一萬年的話每秒一個單位,這就是離散流,用每秒對一萬年進行切分,切成一片一片,切開后就叫離散,離散之間沒什么關(guān)系。

SparkStreaming整體架構(gòu)圖初見:

Spark發(fā)行版筆記5

新浪微博:http://weibo.com/ilovepains

微信公眾號:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手機:18610086859

QQ:1740415547

郵箱:18610086859@vip.126.com

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多