小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

大數據IMF傳奇行動絕密課程第104

 看風景D人 2019-02-24

需求分析和技術架構

廣告點擊系統實時分析
廣告來自于廣告或者移動App等,廣告需要設定在具體的廣告位,當用戶點擊廣告的時候,一般都會通過ajax或Socket往后臺發送日志數據,在這里我們要基于Spark Streaming做實時在線統計。那么數據就需要放進消息系統(Kafka)中,我們的Spark Streaming應用程序就會去Kafka中Pull數據過來進行計算和消費,并把計算后的數據放入到持久化系統中(MySQL)。
廣告點擊系統實時分析的意義:因為可以在線實時的看見廣告的投放效果,就為廣告的更大規模的投入和調整打下了堅實的基礎,從而為公司帶來最大化的經濟回報。
核心需求:
1、實時黑名單動態過濾出有效的用戶廣告點擊行為:因為黑名單用戶可能隨時出現,所以需要動態更新;
2、在線計算廣告點擊流量;
3、Top3熱門廣告;
4、每個廣告流量趨勢;
5、廣告點擊用戶的區(qū)域分布分析
6、最近一分鐘的廣告點擊量;
7、整個廣告點擊Spark Streaming處理程序7*24小時運行;

數據格式:
時間、用戶、廣告、城市等

技術細節:
在線計算用戶點擊的次數分析,屏蔽IP等;
使用updateStateByKey或者mapWithState進行不同地區廣告點擊排名的計算;
Spark Streaming+Spark SQL+Spark Core等綜合分析數據;
使用Window類型的操作;
高可用和性能調優等等;
流量趨勢,一般會結合DB等;
Spark Core

圖109-1 Spark高效操作數據庫

/**
 * 
 */
package com.tom.spark.SparkApps.sparkstreaming;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Mock data generator: a Kafka producer that continuously publishes simulated
 * ad-click records.
 *
 * <p>Each record is a single tab-separated line with the fields
 * {@code timestamp, ip, userID, adID, province, city}.
 */
public class MockAdClickedStat {

    /**
     * Kafka topic the generated click records are published to.
     *
     * NOTE(review): the consumer class in this file (AdClickedStreamingStats)
     * subscribes to "SparkStreamingDirected", not to this topic - confirm
     * which name is intended and align the two.
     */
    private static final String TOPIC = "AdClicked";

    /** Pause between two generated records, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 50L;

    /**
     * Starts a background thread that sends one random click record to Kafka
     * every {@link #SEND_INTERVAL_MS} milliseconds until the thread is
     * interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Random random = new Random();
        final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};
        final Map<String, String[]> cities = new HashMap<String, String[]>();
        cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});
        cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});
        cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});
        cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});

        final String[] ips = new String[] {
                "192.168.112.240",
                "192.168.112.239",
                "192.168.112.245",
                "192.168.112.246",
                "192.168.112.247",
                "192.168.112.248",
                "192.168.112.249",
                "192.168.112.250",
                "192.168.112.251",
                "192.168.112.252",
                "192.168.112.253",
                "192.168.112.254",
        };

        // Basic Kafka producer configuration.
        Properties kafkaConf = new Properties();
        kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");
        // Fixed: the key was misspelled "metadeta.broker.list", so the broker
        // list never reached the producer configuration.
        kafkaConf.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
        ProducerConfig producerConfig = new ProducerConfig(kafkaConf);
        final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);

        new Thread(new Runnable() {

            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // Record layout: timestamp, ip, userID, adID, province, city
                        long timestamp = System.currentTimeMillis();
                        // Bounds come from the arrays themselves so that adding or
                        // removing entries cannot cause an out-of-range index.
                        String ip = ips[random.nextInt(ips.length)]; // a public IP database could be used instead
                        int userID = random.nextInt(10000);
                        int adID = random.nextInt(100);
                        String province = provinces[random.nextInt(provinces.length)];
                        String[] provinceCities = cities.get(province);
                        String city = provinceCities[random.nextInt(provinceCities.length)];
                        String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID
                                + "\t" + province + "\t" + city;
                        producer.send(new KeyedMessage<Integer, String>(TOPIC, clickedAd));

                        try {
                            Thread.sleep(SEND_INTERVAL_MS);
                        } catch (InterruptedException e) {
                            // Restore the flag so the loop condition sees the
                            // interruption and the generator stops cleanly.
                            Thread.currentThread().interrupt();
                        }
                    }
                } finally {
                    // Release the producer's network resources when the generator stops.
                    producer.close();
                }
            }
        }).start();
    }

}
package com.tom.spark.SparkApps.sparkstreaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
  * 數(shù)據(jù)處理,Kafka消費者
  */
public class AdClickedStreamingStats {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        //好處:1、checkpoint 2、工廠
        final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("hdfs://Master:7077/");
        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {

            public JavaStreamingContext create() {
                // TODO Auto-generated method stub
                return createContext(checkpointDirectory, conf);
            }

        };

