廣告點擊統(tǒng)計需求:
(1)對接kafka,獲得數(shù)據(jù)
(2)發(fā)現(xiàn)某個用戶某天對某個廣告的點擊量已經(jīng)大于等于100,寫入黑名單,進行過濾
(3)計算廣告點擊流量實時統(tǒng)計結(jié)果
(4)實時統(tǒng)計每天每個省份top3熱門廣告
(5)實時統(tǒng)計每天每個廣告在最近1小時的滑動窗口內(nèi)的點擊趨勢(每分鐘的點擊量)
主流程代碼
public static void main(String[] args) {
// 構(gòu)建Spark Streaming上下文
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("AdClickRealTimeStatSpark");
.set("spark.streaming.receiver.writeAheadLog.enable", "true");
// spark streaming的上下文是構(gòu)建JavaStreamingContext對象
// 而不是像之前的JavaSparkContext、SQLContext/HiveContext
// 傳入的第一個參數(shù),和之前的spark上下文一樣,也是SparkConf對象;第二個參數(shù)則不太一樣
// 第二個參數(shù)是spark streaming類型作業(yè)比較有特色的一個參數(shù)
// 實時處理batch的interval
// spark streaming,每隔一小段時間,會去收集一次數(shù)據(jù)源(kafka)中的數(shù)據(jù),做成一個batch
// 每次都是處理一個batch中的數(shù)據(jù)
// 通常來說,batch interval,就是指每隔多少時間收集一次數(shù)據(jù)源中的數(shù)據(jù),然后進行處理
// 一遍spark streaming的應(yīng)用,都是設(shè)置數(shù)秒到數(shù)十秒(很少會超過1分鐘)
// 咱們這里項目中,就設(shè)置5秒鐘的batch interval
// 每隔5秒鐘,咱們的spark streaming作業(yè)就會收集最近5秒內(nèi)的數(shù)據(jù)源接收過來的數(shù)據(jù)
JavaStreamingContext jssc = new JavaStreamingContext(
conf, Durations.seconds(5));
jssc.checkpoint("hdfs://192.168.1.105:9000/streaming_checkpoint");
// 正式開始進行代碼的編寫
// 實現(xiàn)咱們需要的實時計算的業(yè)務(wù)邏輯和功能
// 創(chuàng)建針對Kafka數(shù)據(jù)來源的輸入DStream(離線流,代表了一個源源不斷的數(shù)據(jù)來源,抽象)
// 選用kafka direct api(很多好處,包括自己內(nèi)部自適應(yīng)調(diào)整每次接收數(shù)據(jù)量的特性,等等)
// 構(gòu)建kafka參數(shù)map
// 主要要放置的就是,你要連接的kafka集群的地址(broker集群的地址列表)
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list",
ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
// 構(gòu)建topic set
String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
String[] kafkaTopicsSplited = kafkaTopics.split(",");
Set<String> topics = new HashSet<String>();
for(String kafkaTopic : kafkaTopicsSplited) {
topics.add(kafkaTopic);
}
// 基于kafka direct api模式,構(gòu)建出了針對kafka集群中指定topic的輸入DStream
// 兩個值,val1,val2;val1沒有什么特殊的意義;val2中包含了kafka topic中的一條一條的實時日志數(shù)據(jù)
JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics);
// adRealTimeLogDStream.repartition(1000);
// 根據(jù)動態(tài)黑名單進行數(shù)據(jù)過濾
JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
filterByBlacklist(adRealTimeLogDStream);
// 生成動態(tài)黑名單
generateDynamicBlacklist(filteredAdRealTimeLogDStream);
// 業(yè)務(wù)功能一:計算廣告點擊流量實時統(tǒng)計結(jié)果(yyyyMMdd_province_city_adid,clickCount)
// 最粗
JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
filteredAdRealTimeLogDStream);
// 業(yè)務(wù)功能二:實時統(tǒng)計每天每個省份top3熱門廣告
// 統(tǒng)計的稍微細(xì)一些了
calculateProvinceTop3Ad(adRealTimeStatDStream);
// 業(yè)務(wù)功能三:實時統(tǒng)計每天每個廣告在最近1小時的滑動窗口內(nèi)的點擊趨勢(每分鐘的點擊量)
// 統(tǒng)計的非常細(xì)了
// 我們每次都可以看到每個廣告,最近一小時內(nèi),每分鐘的點擊量
// 每支廣告的點擊趨勢
calculateAdClickCountByWindow(adRealTimeLogDStream);
// 構(gòu)建完spark streaming上下文之后,記得要進行上下文的啟動、等待執(zhí)行結(jié)束、關(guān)閉
jssc.start();
jssc.awaitTermination();
jssc.close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
計算廣告點擊流量實時統(tǒng)計結(jié)果
/**
* 計算廣告點擊流量實時統(tǒng)計
* @param filteredAdRealTimeLogDStream
* @return
*/
private static JavaPairDStream<String, Long> calculateRealTimeStat(
JavaPairDStream<String, String> filteredAdRealTimeLogDStream) {
// 業(yè)務(wù)邏輯一
// 廣告點擊流量實時統(tǒng)計
// 上面的黑名單實際上是廣告類的實時系統(tǒng)中,比較常見的一種基礎(chǔ)的應(yīng)用
// 實際上,我們要實現(xiàn)的業(yè)務(wù)功能,不是黑名單
// 計算每天各省各城市各廣告的點擊量
// 這份數(shù)據(jù),實時不斷地更新到mysql中的,J2EE系統(tǒng),是提供實時報表給用戶查看的
// j2ee系統(tǒng)每隔幾秒鐘,就從mysql中摟一次最新數(shù)據(jù),每次都可能不一樣
// 設(shè)計出來幾個維度:日期、省份、城市、廣告
// j2ee系統(tǒng)就可以非常的靈活
// 用戶可以看到,實時的數(shù)據(jù),比如2015-11-01,歷史數(shù)據(jù)
// 2015-12-01,當(dāng)天,可以看到當(dāng)天所有的實時數(shù)據(jù)(動態(tài)改變),比如江蘇省南京市
// 廣告可以進行選擇(廣告主、廣告名稱、廣告類型來篩選一個出來)
// 拿著date、province、city、adid,去mysql中查詢最新的數(shù)據(jù)
// 等等,基于這幾個維度,以及這份動態(tài)改變的數(shù)據(jù),是可以實現(xiàn)比較靈活的廣告點擊流量查看的功能的
// date province city userid adid
// date_province_city_adid,作為key;1作為value
// 通過spark,直接統(tǒng)計出來全局的點擊次數(shù),在spark集群中保留一份;在mysql中,也保留一份
// 我們要對原始數(shù)據(jù)進行map,映射成<date_province_city_adid,1>格式
// 然后呢,對上述格式的數(shù)據(jù),執(zhí)行updateStateByKey算子
// spark streaming特有的一種算子,在spark集群內(nèi)存中,維護一份key的全局狀態(tài)
JavaPairDStream<String, Long> mappedDStream = filteredAdRealTimeLogDStream.mapToPair(
new PairFunction<Tuple2<String,String>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<String, String> tuple)
throws Exception {
String log = tuple._2;
String[] logSplited = log.split(" ");
String timestamp = logSplited[0];
Date date = new Date(Long.valueOf(timestamp));
String datekey = DateUtils.formatDateKey(date); // yyyyMMdd
String province = logSplited[1];
String city = logSplited[2];
long adid = Long.valueOf(logSplited[4]);
String key = datekey + "_" + province + "_" + city + "_" + adid;
return new Tuple2<String, Long>(key, 1L);
}
});
// 在這個dstream中,就相當(dāng)于,有每個batch rdd累加的各個key(各天各省份各城市各廣告的點擊次數(shù))
// 每次計算出最新的值,就在aggregatedDStream中的每個batch rdd中反應(yīng)出來
JavaPairDStream<String, Long> aggregatedDStream = mappedDStream.updateStateByKey(
new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
private static final long serialVersionUID = 1L;
@Override
public Optional<Long> call(List<Long> values, Optional<Long> optional)
throws Exception {
// 舉例來說
// 對于每個key,都會調(diào)用一次這個方法
// 比如key是<20151201_Jiangsu_Nanjing_10001,1>,就會來調(diào)用一次這個方法7
// 10個
// values,(1,1,1,1,1,1,1,1,1,1)
// 首先根據(jù)optional判斷,之前這個key,是否有對應(yīng)的狀態(tài)
long clickCount = 0L;
// 如果說,之前是存在這個狀態(tài)的,那么就以之前的狀態(tài)作為起點,進行值的累加
if(optional.isPresent()) {
clickCount = optional.get();
}
// values,代表了,batch rdd中,每個key對應(yīng)的所有的值
for(Long value : values) {
clickCount += value;
}
return Optional.of(clickCount);
}
});
// 將計算出來的最新結(jié)果,同步一份到mysql中,以便于j2ee系統(tǒng)使用
aggregatedDStream.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
private static final long serialVersionUID = 1L;
@Override
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Iterator<Tuple2<String, Long>> iterator)
throws Exception {
List<AdStat> adStats = new ArrayList<AdStat>();
while(iterator.hasNext()) {
Tuple2<String, Long> tuple = iterator.next();
String[] keySplited = tuple._1.split("_");
String date = keySplited[0];
String province = keySplited[1];
String city = keySplited[2];
long adid = Long.valueOf(keySplited[3]);
long clickCount = tuple._2;
AdStat adStat = new AdStat();
adStat.setDate(date);
adStat.setProvince(province);
adStat.setCity(city);
adStat.setAdid(adid);
adStat.setClickCount(clickCount);
adStats.add(adStat);
}
IAdStatDAO adStatDAO = DAOFactory.getAdStatDAO();
adStatDAO.updateBatch(adStats);
}
});
return null;
}
});
return aggregatedDStream;
}