小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark日志分析項目Demo(8)

 青葉i生活 2018-01-25
2017-09-09 17:35 685人閱讀 評論(0) 收藏 舉報
分類:

廣告點擊統(tǒng)計需求:
(1)對接kafka,獲得數(shù)據(jù)
(2)發(fā)現(xiàn)某個用戶某天對某個廣告的點擊量已經(jīng)大于等于100,寫入黑名單,進行過濾
(3)計算廣告點擊流量實時統(tǒng)計結(jié)果
(4)實時統(tǒng)計每天每個省份top3熱門廣告
(5)實時統(tǒng)計每天每個廣告在最近1小時的滑動窗口內(nèi)的點擊趨勢(每分鐘的點擊量)

主流程代碼

public static void main(String[] args) {
        // 構(gòu)建Spark Streaming上下文
        SparkConf conf = new SparkConf()       
                .setMaster("local[2]")
                .setAppName("AdClickRealTimeStatSpark");
.set("spark.streaming.receiver.writeAheadLog.enable", "true");   

        // spark streaming的上下文是構(gòu)建JavaStreamingContext對象
        // 而不是像之前的JavaSparkContext、SQLContext/HiveContext
        // 傳入的第一個參數(shù),和之前的spark上下文一樣,也是SparkConf對象;第二個參數(shù)則不太一樣

        // 第二個參數(shù)是spark streaming類型作業(yè)比較有特色的一個參數(shù)
        // 實時處理batch的interval
        // spark streaming,每隔一小段時間,會去收集一次數(shù)據(jù)源(kafka)中的數(shù)據(jù),做成一個batch
        // 每次都是處理一個batch中的數(shù)據(jù)

        // 通常來說,batch interval,就是指每隔多少時間收集一次數(shù)據(jù)源中的數(shù)據(jù),然后進行處理
        // 一遍spark streaming的應(yīng)用,都是設(shè)置數(shù)秒到數(shù)十秒(很少會超過1分鐘)

        // 咱們這里項目中,就設(shè)置5秒鐘的batch interval
        // 每隔5秒鐘,咱們的spark streaming作業(yè)就會收集最近5秒內(nèi)的數(shù)據(jù)源接收過來的數(shù)據(jù)
        JavaStreamingContext jssc = new JavaStreamingContext(
                conf, Durations.seconds(5));  
        jssc.checkpoint("hdfs://192.168.1.105:9000/streaming_checkpoint");

        // 正式開始進行代碼的編寫
        // 實現(xiàn)咱們需要的實時計算的業(yè)務(wù)邏輯和功能

        // 創(chuàng)建針對Kafka數(shù)據(jù)來源的輸入DStream(離線流,代表了一個源源不斷的數(shù)據(jù)來源,抽象)
        // 選用kafka direct api(很多好處,包括自己內(nèi)部自適應(yīng)調(diào)整每次接收數(shù)據(jù)量的特性,等等) 

        // 構(gòu)建kafka參數(shù)map
        // 主要要放置的就是,你要連接的kafka集群的地址(broker集群的地址列表)
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list",   
                ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));

        // 構(gòu)建topic set
        String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
        String[] kafkaTopicsSplited = kafkaTopics.split(",");  

        Set<String> topics = new HashSet<String>();
        for(String kafkaTopic : kafkaTopicsSplited) {
            topics.add(kafkaTopic);
        }

        // 基于kafka direct api模式,構(gòu)建出了針對kafka集群中指定topic的輸入DStream
        // 兩個值,val1,val2;val1沒有什么特殊的意義;val2中包含了kafka topic中的一條一條的實時日志數(shù)據(jù)
        JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
                jssc, 
                String.class, 
                String.class, 
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, 
                topics);

//      adRealTimeLogDStream.repartition(1000);

        // 根據(jù)動態(tài)黑名單進行數(shù)據(jù)過濾
        JavaPairDStream<String, String> filteredAdRealTimeLogDStream = 
                filterByBlacklist(adRealTimeLogDStream);

        // 生成動態(tài)黑名單
        generateDynamicBlacklist(filteredAdRealTimeLogDStream);

        // 業(yè)務(wù)功能一:計算廣告點擊流量實時統(tǒng)計結(jié)果(yyyyMMdd_province_city_adid,clickCount) 
        // 最粗
        JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
                filteredAdRealTimeLogDStream);

        // 業(yè)務(wù)功能二:實時統(tǒng)計每天每個省份top3熱門廣告
        // 統(tǒng)計的稍微細(xì)一些了
        calculateProvinceTop3Ad(adRealTimeStatDStream);  

        // 業(yè)務(wù)功能三:實時統(tǒng)計每天每個廣告在最近1小時的滑動窗口內(nèi)的點擊趨勢(每分鐘的點擊量)
        // 統(tǒng)計的非常細(xì)了
        // 我們每次都可以看到每個廣告,最近一小時內(nèi),每分鐘的點擊量
        // 每支廣告的點擊趨勢
        calculateAdClickCountByWindow(adRealTimeLogDStream);  

        // 構(gòu)建完spark streaming上下文之后,記得要進行上下文的啟動、等待執(zhí)行結(jié)束、關(guān)閉
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86

計算廣告點擊流量實時統(tǒng)計結(jié)果