        /**
         * 可以從失敗中恢復Driver,不過還需要指定Driver這個進程運行在Cluster,并且在提交應用程序的時候制定--supervise;
         */
        JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        /**
         * 第三步:創(chuàng)建Spark Streaming輸入數(shù)據(jù)來源input Stream:
         * 1、數(shù)據(jù)輸入來源可以基于File、HDFS、Flume、Kafka、Socket等
         * 2、在這里我們指定數(shù)據(jù)來源于網(wǎng)絡Socket端口,Spark Streaming連接上該端口并在運行的時候一直監(jiān)聽該端口的數(shù)據(jù)
         *      (當然該端口服務首先必須存在),并且在后續(xù)會根據(jù)業(yè)務需要不斷有數(shù)據(jù)產生(當然對于Spark Streaming
         *      應用程序的運行而言,有無數(shù)據(jù)其處理流程都是一樣的)
         * 3、如果經常在每間隔5秒鐘沒有數(shù)據(jù)的話不斷啟動空的Job其實會造成調度資源的浪費,因為并沒有數(shù)據(jù)需要發(fā)生計算;所以
         *      實際的企業(yè)級生成環(huán)境的代碼在具體提交Job前會判斷是否有數(shù)據(jù),如果沒有的話就不再提交Job;
         */

        //創(chuàng)建Kafka元數(shù)據(jù)來讓Spark Streaming這個Kafka Consumer利用

        Map<String, String> kafkaParameters = new HashMap<String, String>();
        kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
        Set<String> topics = new HashSet<String>();
        topics.add("SparkStreamingDirected");

        JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc, 
                String.class, String.class, 
                StringDecoder.class, StringDecoder.class,
                kafkaParameters, 
                topics);
        /**因為要對黑名單進行過濾,而數(shù)據(jù)是在RDD中的,所以必然使用transform這個函數(shù);
         * 但是在這里我們必須使用transformToPair,原因是讀取進來的Kafka的數(shù)據(jù)是Pair<String,String>類型,
         * 另一個原因是過濾后的數(shù)據(jù)要進行進一步處理,所以必須是讀進的Kafka數(shù)據(jù)的原始類型
         * 
         * 在此再次說明,每個Batch Duration中實際上講輸入的數(shù)據(jù)就是被一個且僅被一個RDD封裝的,你可以有多個
         * InputDStream,但其實在產生job的時候,這些不同的InputDStream在Batch Duration中就相當于Spark基于HDFS
         * 數(shù)據(jù)操作的不同文件來源而已罷了。
         */

        JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {

            public JavaPairRDD<String, String> call(
                    JavaPairRDD<String, String> rdd) throws Exception {
                /**
                 * 在線黑名單過濾思路步驟:
                 * 1、從數(shù)據(jù)庫中獲取黑名單轉換成RDD,即新的RDD實例封裝黑名單數(shù)據(jù);
                 * 2、然后把代表黑名單的RDD的實例和Batch Duration產生的RDD進行Join操作,
                 * 準確的說是進行l(wèi)eftOuterJoin操作,也就是說使用Batch Duration產生的RDD和代表黑名單的RDD實例進行
                 * leftOuterJoin操作,如果兩者都有內容的話,就會是true,否則的話就是false
                 * 
                 * 我們要留下的是leftOuterJoin結果為false;
                 * 
                 */
                final List<String> blackListNames = new ArrayList<String>();
                JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack() {

                    public void resultCallBack(ResultSet result) throws Exception {

                        while(result.next()){
                            blackListNames.add(result.getString(1));
                        }
                    }
                });
                List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();
                for(String name : blackListNames) {
                    blackListTuple.add(new Tuple2<String, Boolean>(name, true));
                }
                List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //數(shù)據(jù)來自于查詢的黑名單表并且映射成為<String, Boolean>

                JavaSparkContext jsc = new JavaSparkContext(rdd.context());

                /**
                 * 黑名單的表中只有userID,但是如果要進行join操作的話就必須是Key-Value,所以在這里我們需要
                 * 基于數(shù)據(jù)表中的數(shù)據(jù)產生Key-Value類型的數(shù)據(jù)集合
                 */

                JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);

