Requirements Analysis and Technical Architecture: Real-Time Ad Click Analysis

Ads are embedded in web pages or mobile apps, each placed in a specific ad slot. When a user clicks an ad, the client generally sends a log record to the backend via Ajax or a socket. Here we do real-time online statistics on top of Spark Streaming: the click logs are put into a message system (Kafka), our Spark Streaming application pulls the data from Kafka to compute and consume it, and the computed results are written to a persistent store (MySQL).

The value of real-time ad-click analysis: because the effect of an ad placement can be seen online in real time, it lays a solid foundation for scaling up and adjusting placements, and thus maximizes the economic return for the company.

Core requirements:
1. Dynamically filter out the valid user ad clicks against a real-time blacklist: blacklisted users may appear at any moment, so the blacklist must be updated dynamically;
2. Compute the ad click traffic online;
3. Top 3 hottest ads;
4. The traffic trend of each ad;
5. Regional distribution of the users clicking each ad;
6. Ad clicks within the last minute;
7. The whole ad-click Spark Streaming program runs 7x24.

Data format: timestamp, user, ad, city, and so on.

Technical details:
- Analyze users' click counts online and block abusive IPs;
- Use updateStateByKey or mapWithState to compute the ad-click ranking per region;
- Combine Spark Streaming, Spark SQL and Spark Core to analyze the data;
- Use Window-type operations;
- High availability and performance tuning;
- Traffic trends, usually combined with a DB.
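Each mock click record produced by the generator below is one tab-separated line with the fields timestamp, ip, userID, adID, province, city; an illustrative, made-up example:

1461756862000\t192.168.112.240\t5421\t42\tGuangdong\tGuangzhou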
package com.tom.spark.SparkApps.sparkstreaming;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Data generator: a Kafka Producer emitting mock ad-click records
*/
public class MockAdClickedStat {
/**
* @param args
*/
public static void main(String[] args) {
final Random random = new Random();
final String[] provinces = new String[]{"Guangdong", "Zhejiang", "Jiangsu", "Fujian"};
final Map<String, String[]> cities = new HashMap<String, String[]>();
cities.put("Guangdong", new String[]{"Guangzhou", "Shenzhen", "Dongguan"});
cities.put("Zhejiang", new String[]{"Hangzhou", "Wenzhou", "Ningbo"});
cities.put("Jiangsu", new String[]{"Nanjing", "Suzhou", "Wuxi"});
cities.put("Fujian", new String[]{"Fuzhou", "Xiamen", "Sanming"});
final String[] ips = new String[] {
"192.168.112.240",
"192.168.112.239",
"192.168.112.245",
"192.168.112.246",
"192.168.112.247",
"192.168.112.248",
"192.168.112.249",
"192.168.112.250",
"192.168.112.251",
"192.168.112.252",
"192.168.112.253",
"192.168.112.254",
};
/**
* Basic Kafka producer configuration
*/
Properties kafkaConf = new Properties();
kafkaConf.put("serializer.class", "kafka.serializer.StringEncoder");
kafkaConf.put("metadeta.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
ProducerConfig producerConfig = new ProducerConfig(kafkaConf);
final Producer<Integer, String> producer = new Producer<Integer, String>(producerConfig);
new Thread(new Runnable() {
public void run() {
while(true) {
//Basic format of an online ad-click record: timestamp, ip, userID, adID, province, city
Long timestamp = new Date().getTime();
String ip = ips[random.nextInt(12)]; //a free public IP library could be used instead
int userID = random.nextInt(10000);
int adID = random.nextInt(100);
String province = provinces[random.nextInt(4)];
String city = cities.get(province)[random.nextInt(3)];
String clickedAd = timestamp + "\t" + ip + "\t" + userID + "\t" + adID + "\t" + province + "\t" + city;
producer.send(new KeyedMessage<Integer, String>("AdClicked", clickedAd));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
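The generator above uses the legacy kafka.javaapi.producer API that shipped with Kafka 0.8, matching the rest of this example. For reference, here is a minimal sketch of the same send path against the newer org.apache.kafka.clients.producer client (Kafka 0.9+); the broker list, topic and record format are the ones used above, everything else is standard client configuration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MockAdClickedStatNewApi {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "Master:9092,Worker1:9092,Worker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//same tab-separated record format as the generator above
String clickedAd = System.currentTimeMillis() + "\t192.168.112.240\t1234\t42\tGuangdong\tGuangzhou";
producer.send(new ProducerRecord<String, String>("AdClicked", clickedAd));
producer.close();
}
}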
package com.tom.spark.SparkApps.sparkstreaming;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.google.common.base.Optional;
import scala.Tuple2;
/**
* Data processing: the Spark Streaming application acting as the Kafka consumer
*/
public class AdClickedStreamingStats {
/**
* @param args
*/
public static void main(String[] args) {
//Benefits of this setup: 1. checkpoint-based recovery 2. the context factory
final SparkConf conf = new SparkConf().setAppName("SparkStreamingOnKafkaDirect").setMaster("spark://Master:7077");
final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/CheckPoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
public JavaStreamingContext create() {
return createContext(checkpointDirectory, conf);
}
};
/**
* The Driver can recover from failure, but this also requires the Driver process to run inside the
* cluster (cluster deploy mode) and the application to be submitted with --supervise;
*/
JavaStreamingContext javassc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
/**
* Step 3: create the Spark Streaming input DStream:
* 1. The input source can be based on File, HDFS, Flume, Kafka, Socket, etc.
* 2. Here the data comes from Kafka through the direct approach: on every batch interval Spark
* Streaming pulls the new data of the subscribed topics straight from the brokers (for the running
* of the Spark Streaming application itself, the processing flow is the same whether or not data
* arrived).
* 3. If batches frequently carry no data, constantly launching empty jobs wastes scheduling
* resources, because there is nothing to compute; real enterprise production code therefore checks
* whether a batch actually holds data before submitting the job, as the sketch after the stream
* creation below shows.
*/
//Create the Kafka metadata for this Spark Streaming Kafka Consumer to use
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "Master:9092,Worker1:9092,Worker2:9092");
Set<String> topics = new HashSet<String>();
topics.add("SparkStreamingDirected");
JavaPairInputDStream<String, String> adClickedStreaming = KafkaUtils.createDirectStream(javassc,
String.class, String.class,
StringDecoder.class, StringDecoder.class,
kafkaParameters,
topics);
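As a minimal sketch of the emptiness check described in the comment above (standard Spark 1.x Java API; rdd.isEmpty() exists since Spark 1.3 and is cheaper than count()), each batch can be guarded before any real work is submitted:

adClickedStreaming.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
public Void call(JavaPairRDD<String, String> rdd) throws Exception {
if (rdd.isEmpty()) {
return null; //nothing arrived in this Batch Duration, skip the work
}
//...the real per-batch processing would go here...
return null;
}
});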
/**Because we filter against the blacklist and the data lives inside RDDs, we necessarily use the
 * transform family of functions; here it must be transformToPair, because the data read from Kafka
 * arrives as Pair<String, String>, and because the filtered data needs further processing it must
 * stay in the original type read from Kafka.
 *
 * To restate: within each Batch Duration the input data is encapsulated by exactly one RDD. You can
 * have multiple InputDStreams, but when the job is generated, the different InputDStreams within a
 * Batch Duration are merely like different file sources in a Spark computation over HDFS.
 */
JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() {
public JavaPairRDD<String, String> call(
JavaPairRDD<String, String> rdd) throws Exception {
/**
* Steps for online blacklist filtering:
* 1. Fetch the blacklist from the database and turn it into an RDD, i.e. a new RDD instance
* wrapping the blacklist data;
* 2. Join the RDD produced in this Batch Duration with the blacklist RDD; precisely, leftOuterJoin
* the batch RDD against the blacklist RDD: where both sides have an entry the flag is true,
* otherwise it is absent.
*
* We keep only the records whose leftOuterJoin result is not true;
*
*/
final List<String> blackListNames = new ArrayList<String>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doQuery("SELECT * FROM blacklisttable", null, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
while(result.next()){
blackListNames.add(result.getString(1));
}
}
});
List<Tuple2<String, Boolean>> blackListTuple = new ArrayList<Tuple2<String,Boolean>>();
for(String name : blackListNames) {
blackListTuple.add(new Tuple2<String, Boolean>(name, true));
}
List<Tuple2<String, Boolean>> blacklistFromListDB = blackListTuple; //data from the queried blacklist table, mapped to <String, Boolean>
JavaSparkContext jsc = new JavaSparkContext(rdd.context());
/**
* The blacklist table only holds the userID, but a join needs Key-Value data, so here we build
* a Key-Value collection from the table rows
*/
JavaPairRDD<String, Boolean> blackListRDD = jsc.parallelizePairs(blacklistFromListDB);
/**
* The join must be keyed on userID, so the incoming rdd is first converted with mapToPair into
* an RDD of the matching shape
*
*/
JavaPairRDD<String, Tuple2<String, String>> rdd2Pair = rdd.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {
public Tuple2<String, Tuple2<String, String>> call(
Tuple2<String, String> t) throws Exception {
String userID = t._2.split("\t")[2];
return new Tuple2<String, Tuple2<String,String>>(userID, t);
}
});
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> joined = rdd2Pair.leftOuterJoin(blackListRDD);
JavaPairRDD<String, String> result = joined.filter(new Function<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, Boolean>() {
public Boolean call(
Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> tuple)
throws Exception {
Optional<Boolean> optional = tuple._2._2;
if(optional.isPresent() && optional.get()){
return false;
} else {
return true;
}
}
}).mapToPair(new PairFunction<Tuple2<String,Tuple2<Tuple2<String,String>,Optional<Boolean>>>, String, String>() {
public Tuple2<String, String> call(
Tuple2<String, Tuple2<Tuple2<String, String>, Optional<Boolean>>> t)
throws Exception {
return t._2._1;
}
});
return result;
}
});
//Basic ad-click record format: timestamp, ip, userID, adID, province, city
JavaPairDStream<String, Long> pairs = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
String[] splited=t._2.split("\t");
String timestamp = splited[0]; //raw millisecond timestamp from the generator, used as the day key below
String ip = splited[1];
String userID = splited[2];
String adID = splited[3];
String province = splited[4];
String city = splited[5];
String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"
+province +"_"+city;
return new Tuple2<String, Long>(clickedRecord, 1L);
}
});
/**
* Step 4.3: on top of the per-click count of 1, aggregate the total clicks per key in this batch
*/
JavaPairDStream<String, Long> adClickedUsers= pairs.reduceByKey(new Function2<Long, Long, Long>() {
public Long call(Long i1, Long i2) throws Exception{
return i1 + i2;
}
});
/*Determine which clicks are valid. A sophisticated approach trains a machine-learning model for online
filtering; a simple one limits clicks per IP, e.g. no more than 100 per day. We can also judge by the
click count within one batch duration, though one batch alone is incomplete evidence; a full day's data
is also needed, or the judgment can be made hour by hour.*/
JavaPairDStream<String, Long> filterClickedBatch = adClickedUsers.filter(new Function<Tuple2<String,Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> v1) throws Exception {
if (1 < v1._2){
//a real implementation would also update the blacklist tables here
return false;
} else {
return true;
}
}
});
//filterClickedBatch.print();
//Write to the database
filterClickedBatch.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
//Write to MySQL efficiently through the database connection pool,
//e.g. inserting 1000 records at a time with insertBatch or updateBatch
//Fields of the inserted user data: userID, adID, clickedCount, time
//One subtlety: two records may share the same key, in which case the count must be accumulated via UPDATE
List<UserAdClicked> userAdClickedList = new ArrayList<UserAdClicked>();
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_"); //the key was composed with "_" above
UserAdClicked userClicked = new UserAdClicked();
userClicked.setTimestamp(splited[0]);
userClicked.setIp(splited[1]);
userClicked.setUserID(splited[2]);
userClicked.setAdID(splited[3]);
userClicked.setProvince(splited[4]);
userClicked.setCity(splited[5]);
userAdClickedList.add(userClicked);
}
final List<UserAdClicked> inserting = new ArrayList<UserAdClicked>();
final List<UserAdClicked> updating = new ArrayList<UserAdClicked>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
for(final UserAdClicked clicked : userAdClickedList) {
jdbcWrapper.doQuery("SELECT clickedCount FROM adclicked WHERE"
+ " timestamp =? AND userID = ? AND adID = ?",
new Object[]{clicked.getTimestamp(), clicked.getUserID(),
clicked.getAdID()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()) {
long count = result.getLong(1);
clicked.setClickedCount(count);
updating.add(clicked);
} else {
clicked.setClickedCount(1L);
inserting.add(clicked);
}
}
});
}
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(UserAdClicked insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getIp(),
insertRecord.getUserID(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getCity(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?)", insertParametersList);
//Table columns: timestamp, ip, userID, adID, province, city, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(UserAdClicked updateRecord : updating) {
//the first parameter is the new clickedCount, matching the SET clause of the UPDATE below;
//every record passed the one-click-per-batch filter above, so the increment is exactly 1
updateParametersList.add(new Object[] {
updateRecord.getClickedCount() + 1,
updateRecord.getTimestamp(),
updateRecord.getIp(),
updateRecord.getUserID(),
updateRecord.getAdID(),
updateRecord.getProvince(),
updateRecord.getCity()
});
}
jdbcWrapper.doBatch("UPDATE adclicked SET clickedCount = ? WHERE"
+ " timestamp =? AND ip = ? AND userID = ? AND adID = ? "
+ "AND province = ? AND city = ?", updateParametersList);
}
});
return null;
}
});
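A side note on the select-then-insert/update pattern above: it costs one query per record and can race with concurrent writers. Assuming a UNIQUE KEY over the six identifying columns of adclicked (an assumption about the schema, not shown in this example) and a single parameter list covering all records of the partition (the hypothetical allRecordsParametersList below, with the batch count as the seventh parameter), MySQL's upsert syntax collapses both branches into one batched statement:

jdbcWrapper.doBatch(
"INSERT INTO adclicked VALUES(?, ?, ?, ?, ?, ?, ?) "
+ "ON DUPLICATE KEY UPDATE clickedCount = clickedCount + VALUES(clickedCount)",
allRecordsParametersList);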
//Filter once more, this time reading accumulated history from the database to extend the blacklist
JavaPairDStream<String, Long> blackListBasedOnHistory = filterClickedBatch.filter(new Function<Tuple2<String,Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> v1) throws Exception {
//Key format of a filtered click: timestamp_ip_userID_adID_province_city
final String[] splited = v1._1.split("_"); //extract the fields from the key
final String date = splited[0];
final String userID = splited[2];
final String adID = splited[3];
//A user who clicks the same ad more than 50 times on one day is blacklisted.
//Query the ad-click table by date, userID and adID for the accumulated click count and judge on
//that basis whether these are blacklist-worthy clicks. The lookup below is a minimal sketch of
//that query, assuming the adclicked table populated above.
final long[] clickedCountTotalToday = {0L};
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doQuery("SELECT SUM(clickedCount) FROM adclicked WHERE"
+ " timestamp = ? AND userID = ? AND adID = ?",
new Object[]{date, userID, adID}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()) {
clickedCountTotalToday[0] = result.getLong(1);
}
}
});
return clickedCountTotalToday[0] > 50;
}
});
//map operation: extract the userID
JavaDStream<String> blackListuserIDBasedInBatchOnhistroy =blackListBasedOnHistory.map(new Function<Tuple2<String,Long>, String>() {
public String call(Tuple2<String, Long> v1) throws Exception {
return v1._1.split("\t")[2];
}
});
//One issue remains: the data may contain duplicates. Within a single partition that is easy to handle,
//but duplicates of the same user across partitions are not, so we deduplicate the whole blacklist RDD.
//Once the RDD is distinct, every partition is too: one stone, two birds.
//Having found the blacklist, the next step is writing it into the blacklist table.
JavaDStream<String> blackListUniqueuserBasedInBatchOnhistroy = blackListuserIDBasedInBatchOnhistroy.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
return rdd.distinct();
}
});
// Next, write it into the database table
blackListUniqueuserBasedInBatchOnhistroy.foreachRDD(new Function<JavaRDD<String>, Void>() {
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
public void call(Iterator<String> t) throws Exception {
//The inserted user record only needs the userID,
//so it can be written straight into the blacklist table.
List<Object[]> blackList = new ArrayList<Object[]>();
while(t.hasNext()) {
blackList.add(new Object[]{t.next()});
}
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
jdbcWrapper.doBatch("INSERT INTO blacklisttable values (?)", blackList);
}
});
return null;
}
});
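One caveat on the insert above: distinct() deduplicates only within the current batch, so a later batch can insert the same userID again. Assuming a unique key on the blacklisttable column (again an assumption about the schema), a hedged variant keeps the table free of duplicate rows:

jdbcWrapper.doBatch("INSERT IGNORE INTO blacklisttable VALUES (?)", blackList);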
/**Dynamically accumulate the ad clicks: updateStateByKey updates the click counts at each Batch
 * Duration interval, and the updated state is then usually persisted to external storage, here MySQL
 */
JavaPairDStream<String, Long> updateStateByKeyDSteam = filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t)
throws Exception {
String[] splited=t._2.split("\t");
String timestamp = splited[0]; //raw millisecond timestamp from the generator
String ip = splited[1];
String userID = splited[2];
String adID = splited[3];
String province = splited[4];
String city = splited[5];
String clickedRecord = timestamp + "_" +ip + "_"+userID+"_"+adID+"_"
+province +"_"+city;
return new Tuple2<String, Long>(clickedRecord, 1L);
}
}).updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> v1, Optional<Long> v2)
throws Exception {
// v1: the occurrences of the current key within this Batch Duration, e.g. {1,1,...,1}
// v2: the result accumulated for the current key over previous Batch Durations;
Long clickedTotalHistory = 0L;
if(v2.isPresent()){
clickedTotalHistory = v2.get();
}
for(Long one : v1) {
clickedTotalHistory += one;
}
return Optional.of(clickedTotalHistory);
}
});
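The requirements above also mention mapWithState as an alternative to updateStateByKey. A minimal sketch, assuming Spark 1.6+ (StateSpec and State from org.apache.spark.streaming, Function3 from org.apache.spark.api.java.function, plus the Guava Optional already imported here), where clickPairs is a hypothetical name for the (clickedRecord, 1L) stream built by the mapToPair above. Unlike updateStateByKey, which touches every key on every batch, mapWithState only visits keys that received new data:

JavaPairDStream<String, Long> statefulCounts = clickPairs.mapWithState(
StateSpec.function(new Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>>() {
public Tuple2<String, Long> call(String key, Optional<Long> one, State<Long> state) throws Exception {
//add this batch's contribution to whatever total the key accumulated so far
long total = (state.exists() ? state.get() : 0L) + (one.isPresent() ? one.get() : 0L);
state.update(total);
return new Tuple2<String, Long>(key, total);
}
})).stateSnapshots(); //snapshots of every key's accumulated count, comparable to updateStateByKey's output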
updateStateByKeyDSteam.foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() {
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition) throws Exception {
//Write to MySQL efficiently through the database connection pool,
//e.g. inserting 1000 records at a time with insertBatch or updateBatch
//Fields of the inserted data: timestamp, adID, province, city
//One subtlety: two records may share the same key, in which case the count must be accumulated via UPDATE
List<AdClicked> AdClickedList = new ArrayList<AdClicked>();
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_"); //key format: timestamp_ip_userID_adID_province_city
AdClicked adClicked = new AdClicked();
adClicked.setTimestamp(splited[0]);
adClicked.setAdID(splited[3]);
adClicked.setProvince(splited[4]);
adClicked.setCity(splited[5]);
adClicked.setClickedCount(record._2);
AdClickedList.add(adClicked);
}
final List<AdClicked> inserting = new ArrayList<AdClicked>();
final List<AdClicked> updating = new ArrayList<AdClicked>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: timestamp, adID, province, city, clickedCount
for(final AdClicked clicked : AdClickedList) {
jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedcount WHERE"
+ " timestamp = ? AND adID = ? AND province = ? AND city = ?",
new Object[]{clicked.getTimestamp(), clicked.getAdID(),
clicked.getProvince(), clicked.getCity()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()) {
//clickedCount already holds the accumulated total from updateStateByKey, which is
//authoritative here, so the stale value in the database is not copied over it
updating.add(clicked);
} else {
inserting.add(clicked);
}
}
});
}
//Table columns: timestamp, adID, province, city, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdClicked insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getCity(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclickedcount VALUES(?, ?, ?, ?, ?)", insertParametersList);
//Table columns: timestamp, adID, province, city, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(AdClicked updateRecord : updating) {
updateParametersList.add(new Object[] {
updateRecord.getClickedCount(),
updateRecord.getTimestamp(),
updateRecord.getAdID(),
updateRecord.getProvince(),
updateRecord.getCity()
});
}
jdbcWrapper.doBatch("UPDATE adclickedcount SET clickedCount = ? WHERE"
+ " timestamp =? AND adID = ? AND province = ? AND city = ?", updateParametersList);
}
});
return null;
}
});
/**
* Compute the TopN ads: the Top5 ranked ads per province per day.
* Because we operate directly on the RDDs, the transform operator is used;
*/
updateStateByKeyDSteam.transform(new Function<JavaPairRDD<String,Long>, JavaRDD<Row>>() {
public JavaRDD<Row> call(JavaPairRDD<String, Long> rdd) throws Exception {
JavaRDD<Row> rowRDD = rdd.mapToPair(new PairFunction<Tuple2<String,Long>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, Long> t)
throws Exception {
String[] splited=t._1.split("_"); //key format: timestamp_ip_userID_adID_province_city
String timestamp = splited[0]; //raw millisecond timestamp, used here as the day key
String adID = splited[3];
String province = splited[4];
String clickedRecord = timestamp + "_" + adID + "_" + province;
return new Tuple2<String, Long>(clickedRecord, t._2);
}
}).reduceByKey(new Function2<Long, Long, Long>() {
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
}).map(new Function<Tuple2<String,Long>, Row>() {
public Row call(Tuple2<String, Long> v1) throws Exception {
String[] splited=v1._1.split("_"); //clickedRecord format: timestamp_adID_province
String timestamp = splited[0];
String adID = splited[1];
String province = splited[2];
return RowFactory.create(timestamp, adID, province, v1._2);
}
});
StructType structType = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("timestamp", DataTypes.StringType, true),
DataTypes.createStructField("adID", DataTypes.StringType, true),
DataTypes.createStructField("province", DataTypes.StringType, true),
DataTypes.createStructField("clickedCount", DataTypes.LongType, true)
));
HiveContext hiveContext = new HiveContext(rdd.context());
DataFrame df = hiveContext.createDataFrame(rowRDD, structType);
df.registerTempTable("topNTableSource");
DataFrame result = hiveContext.sql("SELECT timestamp, adID, province, clickedCount FROM"
+ " (SELECT timestamp, adID, province, clickedCount, "
+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY clickedCount DESC) rank "
+ "FROM topNTableSource) subquery "
+ "WHERE rank <= 5");
return result.toJavaRDD();
}
}).foreachRDD(new Function<JavaRDD<Row>, Void>() {
public Void call(JavaRDD<Row> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Row>>() {
public void call(Iterator<Row> t) throws Exception {
List<AdProvinceTopN> adProvinceTopN = new ArrayList<AdProvinceTopN>();
while(t.hasNext()) {
Row row = t.next();
AdProvinceTopN item = new AdProvinceTopN();
item.setTimestamp(row.getString(0));
item.setAdID(row.getString(1));
item.setProvince(row.getString(2));
item.setClickedCount(row.getLong(3));
adProvinceTopN.add(item);
}
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
Set<String> set = new HashSet<String>();
for(AdProvinceTopN item: adProvinceTopN){
set.add(item.getTimestamp() + "_" + item.getProvince());
}
//Table columns: timestamp, adID, province, clickedCount
ArrayList<Object[]> deleteParametersList = new ArrayList<Object[]>();
for(String deleteRecord : set) {
String[] splited = deleteRecord.split("_");
deleteParametersList.add(new Object[]{
splited[0],
splited[1]
});
}
jdbcWrapper.doBatch("DELETE FROM adprovincetopn WHERE timestamp = ? AND province = ?", deleteParametersList);
//Table columns: timestamp, adID, province, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdProvinceTopN insertRecord : adProvinceTopN) {
//parameter order matches the column order of adprovincetopn
insertParametersList.add(new Object[] {
insertRecord.getTimestamp(),
insertRecord.getAdID(),
insertRecord.getProvince(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adprovincetopn VALUES (?, ?, ?, ?)", insertParametersList);
}
});
return null;
}
});
/**
* Compute the ad-click trend over the past half hour
* Basic ad-click record format: timestamp, ip, userID, adID, province, city
*/
filteredadClickedStreaming.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
public Tuple2<String, Long> call(Tuple2<String, String> t)
throws Exception {
String splited[] = t._2.split("\t");
String adID = splited[3];
//Reduce the millisecond timestamp to minute granularity so that all clicks within the
//same minute share one key; the minute of the click is the unit the trend needs
String time = String.valueOf(Long.parseLong(splited[0]) / 60000 * 60000);
return new Tuple2<String, Long>(time + "_" + adID, 1L);
}
}).reduceByKeyAndWindow(new Function2<Long, Long, Long>() {
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
}, new Function2<Long, Long, Long>() {
public Long call(Long v1, Long v2) throws Exception {
return v1 - v2;
}
}, Durations.minutes(30), Durations.minutes(5)).foreachRDD(new Function<JavaPairRDD<String,Long>, Void>() { //30-minute window sliding every 5 minutes; the slide must be a multiple of the 10-second batch interval
public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Long>>>() {
public void call(Iterator<Tuple2<String, Long>> partition)
throws Exception {
List<AdTrendStat> adTrend = new ArrayList<AdTrendStat>();
while(partition.hasNext()) {
Tuple2<String, Long> record = partition.next();
String[] splited = record._1.split("_");
String time = splited[0];
String adID = splited[1];
Long clickedCount = record._2;
/**
* Which fields does the database insert actually need? time, adID and clickedCount;
* but when drawing the trend through J2EE technology we certainly need the year/month/day,
* hour and minute dimensions, so those time dimensions are extracted here.
*/
AdTrendStat adTrendStat = new AdTrendStat();
adTrendStat.setAdID(adID);
adTrendStat.setClickedCount(clickedCount);
//derive the calendar dimensions from the minute-granularity timestamp key
Date clickMinute = new Date(Long.parseLong(time));
adTrendStat.set_date(new SimpleDateFormat("yyyy-MM-dd").format(clickMinute));
adTrendStat.set_hour(new SimpleDateFormat("HH").format(clickMinute));
adTrendStat.set_minute(new SimpleDateFormat("mm").format(clickMinute));
adTrend.add(adTrendStat);
}
final List<AdTrendStat> inserting = new ArrayList<AdTrendStat>();
final List<AdTrendStat> updating = new ArrayList<AdTrendStat>();
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
//Table columns: date, hour, minute, adID, clickedCount
for(final AdTrendStat trend : adTrend) {
final AdTrendCountHistory adTrendhistory = new AdTrendCountHistory();
jdbcWrapper.doQuery("SELECT clickedCount FROM adclickedtrend WHERE"
+ " date =? AND hour = ? AND minute = ? AND AdID = ?",
new Object[]{trend.get_date(), trend.get_hour(), trend.get_minute(),
trend.getAdID()}, new ExecuteCallBack() {
public void resultCallBack(ResultSet result) throws Exception {
if(result.next()) {
long count = result.getLong(1);
adTrendhistory.setClickedCountHistoryLong(count);
updating.add(trend);
} else {
inserting.add(trend);
}
}
});
}
//Table columns: date, hour, minute, adID, clickedCount
List<Object[]> insertParametersList = new ArrayList<Object[]>();
for(AdTrendStat insertRecord : inserting) {
insertParametersList.add(new Object[] {
insertRecord.get_date(),
insertRecord.get_hour(),
insertRecord.get_minute(),
insertRecord.getAdID(),
insertRecord.getClickedCount()
});
}
jdbcWrapper.doBatch("INSERT INTO adclickedtrend VALUES(?, ?, ?, ?, ?)", insertParametersList);
//Table columns: date, hour, minute, adID, clickedCount
List<Object[]> updateParametersList = new ArrayList<Object[]>();
for(AdTrendStat updateRecord : updating) {
updateParametersList.add(new Object[] {
updateRecord.getClickedCount(),
updateRecord.get_date(),
updateRecord.get_hour(),
updateRecord.get_minute(),
updateRecord.getAdID()
});
}
jdbcWrapper.doBatch("UPDATE adclickedtrend SET clickedCount = ? WHERE"
+ " date =? AND hour = ? AND minute = ? AND AdID = ?"
, updateParametersList);
}
});
return null;
}
});
/**
* The Spark Streaming execution engine, i.e. the Driver, starts running here. The Driver runs on a
* new thread and internally has a message loop receiving messages from the application itself or
* from the Executors.
*/
javassc.start();
javassc.awaitTermination();
javassc.close();
}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf) {
// If you do not see this printed, that means the StreamingContext has been loaded
// from an existing checkpoint
System.out.println("Creating new context");
// Create the context with a 10 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
ssc.checkpoint(checkpointDirectory);
return ssc;
}
}
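Since requirement 7 above asks for 7x24 operation, one hedged aside on shutdown (standard Spark Streaming API and configuration, not specific to this example): a graceful stop lets in-flight batches finish before the context goes down instead of killing them mid-write.

//either ask Spark to stop gracefully when the JVM receives a shutdown signal:
conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
//or stop explicitly, also stopping the SparkContext, after processing all received data:
javassc.stop(true, true);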
class JDBCWrapper {
private static volatile JDBCWrapper jdbcInstance = null; //volatile makes the double-checked locking below safe
private static LinkedBlockingQueue<Connection> dbConnectionPool = new LinkedBlockingQueue<Connection>();
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static JDBCWrapper getJDBCInstance() {
if(jdbcInstance == null) {
synchronized (JDBCWrapper.class) {
if(jdbcInstance == null) {
jdbcInstance = new JDBCWrapper();
}
}
}
return jdbcInstance;
}
private JDBCWrapper() {
for(int i = 0; i < 10; i++){
try {
Connection conn = DriverManager.getConnection("jdbc:mysql://Master:3306/sparkstreaming","root", "root");
dbConnectionPool.put(conn);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public synchronized Connection getConnection() {
while(0 == dbConnectionPool.size()){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return dbConnectionPool.poll();
}
public int[] doBatch(String sqlText, List<Object[]> paramsList){
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
int[] result = null;
try {
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement(sqlText);
for(Object[] parameters: paramsList) {
for(int i = 0; i < parameters.length; i++){
preparedStatement.setObject(i + 1, parameters[i]);
}
preparedStatement.addBatch();
}
result = preparedStatement.executeBatch();
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if(preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(conn != null) {
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return result;
}
public void doQuery(String sqlText, Object[] paramsList, ExecuteCallBack callback){
Connection conn = getConnection();
PreparedStatement preparedStatement = null;
ResultSet result = null;
try {
preparedStatement = conn.prepareStatement(sqlText);
for(int i = 0; i < paramsList.length; i++){
preparedStatement.setObject(i + 1, paramsList[i]);
}
result = preparedStatement.executeQuery();
try {
callback.resultCallBack(result);
} catch (Exception e) {
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
if(preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(conn != null) {
try {
dbConnectionPool.put(conn);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
interface ExecuteCallBack {
void resultCallBack(ResultSet result) throws Exception;
}
class UserAdClicked {
private String timestamp;
private String ip;
private String userID;
private String adID;
private String province;
private String city;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdClicked {
private String timestamp;
private String adID;
private String province;
private String city;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdProvinceTopN {
private String timestamp;
private String adID;
private String province;
private Long clickedCount;
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdTrendStat {
private String _date;
private String _hour;
private String _minute;
private String adID;
private Long clickedCount;
public String get_date() {
return _date;
}
public void set_date(String _date) {
this._date = _date;
}
public String get_hour() {
return _hour;
}
public void set_hour(String _hour) {
this._hour = _hour;
}
public String get_minute() {
return _minute;
}
public void set_minute(String _minute) {
this._minute = _minute;
}
public String getAdID() {
return adID;
}
public void setAdID(String adID) {
this.adID = adID;
}
public Long getClickedCount() {
return clickedCount;
}
public void setClickedCount(Long clickedCount) {
this.clickedCount = clickedCount;
}
}
class AdTrendCountHistory{
private Long clickedCountHistoryLong;
public Long getClickedCountHistoryLong() {
return clickedCountHistoryLong;
}
public void setClickedCountHistoryLong(Long clickedCountHistoryLong) {
this.clickedCountHistoryLong = clickedCountHistoryLong;
}
}