/**
     * 計算廣告點擊流量實時統(tǒng)計
     * @param filteredAdRealTimeLogDStream
     * @return
     */
    private static JavaPairDStream<String, Long> calculateRealTimeStat(
            JavaPairDStream<String, String> filteredAdRealTimeLogDStream) {
        // 業(yè)務(wù)邏輯一
        // 廣告點擊流量實時統(tǒng)計
        // 上面的黑名單實際上是廣告類的實時系統(tǒng)中,比較常見的一種基礎(chǔ)的應(yīng)用
        // 實際上,我們要實現(xiàn)的業(yè)務(wù)功能,不是黑名單

        // 計算每天各省各城市各廣告的點擊量
        // 這份數(shù)據(jù),實時不斷地更新到mysql中的,J2EE系統(tǒng),是提供實時報表給用戶查看的
        // j2ee系統(tǒng)每隔幾秒鐘,就從mysql中摟一次最新數(shù)據(jù),每次都可能不一樣
        // 設(shè)計出來幾個維度:日期、省份、城市、廣告
        // j2ee系統(tǒng)就可以非常的靈活
        // 用戶可以看到,實時的數(shù)據(jù),比如2015-11-01,歷史數(shù)據(jù)
        // 2015-12-01,當(dāng)天,可以看到當(dāng)天所有的實時數(shù)據(jù)(動態(tài)改變),比如江蘇省南京市
        // 廣告可以進行選擇(廣告主、廣告名稱、廣告類型來篩選一個出來)
        // 拿著date、province、city、adid,去mysql中查詢最新的數(shù)據(jù)
        // 等等,基于這幾個維度,以及這份動態(tài)改變的數(shù)據(jù),是可以實現(xiàn)比較靈活的廣告點擊流量查看的功能的

        // date province city userid adid
        // date_province_city_adid,作為key;1作為value
        // 通過spark,直接統(tǒng)計出來全局的點擊次數(shù),在spark集群中保留一份;在mysql中,也保留一份
        // 我們要對原始數(shù)據(jù)進行map,映射成<date_province_city_adid,1>格式
        // 然后呢,對上述格式的數(shù)據(jù),執(zhí)行updateStateByKey算子
        // spark streaming特有的一種算子,在spark集群內(nèi)存中,維護一份key的全局狀態(tài)
        JavaPairDStream<String, Long> mappedDStream = filteredAdRealTimeLogDStream.mapToPair(

                new PairFunction<Tuple2<String,String>, String, Long>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Long> call(Tuple2<String, String> tuple)
                            throws Exception {
                        String log = tuple._2;
                        String[] logSplited = log.split(" "); 

                        String timestamp = logSplited[0];
                        Date date = new Date(Long.valueOf(timestamp));
                        String datekey = DateUtils.formatDateKey(date); // yyyyMMdd

                        String province = logSplited[1];
                        String city = logSplited[2];
                        long adid = Long.valueOf(logSplited[4]);

                        String key = datekey + "_" + province + "_" + city + "_" + adid;

                        return new Tuple2<String, Long>(key, 1L);  
                    }

                });

        // 在這個dstream中,就相當(dāng)于,有每個batch rdd累加的各個key(各天各省份各城市各廣告的點擊次數(shù))
        // 每次計算出最新的值,就在aggregatedDStream中的每個batch rdd中反應(yīng)出來
        JavaPairDStream<String, Long> aggregatedDStream = mappedDStream.updateStateByKey(

                new Function2<List<Long>, Optional<Long>, Optional<Long>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Optional<Long> call(List<Long> values, Optional<Long> optional)
                            throws Exception {
                        // 舉例來說
                        // 對于每個key,都會調(diào)用一次這個方法
                        // 比如key是<20151201_Jiangsu_Nanjing_10001,1>,就會來調(diào)用一次這個方法7
                        // 10個

                        // values,(1,1,1,1,1,1,1,1,1,1)

                        // 首先根據(jù)optional判斷,之前這個key,是否有對應(yīng)的狀態(tài)
                        long clickCount = 0L;

                        // 如果說,之前是存在這個狀態(tài)的,那么就以之前的狀態(tài)作為起點,進行值的累加
                        if(optional.isPresent()) {
                            clickCount = optional.get();
                        }

                        // values,代表了,batch rdd中,每個key對應(yīng)的所有的值
                        for(Long value : values) {
                            clickCount += value;
                        }

                        return Optional.of(clickCount);  
                    }

                });

        // 將計算出來的最新結(jié)果,同步一份到mysql中,以便于j2ee系統(tǒng)使用
        aggregatedDStream.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {

                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Iterator<Tuple2<String, Long>> iterator)
                            throws Exception {
                        List<AdStat> adStats = new ArrayList<AdStat>();

                        while(iterator.hasNext()) {
                            Tuple2<String, Long> tuple = iterator.next();

                            String[] keySplited = tuple._1.split("_");
                            String date = keySplited[0];
                            String province = keySplited[1];
                            String city = keySplited[2];
                            long adid = Long.valueOf(keySplited[3]);  

                            long clickCount = tuple._2;

                            AdStat adStat = new AdStat();
                            adStat.setDate(date); 
                            adStat.setProvince(province);  
                            adStat.setCity(city);  
                            adStat.setAdid(adid); 
                            adStat.setClickCount(clickCount);  

                            adStats.add(adStat);
                        }

                        IAdStatDAO adStatDAO = DAOFactory.getAdStatDAO();
                        adStatDAO.updateBatch(adStats);  
                    }

                });

                return null;
            }

        });

        return aggregatedDStream;
    }

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多