                /**
                 * 進行操作的時候肯定是基于userID進行join,所以必須把傳入的rdd進行mapToPair操作轉化成為符合格式的RDD
                 *
                 */
                JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {

                    public Tuple2<String, Tuple2<String, String>> call(
                            Tuple2<String, String> t) throws Exception {
                        // TODO Auto-generated method stub
                        String userID = t._2.split("\t")[2];

                        return new Tuple2<String, Tuple2<String,String>>(userID, t);
                    }
                });
                JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);

                JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {

                    public Boolean call(
                            Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)
                            throws Exception {
                        // TODO Auto-generated method stub
                        Optional<Boolean> optional = tuple._2._2;
                        if(optional.isPresent() && optional.get()){
                            return false;
                        } else {
                            return true;
                        }
                    }
                }).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {

                    public Tuple2<String, String> call(
                            Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)
                            throws Exception {
                        // TODO Auto-generated method stub
                        return t._2._1;
                    }
                });
                return result;
            }
        });
        //廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
        JavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {

            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                 String[] splited=t._2.split("\t");
                 String timestamp = splited[0]; //YYYY-MM-DD
                 String ip = splited[1];
                 String userID = splited[2];
                 String adID = splited[3];
                 String province = splited[4];
                 String city = splited[5];      

                 String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"
                             +province +"_"+city;
                 return new Tuple2<String, Long>(clickedRecord, 1L);
            }
        });
     /**
     * 第4.3步:在單詞實例計數(shù)為1基礎上,統(tǒng)計每個單詞在文件中出現(xiàn)的總次數(shù)
     */
        JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {
              public Long call(Long i1, Long i2) throws Exception{
                return i1 + i2;
              }
            });
        /*判斷有效的點擊,復雜化的采用機器學習訓練模型進行在線過濾    簡單的根據(jù)ip判斷1天不超過100次;也可以通過一個batch duration的點擊次數(shù)
            判斷是否非法廣告點擊,通過一個batch來判斷是不完整的,還需要一天的數(shù)據(jù)也可以每一個小時來判斷。*/
        JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {

            public Boolean call(Tuple2<String, Long> v1) throws Exception {
            if (1 < v1._2){
                //更新一些黑名單的數(shù)據(jù)庫表
                return false;
            } else { 
                return true;
                    }

            }
        });

        //filterClickedBatch.print();
        //寫入數(shù)據(jù)庫
        filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {

            public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
                 rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {

                    public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                        //使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式將數(shù)據(jù)寫入數(shù)據(jù)庫mysql
                        //例如一次插入 1000條 records,使用insertBatch 或 updateBatch
                        //插入的用戶數(shù)據(jù)信息:userID,adID,clickedCount,time
                        //這里面有一個問題,可能出現(xiàn)兩條記錄的key是一樣的,此時需要更新累加操作
                        List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
                        while(partition.hasNext()) {
                            Tuple2<String, Long> record = partition.next();
                            String[] splited = record._1.split("\t");

                            UserAdClicked userClicked = new UserAdClicked();
                            userClicked.setTimestamp(splited[0]);
                            userClicked.setIp(splited[1]);
                            userClicked.setUserID(splited[2]);
                            userClicked.setAdID(splited[3]);
                            userClicked.setProvince(splited[4]);
                            userClicked.setCity(splited[5]);

                            userAdClickedList.add(userClicked);
                        }

                        final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();
                        final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();

                        JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                        //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                        for(final UserAdClicked clicked : userAdClickedList) {
                            jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"
                                    + " timestamp =? AND userID = ? AND adID = ?",
                                    new Object[]{clicked.getTimestamp(), clicked.getUserID(),
                                            clicked.getAdID()}, new ExecuteCallBack() {

                                                public void resultCallBack(ResultSet result) throws Exception {
                                                    // TODO Auto-generated method stub
                                                    if(result.next()) {
                                                        long count = result.getLong(1);
                                                        clicked.setClickedCount(count);
                                                        updating.add(clicked);
                                                    } else {
                                                        inserting.add(clicked);
                                                        clicked.setClickedCount(1L);
                                                    }
                                                }
                                            });
                        }
                        //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for(UserAdClicked insertRecord : inserting) {
                            insertParametersList.add(new Object[] {
                                insertRecord.getTimestamp(),
                                insertRecord.getIp(),
                                insertRecord.getUserID(),
                                insertRecord.getAdID(),
                                insertRecord.getProvince(),
                                insertRecord.getCity(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);

                        //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                        List<Object[]> updateParametersList = new ArrayList<Object[]>();
                        for(UserAdClicked updateRecord : updating) {
                            updateParametersList.add(new Object[] {
                                updateRecord.getTimestamp(),
                                updateRecord.getIp(),
                                updateRecord.getUserID(),
                                updateRecord.getAdID(),
                                updateRecord.getProvince(),
                                updateRecord.getCity(),
                                updateRecord.getClickedCount() + 1
                            });
                        }
                        jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount =  ? WHERE"
                                    + " timestamp =? AND ip = ? AND userID = ? AND adID = ? "
                                    + "AND province = ? AND city = ?", updateParametersList);

                    }
                });
                return null;
            }
        });
     //再次過濾,從數(shù)據(jù)庫中讀取數(shù)據(jù)過濾黑名單
    JavaPairDStream<String, Long>  blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {

        public Boolean call(Tuple2<String, Long> v1) throws Exception {

            //廣告點擊的基本數(shù)據(jù)格式:timestamp,ip,userID,adID,province,city
            String[] splited = v1._1.split("\t"); //提取key值
            String date =splited[0];
            String userID =splited[2];
            String adID =splited[3];
            //查詢一下數(shù)據(jù)庫同一個用戶同一個廣告id點擊量超過50次列入黑名單
            //接下來 根據(jù)date、userID、adID條件去查詢用戶點擊廣告的數(shù)據(jù)表,獲得總的點擊次數(shù)
            //這個時候基于點擊次數(shù)判斷是否屬于黑名單點擊

            int clickedCountTotalToday = 81 ;

            if (clickedCountTotalToday > 50) {
                  return true;
            }else {
                return false ;
            }

        }


     });
    //map操作,找出用戶的id
    JavaDStream<String>   blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {

        public String call(Tuple2<String, Long> v1) throws Exception {
            // TODO Auto-generated method stub
            return v1._1.split("\t")[2];
        }
    });

    //有一個問題,數(shù)據(jù)可能重復,在一個partition里面重復,這個好辦;
    //但多個partition不能保證一個用戶重復,需要對黑名單的整個rdd進行去重操作。
    //rdd去重了,partition也就去重了,一石二鳥,一箭雙雕// 找出了黑名單,下一步就寫入黑名單數(shù)據(jù)庫表中
    JavaDStream<String>  blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {

        public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
            // TODO Auto-generated method stub
            return rdd.distinct();
        }


    });

    // 下一步寫入到數(shù)據(jù)表中

    blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {

        public Void call(JavaRDD<String> rdd) throws Exception {
             rdd.foreachPartition(new VoidFunction<Iterator<String>>() {

                public void call(Iterator<String> t) throws Exception {
                    // TODO Auto-generated method stub
                    //插入的用戶信息可以只包含:useID
                    //此時直接插入黑名單數(shù)據(jù)表即可。
                    //寫入數(shù)據(jù)庫
                    List<Object[]> blackList = new ArrayList<Object[]>();
                    while(t.hasNext()) {
                        blackList.add(new Object[]{t.next()});
                    }
                    JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
                    jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);


                }

            });
            return null;
        }
    });

    /**廣告點擊累計動態(tài)更新,每個updateStateByKey都會在Batch Duration的時間間隔的基礎上進行廣告點擊次數(shù)的更新,
     * 更新之后我們一般都會持久化到外部存儲設備上,在這里我們存儲到MySQL數(shù)據(jù)庫中
     */
    JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {

        public Tuple2<String, Long> call(Tuple2<String, String> t)
                throws Exception {
             String[] splited=t._2.split("\t");
             String timestamp = splited[0]; //YYYY-MM-DD
             String ip = splited[1];
             String userID = splited[2];
             String adID = splited[3];
             String province = splited[4];
             String city = splited[5];      

             String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"
                         +province +"_"+city;
             return new Tuple2<String, Long>(clickedRecord, 1L);
        }
    }).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {

        public Optional<Long> call(List<Long> v1, Optional<Long> v2)
                throws Exception {
            // v1:當前的Key在當前的Batch Duration中出現(xiàn)的次數(shù)的集合,例如{1,1,1,。。。,1}
            // v2:當前的Key在以前的Batch Duration中積累下來的結果;
            Long clickedTotalHistory = 0L; 
            if(v2.isPresent()){
                clickedTotalHistory = v2.get();
            }
            for(Long one : v1) {
                clickedTotalHistory += one;
            }
            return Optional.of(clickedTotalHistory);
        }
    });
    updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {

        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
             rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {

                public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
                    //使用數(shù)據(jù)庫連接池的高效讀寫數(shù)據(jù)庫的方式將數(shù)據(jù)寫入數(shù)據(jù)庫mysql
                    //例如一次插入 1000條 records,使用insertBatch 或 updateBatch
                    //插入的用戶數(shù)據(jù)信息:timestamp、adID、province、city
                    //這里面有一個問題,可能出現(xiàn)兩條記錄的key是一樣的,此時需要更新累加操作
                    List<AdClicked> AdClickedList = new ArrayList<AdClicked>();
                    while(partition.hasNext()) {
                        Tuple2<String, Long> record = partition.next();
                        String[] splited = record._1.split("\t");

                        AdClicked adClicked = new AdClicked();
                        adClicked.setTimestamp(splited[0]);
                        adClicked.setAdID(splited[1]);
                        adClicked.setProvince(splited[2]);
                        adClicked.setCity(splited[3]);
                        adClicked.setClickedCount(record._2);
                        AdClickedList.add(adClicked);
                    }

                    final List<AdClicked> inserting = new ArrayList<AdClicked>();
                    final List<AdClicked> updating = new ArrayList<AdClicked>();

                    JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                    //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                    for(final AdClicked clicked : AdClickedList) {
                        jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"
                                + " timestamp = ? AND adID = ? AND province = ? AND city = ?",
                                new Object[]{clicked.getTimestamp(), clicked.getAdID(),
                                        clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {

                                            public void resultCallBack(ResultSet result) throws Exception {
                                                // TODO Auto-generated method stub
                                                if(result.next()) {
                                                    long count = result.getLong(1);
                                                    clicked.setClickedCount(count);
                                                    updating.add(clicked);
                                                } else {
                                                    inserting.add(clicked);
                                                    clicked.setClickedCount(1L);
                                                }
                                            }
                                        });
                    }
                    //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                    List<Object[]> insertParametersList = new ArrayList<Object[]>();
                    for(AdClicked insertRecord : inserting) {
                        insertParametersList.add(new Object[] {
                            insertRecord.getTimestamp(),
                            insertRecord.getAdID(),
                            insertRecord.getProvince(),
                            insertRecord.getCity(),
                            insertRecord.getClickedCount()
                        });
                    }
                    jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);

                    //表的字段timestamp、ip、userID、adID、province、city、clickedCount
                    List<Object[]> updateParametersList = new ArrayList<Object[]>();
                    for(AdClicked updateRecord : updating) {
                        updateParametersList.add(new Object[] {
                            updateRecord.getClickedCount(),
                            updateRecord.getTimestamp(),
                            updateRecord.getAdID(),
                            updateRecord.getProvince(),
                            updateRecord.getCity()
                        });
                    }
                    jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount =  ? WHERE"
                                + " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);

                }
            });
            return null;
        }
    });
    /**
     * 對廣告點擊進行TopN計算,計算出每天每個省份Top5排名的廣告
     * 因為我們直接對RDD進行操作,所以使用了transfomr算子;
     */
    updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {

        public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {

            JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {

                public Tuple2<String, Long> call(Tuple2<String, Long> t)
                        throws Exception {
                    // TODO Auto-generated method stub
                     String[] splited=t._1.split("_");
                     String timestamp = splited[0]; //YYYY-MM-DD
                     String adID = splited[3];
                     String province = splited[4];

                     String clickedRecord = timestamp + "_" + adID + "_" + province;

                     return new Tuple2<String, Long>(clickedRecord, t._2);
                }

            }).reduceByKey(new Function2<Long, Long, Long>() {

                public Long call(Long v1, Long v2) throws Exception {
                    // TODO Auto-generated method stub
                    return v1 + v2;
                }
            }).map(new Function<Tuple2<String,Long>, Row>() {

                public Row call(Tuple2<String, Long> v1) throws Exception {
                    // TODO Auto-generated method stub

                     String[] splited=v1._1.split("_");
                     String timestamp = splited[0]; //YYYY-MM-DD
                     String adID = splited[3];
                     String province = splited[4];


                    return RowFactory.create(timestamp, adID, province, v1._2);
                }
            });

            StructType structType = DataTypes.createStructType(Arrays.asList(
                    DataTypes.createStructField("timestamp", DataTypes.StringType, true),
                    DataTypes.createStructField("adID", DataTypes.StringType, true),
                    DataTypes.createStructField("province", DataTypes.StringType, true),
                    DataTypes.createStructField("clickedCount", DataTypes.LongType, true)
                    ));
            HiveContext hiveContext = new HiveContext(rdd.context());

            DataFrame df = hiveContext.createDataFrame(rowRDD, structType);
            df.registerTempTable("topNTableSource");
            DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount, FROM"
                    + " (SELECT timestamp, adID, province,clickedCount, "
                    + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickeCount DESC) rank "
                    + "FROM topNTableSource) subquery "
                    + "WHERE rank <= 5");

            return result.toJavaRDD();
        }
    }).foreachRDD(new Function<JavaRDD<Row>, Void>() {

        /**
         * Persists the per-province top-N rows of one batch into the {@code adprovincetopn} table.
         * <p>
         * For every partition the existing rows for each (timestamp, province) pair present in the
         * partition are deleted first, then the partition's rows are inserted. The delete and the
         * insert are issued as two separate batches.
         *
         * @param rdd rows shaped as (timestamp, adID, province, clickedCount)
         * @return always {@code null}
         */
        public Void call(JavaRDD<Row> rdd) throws Exception {
            rdd.foreachPartition(new VoidFunction<Iterator<Row>>() {

                public void call(Iterator<Row> t) throws Exception {
                    // An empty partition has nothing to delete or insert; avoid the JDBC round trips.
                    if (!t.hasNext()) {
                        return;
                    }

                    // Column positions follow the SELECT list: timestamp, adID, province, clickedCount.
                    List<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();
                    while (t.hasNext()) {
                        Row row = t.next();
                        AdProvinceTopN item = new AdProvinceTopN();
                        item.setTimestamp(row.getString(0));
                        item.setAdID(row.getString(1));
                        item.setProvince(row.getString(2));
                        item.setClickedCount(row.getLong(3));
                        adProvinceTopN.add(item);
                    }

                    JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                    // One DELETE per distinct (timestamp, province) pair. The parameters are taken
                    // straight from the record instead of re-splitting a joined key, so values that
                    // contain the separator cannot be mangled. The joined string is only used for
                    // de-duplication.
                    Set<String> seenKeys = new HashSet<String>();
                    List<Object[]> deleteParametersList = new ArrayList<Object[]>();
                    for (AdProvinceTopN item : adProvinceTopN) {
                        String key = item.getTimestamp() + "\u0000" + item.getProvince();
                        if (seenKeys.add(key)) {
                            deleteParametersList.add(new Object[] {
                                    item.getTimestamp(),
                                    item.getProvince()
                            });
                        }
                    }
                    jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);

                    // Table columns: timestamp, adID, province, clickedCount. The INSERT names no
                    // columns, so the values must be bound in exactly that order.
                    List<Object[]> insertParametersList = new ArrayList<Object[]>();
                    for (AdProvinceTopN insertRecord : adProvinceTopN) {
                        insertParametersList.add(new Object[] {
                                insertRecord.getTimestamp(),
                                insertRecord.getAdID(),
                                insertRecord.getProvince(),
                                insertRecord.getClickedCount()
                        });
                    }
                    jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);
                }
            });
            return null;
        }
    });

    /**
     * 計算過去半個小時內廣告點擊的趨勢
     * 廣告點擊的基本數(shù)據(jù)格式:timestamp、ip、userID、adID、province、city
     */
    filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {

        public Tuple2<String, Long> call(Tuple2<String, String> t)
                throws Exception {
            // The message value is tab-separated; field 0 is the timestamp and field 3 the ad id.
            final String[] fields = t._2.split("\t");
            final String adId = fields[3];

            // TODO: refactor later to convert the timestamp and extract the minute;
            // the key should carry the minute in which the ad was clicked.
            final String clickTime = fields[0];

            // Key is "<time>_<adID>", each click counts once.
            final String trendKey = clickTime + "_" + adId;
            return new Tuple2<String, Long>(trendKey, 1L);
        }
    }).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {

        public Long call(Long v1, Long v2) throws Exception {
            // Reduce function of the windowed aggregation: combine two partial counts by adding them.
            final long combined = v1.longValue() + v2.longValue();
            return Long.valueOf(combined);
        }
    }, new Function2<Long, Long, Long>() {

        public Long call(Long v1, Long v2) throws Exception {
            // Second function handed to reduceByKeyAndWindow: takes the second count away from the first.
            final long remaining = v1.longValue() - v2.longValue();
            return Long.valueOf(remaining);
        }
    }, Durations.minutes(30), Durations.milliseconds(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {

        /**
         * Stores the windowed click counts of one batch in the {@code adclickedtrend} table.
         * <p>
         * Each record key has the form {@code <time>_<adID>}. For every record the table is probed
         * for an existing (date, hour, minute, adID) row; records with a match are written with an
         * UPDATE that overwrites {@code clickedCount}, the others with an INSERT.
         *
         * @param rdd pairs of {@code "<time>_<adID>"} and the click count for that key
         * @return always {@code null}
         */
        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
            rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {

                public void call(Iterator<Tuple2<String, Long>> partition)
                        throws Exception {
                    // Nothing to store for an empty partition; skip all database work.
                    if (!partition.hasNext()) {
                        return;
                    }

                    List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();
                    while (partition.hasNext()) {
                        Tuple2<String, Long> record = partition.next();
                        // Split on the first underscore only, so an ad id that itself contains
                        // an underscore is kept whole instead of being truncated.
                        String[] splited = record._1.split("_", 2);
                        String time = splited[0];
                        String adID = splited[1];
                        Long clickedCount = record._2;
                        /*
                         * Fields needed for the database row: time, adID, clickedCount.
                         * Drawing the trend chart later needs the year/month/day, hour and minute
                         * dimensions, so the row carries date, hour and minute separately.
                         */
                        AdTrendStat adTrendStat = new AdTrendStat();
                        adTrendStat.setAdID(adID);
                        adTrendStat.setClickedCount(clickedCount);
                        adTrendStat.set_date(time);   // TODO: extract year-month-day from time
                        adTrendStat.set_hour(time);   // TODO: extract the hour from time
                        adTrendStat.set_minute(time); // TODO: extract the minute from time

                        adTrend.add(adTrendStat);
                    }

                    final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();
                    final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();

                    JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();

                    // Table columns: date, hour, minute, adID, clickedCount.
                    // Only the existence of a row matters here; the stored count is overwritten
                    // by the UPDATE below, so its previous value is not read.
                    for (final AdTrendStat trend : adTrend) {
                        jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"
                                + " date =? AND hour = ? AND minute = ? AND AdID = ?",
                                new Object[]{trend.get_date(), trend.get_hour(),  trend.get_minute(),
                                        trend.getAdID()}, new ExecuteCallBack() {

                                            public void resultCallBack(ResultSet result) throws Exception {
                                                if (result.next()) {
                                                    updating.add(trend);
                                                } else {
                                                    inserting.add(trend);
                                                }
                                            }
                                        });
                    }

                    // Table columns: date, hour, minute, adID, clickedCount.
                    if (!inserting.isEmpty()) {
                        List<Object[]> insertParametersList = new ArrayList<Object[]>();
                        for (AdTrendStat insertRecord : inserting) {
                            insertParametersList.add(new Object[] {
                                insertRecord.get_date(),
                                insertRecord.get_hour(),
                                insertRecord.get_minute(),
                                insertRecord.getAdID(),
                                insertRecord.getClickedCount()
                            });
                        }
                        jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);
                    }

                    // Parameter order follows the placeholders: new count first, then the row key.
                    if (!updating.isEmpty()) {
                        List<Object[]> updateParametersList = new ArrayList<Object[]>();
                        for (AdTrendStat updateRecord : updating) {
                            updateParametersList.add(new Object[] {
                                updateRecord.getClickedCount(),
                                updateRecord.get_date(),
                                updateRecord.get_hour(),
                                updateRecord.get_minute(),
                                updateRecord.getAdID()
                            });
                        }
                        jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount =  ? WHERE"
                                + " date =? AND hour = ? AND minute = ? AND AdID = ?"
                                , updateParametersList);
                    }
                }
            });
            return null;
        }
    });;
    /**
     * Spark Streaming 執(zhí)行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的,當然其內部有消息循環(huán)體,用于
     * 接收應用程序本身或者Executor中的消息,
     */
    javassc.start();
    javassc.awaitTermination();
    javassc.close();


    }

    /**
     * Builds a brand-new streaming context with checkpointing switched on.
     *
     * @param checkpointDirectory directory handed to {@code checkpoint(...)} on the new context
     * @param conf                Spark configuration the context is created from
     * @return the new context, using a 10-second batch interval
     */
    private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
        // NOTE(review): the caller is not visible here; presumably this factory only runs when no
        // context could be restored from a checkpoint, so this line signals a fresh start - confirm.
        System.out.println("Creating new context");

        // Batch interval is 10 seconds.
        final JavaStreamingContext freshContext =
                new JavaStreamingContext(conf, Durations.seconds(10));
        freshContext.checkpoint(checkpointDirectory);
        return freshContext;
    }
}

