小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

大數(shù)據(jù)IMF傳奇行動(dòng)絕密課程第100

 看風(fēng)景D人 2019-02-24

使用Spark Streaming+Spark SQL+Kafka+FileSystem綜合案例

1、項(xiàng)目分析流程圖
2、項(xiàng)目代碼實(shí)戰(zhàn)

圖100-1 SparkStreaming案例分析架構(gòu)圖

Flume sink到Kafka需要一個(gè)jar包支持
https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin

編輯flume-conf.properties

#配置sink
agent1.sinks.sink1.type=org.apache.flume.plugins.KafkaSink
agent1.sinks.sink1.metadata.broker.list=Master:9092,Worker1:9092,Worker2.9092
agent1.sinks.sink1.partition.key=0
agent1.sinks.sink1.partitioner.class=org.apache.flume.plugins.SinglePartition
agent1.sinks.sink1.serializer.class=kafka.serializer.StringEncoder
agent1.sinks.sink1.request.requiredacks=0
agent1.sinks.sink1.max.message.size=1000000
agent1.sinks.sink1.producer.type=sync
agent1.sinks.sink1.custom.encoding=UTF-8
agent1.sinks.sink1.custom.topic.name=HelloKafka
agent1.sinks.sink1.channel= channel1

Kafka也可以監(jiān)控文件夾,但為什么要用Flume?Kafka只能接收json格式的文件
數(shù)據(jù)來(lái)源?
互聯(lián)網(wǎng):電商、社交網(wǎng)絡(luò)等的網(wǎng)站和App程序
傳統(tǒng)行業(yè):金融、電信、醫(yī)療、農(nóng)業(yè)、生產(chǎn)制造行業(yè);
例如說(shuō):在京東上進(jìn)行廣告的推送,當(dāng)我們點(diǎn)擊廣告的時(shí)候,此時(shí)肯定有日志記錄Log發(fā)送回到Server中,或者說(shuō)我們使用Android,iOS等中的App,都會(huì)設(shè)置有數(shù)據(jù)記錄的關(guān)鍵點(diǎn)(埋點(diǎn))
如果是網(wǎng)站,經(jīng)典的方式是通過(guò)JS透過(guò)Ajax把日志穿回到服務(wù)器上,如果是移動(dòng)App等一般是通過(guò)Socket,其他的傳感器或者工業(yè)設(shè)備可以通過(guò)自己的通信協(xié)議把數(shù)據(jù)傳回到服務(wù)器端

為了應(yīng)對(duì)高并發(fā)訪問(wèn),一般采用Nginx等作為Server前段,Server的分布式集群來(lái)做負(fù)載均衡

Tomcat、Apache、WebLogic作為Server后端

Server中接收到請(qǐng)求路由后一般都會(huì)對(duì)每個(gè)請(qǐng)求在文件中寫一條Log

Logs Cluster可以專門設(shè)置日志服務(wù)器集群,所有的Server和J2EE類型的業(yè)務(wù)邏輯在執(zhí)行過(guò)程中產(chǎn)生的日志信息都可以在后臺(tái)同步到日志服務(wù)器集群中

Server中接收到請(qǐng)求路由后一般都會(huì)對(duì)每個(gè)請(qǐng)求在文件中寫一條Log,可以自動(dòng)配置Server寫日志

企業(yè)中一般都會(huì)有Crontab等定時(shí)工具來(lái)通過(guò)日志整理工具來(lái)把當(dāng)天的日志采集、合并和初步的處理形成一份日志文件,然后發(fā)送到Flume監(jiān)控目錄中

當(dāng)Flume發(fā)現(xiàn)有新的日志文件進(jìn)來(lái)的時(shí)候會(huì)按照配置把數(shù)據(jù)通過(guò)Channel來(lái)Sink到目的地,這里是Sink到Kafka集群中

HDFS:
1、使用MapReduce作業(yè)對(duì)數(shù)據(jù)進(jìn)行出不清洗,并寫入新的HDFS文件中。
2、清洗后的數(shù)據(jù)一般導(dǎo)入到Hive數(shù)據(jù)倉(cāng)庫(kù)中,可以采用分區(qū)表
3、通過(guò)Hive中的SQL,在數(shù)據(jù)倉(cāng)庫(kù)的基礎(chǔ)上,進(jìn)行ETL,此時(shí)的ETL會(huì)把原始的數(shù)據(jù)生成很多張目標(biāo)的table

企業(yè)生產(chǎn)環(huán)境下,Spark數(shù)據(jù)都是來(lái)自Hive

一個(gè)小例子

package com.tom.spark.sparkstreaming

import org.apache.commons.codec.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}

/**
  * 使用Scala開(kāi)發(fā)集群運(yùn)行的Spark來(lái)實(shí)現(xiàn)在線熱搜詞
  */
case class MessageItem(name: String, age: Int)

object SparkStreamingFromKafkaFlume2Hive {
  def main(args: Array[String]): Unit = {

    if(args.length < 2) {
      System.err.println("Please input your kafka broker list and topics to consume")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkStreamingFromKafkaFlume2Hive").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Durations.seconds(5))

    val Array(brokers, topics) = args
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsParams = topics.split(",").toSet

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsParams)
      .map(_._2.split(",")).foreachRDD(rdd => {
      val hiveContext = new HiveContext(rdd.sparkContext)

      import hiveContext.implicits._
      rdd.map(record => MessageItem(record(0).trim,record(1).trim.toInt)).toDF().registerTempTable("temp")
      hiveContext.sql("SELECT count(*) FROM temp").show()
    })

    // Flume會(huì)作為Kafka的Producer把數(shù)據(jù)寫入到Kafka供本程序消費(fèi)
    ssc.start()
    ssc.awaitTermination()
  }
}

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多