/**
 * Process-wide helper that owns a small pool of MySQL connections and runs batched
 * updates and callback-driven queries on them.
 * <p>
 * The single instance is obtained through {@link #getJDBCInstance()}. Connections are
 * borrowed from an internal queue and always handed back once the call is finished,
 * whether it succeeded or not. SQL failures are printed and not rethrown.
 * <p>
 * The JDBC URL and credentials are hard-coded in the constructor.
 */
class JDBCWrapper {

    /** Number of connections opened when the singleton is created. */
    private static final int POOL_SIZE = 10;

    // volatile: required so the double-checked locking in getJDBCInstance() never
    // publishes a partially constructed instance to another thread.
    private static volatile JDBCWrapper jdbcInstance = null;

    private static final LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();

    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the shared wrapper, creating it (and its connections) on first use.
     *
     * @return the single {@code JDBCWrapper} instance
     */
    public static JDBCWrapper getJDBCInstance() {
        if (jdbcInstance == null) {
            synchronized (JDBCWrapper.class) {
                if (jdbcInstance == null) {
                    jdbcInstance = new JDBCWrapper();
                }
            }
        }
        return jdbcInstance;
    }

    /**
     * Opens up to {@link #POOL_SIZE} connections. A connection that cannot be opened is
     * reported and skipped, so the pool may end up smaller than requested.
     */
    private JDBCWrapper() {
        for (int i = 0; i < POOL_SIZE; i++) {
            try {
                Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");
                // The queue is unbounded, so offer() always succeeds and cannot be interrupted.
                dbConnectionPool.offer(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Borrows a connection, waiting until one is available.
     * <p>
     * The wait is not abandoned on interruption; the interrupt status is restored before
     * returning so the caller can still observe it. If no connection could be opened at
     * construction time this method waits indefinitely.
     *
     * @return a pooled connection, never {@code null}
     */
    public Connection getConnection() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    // take() blocks on the queue itself instead of polling its size in a sleep loop.
                    return dbConnectionPool.take();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Executes one statement as a batch, once per parameter array, inside a single transaction.
     * <p>
     * On any SQL failure the transaction is rolled back and the error is printed. The
     * connection's auto-commit mode is restored before it goes back to the pool, so later
     * queries on the same connection do not run inside a left-over transaction.
     *
     * @param sqlText    SQL with {@code ?} placeholders
     * @param paramsList one array of placeholder values per execution; {@code null} or empty
     *                   means there is nothing to do
     * @return the update counts from {@code executeBatch()}, an empty array when there was
     *         nothing to execute, or {@code null} if the batch failed
     */
    public int[] doBatch(String sqlText, List<Object[]> paramsList) {
        if (paramsList == null || paramsList.isEmpty()) {
            // No work: do not occupy a pooled connection for an empty batch.
            return new int[0];
        }

        Connection conn = getConnection();
        PreparedStatement preparedStatement = null;
        int[] result = null;
        boolean restoreAutoCommit = true;
        try {
            restoreAutoCommit = conn.getAutoCommit();
            conn.setAutoCommit(false);
            preparedStatement = conn.prepareStatement(sqlText);
            for (Object[] parameters : paramsList) {
                for (int i = 0; i < parameters.length; i++) {
                    preparedStatement.setObject(i + 1, parameters[i]);
                }
                preparedStatement.addBatch();
            }
            int[] counts = preparedStatement.executeBatch();
            conn.commit();
            // Only report counts once the transaction is really committed.
            result = counts;
        } catch (SQLException e) {
            e.printStackTrace();
            rollbackQuietly(conn);
        } finally {
            closeQuietly(preparedStatement);
            if (restoreAutoCommit) {
                try {
                    conn.setAutoCommit(true);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            release(conn);
        }

        return result;
    }

    /**
     * Runs a query and hands its result set to the callback before the statement is closed.
     * <p>
     * Exceptions thrown by the callback and SQL failures are printed and not propagated.
     *
     * @param sqlText    SQL with {@code ?} placeholders
     * @param paramsList placeholder values, in order
     * @param callback   receives the open {@link ResultSet}; must not keep it after returning
     */
    public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback) {
        Connection conn = getConnection();
        PreparedStatement preparedStatement = null;
        ResultSet result = null;
        try {
            preparedStatement = conn.prepareStatement(sqlText);
            for (int i = 0; i < paramsList.length; i++) {
                preparedStatement.setObject(i + 1, paramsList[i]);
            }

            result = preparedStatement.executeQuery();

            try {
                callback.resultCallBack(result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (result != null) {
                try {
                    result.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            closeQuietly(preparedStatement);
            release(conn);
        }
    }

    /** Rolls back the current transaction, printing (not throwing) any failure. */
    private static void rollbackQuietly(Connection conn) {
        try {
            conn.rollback();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /** Closes the statement if there is one, printing (not throwing) any failure. */
    private static void closeQuietly(PreparedStatement statement) {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    /** Hands a borrowed connection back to the pool. */
    private static void release(Connection conn) {
        if (conn != null) {
            // offer() on the unbounded queue cannot fail or be interrupted, so an interrupted
            // worker thread can no longer lose the connection on the way back.
            dbConnectionPool.offer(conn);
        }
    }
}
/**
 * Callback through which {@code JDBCWrapper.doQuery} hands the result of a query to its caller.
 */
interface ExecuteCallBack {
    /**
     * Invoked by {@code JDBCWrapper.doQuery} with the result of {@code executeQuery()}, before
     * the statement is closed.
     *
     * @param result the query result to read from
     * @throws Exception any failure while reading; {@code doQuery} catches it and prints the stack trace
     */
    void resultCallBack(ResultSet result) throws Exception;
}

/**
 * Mutable holder for one click-count record identified by timestamp, IP, user, ad,
 * province and city.
 * <p>
 * Every property starts as {@code null} and is filled through its setter; no validation
 * is applied.
 */
class UserAdClicked {
    /** Time of the click, kept as text. */
    private String timestamp;
    /** IP address, kept as text. */
    private String ip;
    /** User identifier. */
    private String userID;
    /** Advertisement identifier. */
    private String adID;
    /** Province name. */
    private String province;
    /** City name. */
    private String city;
    /** Click count for this record. */
    private Long clickedCount;

    // ---- accessors ----

    /** @return the timestamp text, or {@code null} if unset */
    public String getTimestamp() {
        return this.timestamp;
    }

    /** @return the IP address, or {@code null} if unset */
    public String getIp() {
        return this.ip;
    }

    /** @return the user id, or {@code null} if unset */
    public String getUserID() {
        return this.userID;
    }

    /** @return the ad id, or {@code null} if unset */
    public String getAdID() {
        return this.adID;
    }

    /** @return the province, or {@code null} if unset */
    public String getProvince() {
        return this.province;
    }

    /** @return the city, or {@code null} if unset */
    public String getCity() {
        return this.city;
    }

    /** @return the click count, or {@code null} if unset */
    public Long getClickedCount() {
        return this.clickedCount;
    }

    // ---- mutators ----

    /** @param timestamp the timestamp text to store */
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    /** @param ip the IP address to store */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @param userID the user id to store */
    public void setUserID(String userID) {
        this.userID = userID;
    }

    /** @param adID the ad id to store */
    public void setAdID(String adID) {
        this.adID = adID;
    }

    /** @param province the province to store */
    public void setProvince(String province) {
        this.province = province;
    }

    /** @param city the city to store */
    public void setCity(String city) {
        this.city = city;
    }

    /** @param clickedCount the click count to store */
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/**
 * Mutable holder for one click-count record identified by timestamp, ad, province and city.
 * <p>
 * Every property starts as {@code null} and is filled through its setter; no validation
 * is applied.
 */
class AdClicked {
    /** Time of the click, kept as text. */
    private String timestamp;
    /** Advertisement identifier. */
    private String adID;
    /** Province name. */
    private String province;
    /** City name. */
    private String city;
    /** Click count for this record. */
    private Long clickedCount;

    // ---- accessors ----

    /** @return the timestamp text, or {@code null} if unset */
    public String getTimestamp() {
        return this.timestamp;
    }

    /** @return the ad id, or {@code null} if unset */
    public String getAdID() {
        return this.adID;
    }

    /** @return the province, or {@code null} if unset */
    public String getProvince() {
        return this.province;
    }

    /** @return the city, or {@code null} if unset */
    public String getCity() {
        return this.city;
    }

    /** @return the click count, or {@code null} if unset */
    public Long getClickedCount() {
        return this.clickedCount;
    }

    // ---- mutators ----

    /** @param timestamp the timestamp text to store */
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    /** @param adID the ad id to store */
    public void setAdID(String adID) {
        this.adID = adID;
    }

    /** @param province the province to store */
    public void setProvince(String province) {
        this.province = province;
    }

    /** @param city the city to store */
    public void setCity(String city) {
        this.city = city;
    }

    /** @param clickedCount the click count to store */
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/**
 * Mutable holder for one row of the per-province ad ranking: timestamp, ad, province and
 * click count.
 * <p>
 * Every property starts as {@code null} and is filled through its setter; no validation
 * is applied.
 */
class AdProvinceTopN {
    /** Time of the ranking row, kept as text. */
    private String timestamp;
    /** Advertisement identifier. */
    private String adID;
    /** Province name. */
    private String province;
    /** Click count for this ad in this province. */
    private Long clickedCount;

    // ---- accessors ----

    /** @return the timestamp text, or {@code null} if unset */
    public String getTimestamp() {
        return this.timestamp;
    }

    /** @return the ad id, or {@code null} if unset */
    public String getAdID() {
        return this.adID;
    }

    /** @return the province, or {@code null} if unset */
    public String getProvince() {
        return this.province;
    }

    /** @return the click count, or {@code null} if unset */
    public Long getClickedCount() {
        return this.clickedCount;
    }

    // ---- mutators ----

    /** @param timestamp the timestamp text to store */
    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    /** @param adID the ad id to store */
    public void setAdID(String adID) {
        this.adID = adID;
    }

    /** @param province the province to store */
    public void setProvince(String province) {
        this.province = province;
    }

    /** @param clickedCount the click count to store */
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/**
 * Mutable holder for one point of the ad click trend: date, hour, minute, ad and click count.
 * <p>
 * Every property starts as {@code null} and is filled through its setter; no validation
 * or parsing of the time parts is done here.
 */
class AdTrendStat {
    /** Date part, kept as text. */
    private String _date;
    /** Hour part, kept as text. */
    private String _hour;
    /** Minute part, kept as text. */
    private String _minute;
    /** Advertisement identifier. */
    private String adID;
    /** Click count for this ad at this point in time. */
    private Long clickedCount;

    // ---- accessors ----

    /** @return the date text, or {@code null} if unset */
    public String get_date() {
        return this._date;
    }

    /** @return the hour text, or {@code null} if unset */
    public String get_hour() {
        return this._hour;
    }

    /** @return the minute text, or {@code null} if unset */
    public String get_minute() {
        return this._minute;
    }

    /** @return the ad id, or {@code null} if unset */
    public String getAdID() {
        return this.adID;
    }

    /** @return the click count, or {@code null} if unset */
    public Long getClickedCount() {
        return this.clickedCount;
    }

    // ---- mutators ----

    /** @param _date the date text to store */
    public void set_date(String _date) {
        this._date = _date;
    }

    /** @param _hour the hour text to store */
    public void set_hour(String _hour) {
        this._hour = _hour;
    }

    /** @param _minute the minute text to store */
    public void set_minute(String _minute) {
        this._minute = _minute;
    }

    /** @param adID the ad id to store */
    public void setAdID(String adID) {
        this.adID = adID;
    }

    /** @param clickedCount the click count to store */
    public void setClickedCount(Long clickedCount) {
        this.clickedCount = clickedCount;
    }
}

/**
 * Mutable single-value holder for a previously stored click count.
 * <p>
 * The value is {@code null} until it is set.
 */
class AdTrendCountHistory {

    /** The stored click count; {@code null} while unset. */
    private Long clickedCountHistoryLong;

    /** @param clickedCountHistoryLong the click count to remember */
    public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {
        this.clickedCountHistoryLong = clickedCountHistoryLong;
    }

    /** @return the remembered click count, or {@code null} if none was set */
    public Long getClickedCountHistoryLong() {
        return this.clickedCountHistoryLong;
    }
}

    本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發現有害或侵權內容,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發表

    請遵守用戶 評論公約

    類似文章 更多