【注】該系列文章以及使用到安裝包/測試數(shù)據(jù) 可以在《傾情大奉送--Spark入門實(shí)戰(zhàn)系列》獲取1、SparkSQL的發(fā)展歷程1.1 Hive and SharkSparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術(shù)人員提供快速上手的工具,Hive應(yīng)運(yùn)而生,它是當(dāng)時(shí)唯一運(yùn)行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計(jì)算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運(yùn)行效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產(chǎn)生,其中表現(xiàn)較為突出的是: l MapR的Drill l Cloudera的Impala l Shark 其中Shark是伯克利實(shí)驗(yàn)室Spark生態(tài)環(huán)境的組件之一,它修改了下圖所示的右下角的內(nèi)存管理、物理計(jì)劃、執(zhí)行三個(gè)模塊,并使之能運(yùn)行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升。 1.2 Shark和SparkSQL但是,隨著Spark的發(fā)展,對(duì)于野心勃勃的Spark團(tuán)隊(duì)來說,Shark對(duì)于Hive的太多依賴(如采用Hive的語法解析器、查詢優(yōu)化器等等),制約了Spark的One Stack Rule Them All的既定方針,制約了Spark各個(gè)組件的相互集成,所以提出了SparkSQL項(xiàng)目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優(yōu)點(diǎn),如內(nèi)存列存儲(chǔ)(In-Memory Columnar Storage)、Hive兼容性等,重新開發(fā)了SparkSQL代碼;由于擺脫了對(duì)Hive的依賴性,SparkSQL無論在數(shù)據(jù)兼容、性能優(yōu)化、組件擴(kuò)展方面都得到了極大的方便,真可謂“退一步,海闊天空”。 l數(shù)據(jù)兼容方面 不但兼容Hive,還可以從RDD、parquet文件、JSON文件中獲取數(shù)據(jù),未來版本甚至支持獲取RDBMS數(shù)據(jù)以及cassandra等NOSQL數(shù)據(jù); l性能優(yōu)化方面 除了采取In-Memory Columnar Storage、byte-code generation等優(yōu)化技術(shù)外、將會(huì)引進(jìn)Cost Model對(duì)查詢進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等等; l組件擴(kuò)展方面 無論是SQL的語法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。 2014年6月1日Shark項(xiàng)目和SparkSQL項(xiàng)目的主持人Reynold Xin宣布:停止對(duì)Shark的開發(fā),團(tuán)隊(duì)將所有資源放SparkSQL項(xiàng)目上,至此,Shark的發(fā)展畫上了句話,但也因此發(fā)展出兩個(gè)直線:SparkSQL和Hive on Spark。 其中SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一個(gè)Hive的發(fā)展計(jì)劃,該計(jì)劃將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限于一個(gè)引擎,可以采用Map-Reduce、Tez、Spark等引擎。 1.3 SparkSQL的性能Shark的出現(xiàn),使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高: 那么,擺脫了Hive的限制,SparkSQL的性能又有怎么樣的表現(xiàn)呢?雖然沒有Shark相對(duì)于Hive那樣矚目地性能提升,但也表現(xiàn)得非常優(yōu)異: 為什么SparkSQL的性能會(huì)得到怎么大的提升呢?主要SparkSQL在下面幾點(diǎn)做了優(yōu)化: A:內(nèi)存列存儲(chǔ)(In-Memory Columnar Storage) SparkSQL的表數(shù)據(jù)在內(nèi)存中存儲(chǔ)不是采用原生態(tài)的JVM對(duì)象存儲(chǔ)方式,而是采用內(nèi)存列存儲(chǔ),如下圖所示。 該存儲(chǔ)方式無論在空間占用量和讀取吞吐率上都占有很大優(yōu)勢。 對(duì)于原生態(tài)的JVM對(duì)象存儲(chǔ)方式,每個(gè)對(duì)象通常要增加12-16字節(jié)的額外開銷,對(duì)于一個(gè)270MB的TPC-H lineitem table數(shù)據(jù),使用這種方式讀入內(nèi)存,要使用970MB左右的內(nèi)存空間(通常是2~5倍于原生數(shù)據(jù)空間);另外,使用這種方式,每個(gè)數(shù)據(jù)記錄產(chǎn)生一個(gè)JVM對(duì)象,如果是大小為200B的數(shù)據(jù)記錄,32G的堆棧將產(chǎn)生1.6億個(gè)對(duì)象,這么多的對(duì)象,對(duì)于GC來說,可能要消耗幾分鐘的時(shí)間來處理(JVM的垃圾收集時(shí)間與堆棧中的對(duì)象數(shù)量呈線性相關(guān))。顯然這種內(nèi)存存儲(chǔ)方式對(duì)于基于內(nèi)存計(jì)算的Spark來說,很昂貴也負(fù)擔(dān)不起。 對(duì)于內(nèi)存列存儲(chǔ)來說,將所有原生數(shù)據(jù)類型的列采用原生數(shù)組來存儲(chǔ),將Hive支持的復(fù)雜數(shù)據(jù)類型(如array、map等)先序化后并接成一個(gè)字節(jié)數(shù)組來存儲(chǔ)。這樣,每個(gè)列創(chuàng)建一個(gè)JVM對(duì)象,從而導(dǎo)致可以快速的GC和緊湊的數(shù)據(jù)存儲(chǔ);額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低內(nèi)存開銷;更有趣的是,對(duì)于分析查詢中頻繁使用的聚合特定列,性能會(huì)得到很大的提高,原因就是這些列的數(shù)據(jù)放在一起,更容易讀入內(nèi)存進(jìn)行計(jì)算。 B:字節(jié)碼生成技術(shù)(bytecode generation,即CG) 在數(shù)據(jù)庫查詢中有一個(gè)昂貴的操作是查詢語句中的表達(dá)式,主要是由于JVM的內(nèi)存模型引起的。比如如下一個(gè)查詢: SELECT a + b FROM table 在這個(gè)查詢里,如果采用通用的SQL語法途徑去處理,會(huì)先生成一個(gè)表達(dá)式樹(有兩個(gè)節(jié)點(diǎn)的Add樹,參考后面章節(jié)),在物理處理這個(gè)表達(dá)式樹的時(shí)候,將會(huì)如圖所示的7個(gè)步驟: 1. 調(diào)用虛函數(shù)Add.eval(),需要確認(rèn)Add兩邊的數(shù)據(jù)類型 2. 調(diào)用虛函數(shù)a.eval(),需要確認(rèn)a的數(shù)據(jù)類型 3. 確定a的數(shù)據(jù)類型是Int,裝箱 4. 調(diào)用虛函數(shù)b.eval(),需要確認(rèn)b的數(shù)據(jù)類型 5. 確定b的數(shù)據(jù)類型是Int,裝箱 6. 調(diào)用Int類型的Add 7. 返回裝箱后的計(jì)算結(jié)果 其中多次涉及到虛函數(shù)的調(diào)用,虛函數(shù)的調(diào)用會(huì)打斷CPU的正常流水線處理,減緩執(zhí)行。 Spark1.1.0在catalyst模塊的expressions增加了codegen模塊,如果使用動(dòng)態(tài)字節(jié)碼生成技術(shù)(配置spark.sql.codegen參數(shù)),SparkSQL在執(zhí)行物理計(jì)劃的時(shí)候,對(duì)匹配的表達(dá)式采用特定的代碼,動(dòng)態(tài)編譯,然后運(yùn)行。如上例子,匹配到Add方法: 然后,通過調(diào)用,最終調(diào)用: 最終實(shí)現(xiàn)效果類似如下偽代碼: val a: Int = inputRow.getInt(0) val b: Int = inputRow.getInt(1) val result: Int = a + b resultRow.setInt(0, result) 對(duì)于Spark1.1.0,對(duì)SQL表達(dá)式都作了CG優(yōu)化,具體可以參看codegen模塊。CG優(yōu)化的實(shí)現(xiàn)主要還是依靠scala2.10的運(yùn)行時(shí)放射機(jī)制(runtime reflection)。對(duì)于SQL查詢的CG優(yōu)化,可以簡單地用下圖來表示: C:Scala代碼優(yōu)化 另外,SparkSQL在使用Scala編寫代碼的時(shí)候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對(duì)于用戶來說,還是使用統(tǒng)一的接口,沒受到使用上的困難。下圖是一個(gè)Scala代碼優(yōu)化的示意圖: 2、 SparkSQL運(yùn)行架構(gòu)類似于關(guān)系型數(shù)據(jù)庫,SparkSQL也是語句也是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)組成,分別對(duì)應(yīng)sql查詢過程中的Result、Data Source、Operation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。
當(dāng)執(zhí)行SparkSQL語句的順序?yàn)椋?/span> 1.對(duì)讀入的SQL語句進(jìn)行解析(Parse),分辨出SQL語句中哪些詞是關(guān)鍵詞(如SELECT、FROM、WHERE),哪些是表達(dá)式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規(guī)范; 2.將SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列、表、視圖等等)進(jìn)行綁定(Bind),如果相關(guān)的Projection、Data Source等都是存在的話,就表示這個(gè)SQL語句是可以執(zhí)行的; 3.一般的數(shù)據(jù)庫會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃(Optimize); 4.計(jì)劃執(zhí)行(Execute),按Operation-->Data Source-->Result的次序來進(jìn)行的,在執(zhí)行過程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過的SQL語句,可能直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。 2.1 Tree和RuleSparkSQL對(duì)SQL語句的處理和關(guān)系型數(shù)據(jù)庫對(duì)SQL語句的處理采用了類似的方法,首先會(huì)將SQL語句進(jìn)行解析(Parse),然后形成一個(gè)Tree,在后續(xù)的如綁定、優(yōu)化等處理過程都是對(duì)Tree的操作,而操作的方法是采用Rule,通過模式匹配,對(duì)不同類型的節(jié)點(diǎn)采用不同的操作。在整個(gè)sql語句的處理過程中,Tree和Rule相互配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優(yōu)化、物理計(jì)劃等過程,最終生成可以執(zhí)行的物理計(jì)劃。 2.1.1 Treel Tree的相關(guān)代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees l Logical Plans、Expressions、Physical Operators都可以使用Tree表示 l Tree的具體操作是通過TreeNode來實(shí)現(xiàn)的 ? SparkSQL定義了catalyst.trees的日志,通過這個(gè)日志可以形象的表示出樹的結(jié)構(gòu) ? TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進(jìn)行操作 ? 有了TreeNode,通過Tree中各個(gè)TreeNode之間的關(guān)系,可以對(duì)Tree進(jìn)行遍歷操作,如使用transformDown、transformUp將Rule應(yīng)用到給定的樹段,然后用結(jié)果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對(duì)一個(gè)給定的節(jié)點(diǎn)進(jìn)行操作,通過迭代將Rule應(yīng)用到該節(jié)點(diǎn)以及子節(jié)點(diǎn)。 l TreeNode可以細(xì)分成三種類型的Node: ? UnaryNode 一元節(jié)點(diǎn),即只有一個(gè)子節(jié)點(diǎn)。如Limit、Filter操作 ? BinaryNode 二元節(jié)點(diǎn),即有左右子節(jié)點(diǎn)的二叉節(jié)點(diǎn)。如Jion、Union操作 ? LeafNode 葉子節(jié)點(diǎn),沒有子節(jié)點(diǎn)的節(jié)點(diǎn)。主要用戶命令類操作,如SetCommand
2.1.2 Rulel Rule的相關(guān)代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules l Rule在SparkSQL的Analyzer、Optimizer、SparkPlan等各個(gè)組件中都有應(yīng)用到 l Rule是一個(gè)抽象類,具體的Rule實(shí)現(xiàn)是通過RuleExecutor完成 l Rule通過定義batch和batchs,可以簡便的、模塊化地對(duì)Tree進(jìn)行transform操作 l Rule通過定義Once和FixedPoint,可以對(duì)Tree進(jìn)行一次操作或多次操作(如對(duì)某些Tree進(jìn)行多次迭代操作的時(shí)候,達(dá)到FixedPoint次數(shù)迭代或達(dá)到前后兩次的樹結(jié)構(gòu)沒變化才停止操作,具體參看RuleExecutor.apply) 2.2 sqlContext和hiveContext的運(yùn)行過程SparkSQL有兩個(gè)分支,sqlContext和hiveContext,sqlContext現(xiàn)在只支持SQL語法解析器(SQL-92語法);hiveContext現(xiàn)在支持SQL語法解析器和hivesql語法解析器,默認(rèn)為hiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運(yùn)行hiveSQL不支持的語法, 2.2.1 sqlContext的運(yùn)行過程sqlContext總的一個(gè)過程如下圖所示: 1.SQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan; 2.使用analyzer結(jié)合數(shù)據(jù)數(shù)據(jù)字典(catalog)進(jìn)行綁定,生成resolvedLogicalPlan; 3.使用optimizer對(duì)resolvedLogicalPlan進(jìn)行優(yōu)化,生成optimizedLogicalPlan; 4.使用SparkPlan將LogicalPlan轉(zhuǎn)換成PhysicalPlan; 5.使用prepareForExecution()將PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計(jì)劃; 6.使用execute()執(zhí)行可執(zhí)行物理計(jì)劃; 7.生成SchemaRDD。 在整個(gè)運(yùn)行過程中涉及到多個(gè)SparkSQL的組件,如SqlParse、analyzer、optimizer、SparkPlan等等 2.2.2hiveContext的運(yùn)行過程hiveContext總的一個(gè)過程如下圖所示: 1.SQL語句經(jīng)過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個(gè)解析過程中對(duì)hiveql語句使用getAst()獲取AST樹,然后再進(jìn)行解析; 2.使用analyzer結(jié)合數(shù)據(jù)hive源數(shù)據(jù)Metastore(新的catalog)進(jìn)行綁定,生成resolved LogicalPlan; 3.使用optimizer對(duì)resolved LogicalPlan進(jìn)行優(yōu)化,生成optimized LogicalPlan,優(yōu)化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進(jìn)行預(yù)處理; 4.使用hivePlanner將LogicalPlan轉(zhuǎn)換成PhysicalPlan; 5.使用prepareForExecution()將PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計(jì)劃; 6.使用execute()執(zhí)行可執(zhí)行物理計(jì)劃; 7.執(zhí)行后,使用map(_.copy)將結(jié)果導(dǎo)入SchemaRDD。 2.3 catalyst優(yōu)化器SparkSQL1.1總體上由四個(gè)模塊組成:core、catalyst、hive、hive-Thriftserver: l core處理數(shù)據(jù)的輸入輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù)(RDD、Parquet、json等),將查詢結(jié)果輸出成schemaRDD; l catalyst處理查詢語句的整個(gè)處理過程,包括解析、綁定、優(yōu)化、物理計(jì)劃等,說其是優(yōu)化器,還不如說是查詢引擎; l hive對(duì)hive數(shù)據(jù)的處理 l hive-ThriftServer提供CLI和JDBC/ODBC接口 在這四個(gè)模塊中,catalyst處于最核心的部分,其性能優(yōu)劣將影響整體的性能。由于發(fā)展時(shí)間尚短,還有很多不足的地方,但其插件式的設(shè)計(jì),為未來的發(fā)展留下了很大的空間。下面是catalyst的一個(gè)設(shè)計(jì)圖:
其中虛線部分是以后版本要實(shí)現(xiàn)的功能,實(shí)線部分是已經(jīng)實(shí)現(xiàn)的功能。從上圖看,catalyst主要的實(shí)現(xiàn)組件有: lsqlParse,完成sql語句的語法解析功能,目前只提供了一個(gè)簡單的sql解析器; lAnalyzer,主要完成綁定工作,將不同來源的Unresolved LogicalPlan和數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog)進(jìn)行綁定,生成resolved LogicalPlan; loptimizer對(duì)resolved LogicalPlan進(jìn)行優(yōu)化,生成optimized LogicalPlan; l Planner將LogicalPlan轉(zhuǎn)換成PhysicalPlan; l CostModel,主要根據(jù)過去的性能統(tǒng)計(jì)數(shù)據(jù),選擇最佳的物理執(zhí)行計(jì)劃 這些組件的基本實(shí)現(xiàn)方法: l 先將sql語句通過解析生成Tree,然后在不同階段使用不同的Rule應(yīng)用到Tree上,通過轉(zhuǎn)換完成各個(gè)組件的功能。 l Analyzer使用Analysis Rules,配合數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉(zhuǎn)換成resolved LogicalPlan; l optimizer使用Optimization Rules,對(duì)resolved LogicalPlan進(jìn)行合并、列裁剪、過濾器下推等優(yōu)化作業(yè)而轉(zhuǎn)換成optimized LogicalPlan; l Planner使用Planning Strategies,對(duì)optimized LogicalPlan 3、SparkSQL CLICLI(Command-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執(zhí)行指令的界面,它通常不支持鼠標(biāo),用戶通過鍵盤輸入指令,計(jì)算機(jī)接收到指令后予以執(zhí)行。Spark CLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過程和最終的結(jié)果。 Spark1.1相較于Spark1.0最大的差別就在于Spark1.1增加了Spark SQL CLI和ThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數(shù)據(jù)庫管理員較容易地上手,真正意義上進(jìn)入了SQL時(shí)代。 【注】Spark CLI和Spark Thrift Server實(shí)驗(yàn)環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建 3.1 運(yùn)行環(huán)境說明3.1.1 硬軟件環(huán)境l 主機(jī)操作系統(tǒng):Windows 64位,雙核4線程,主頻2.2G,10G內(nèi)存 l 虛擬軟件:VMware? Workstation 9.0.0 build-812388 l 虛擬機(jī)操作系統(tǒng):CentOS 64位,單核 l 虛擬機(jī)運(yùn)行環(huán)境: ? JDK:1.7.0_55 64位 ? Hadoop:2.2.0(需要編譯為64位) ? Scala:2.11.4 ? Spark:1.1.0(需要編譯) ? Hive:0.13.1 3.1.2 機(jī)器網(wǎng)絡(luò)環(huán)境集群包含三個(gè)節(jié)點(diǎn),節(jié)點(diǎn)之間可以免密碼SSH訪問,節(jié)點(diǎn)IP地址和主機(jī)名分布如下:
3.2 配置并啟動(dòng)3.2.1 創(chuàng)建并配置hive-site.xml在運(yùn)行Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下創(chuàng)建hive-site.xml文件,然后在該配置文件中,添加hive.metastore.uris屬性,具體如下: <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://hadoop1:9083</value> <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description> </property> </configuration> 3.2.2 啟動(dòng)Hive在使用Spark SQL CLI之前需要啟動(dòng)Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動(dòng)Hadoop的HDFS),使用如下命令可以使Hive Metastore啟動(dòng)后運(yùn)行在后臺(tái),可以通過jobs查詢: $nohup hive --service metastore > metastore.log 2>&1 & 3.2.3 啟動(dòng)Spark集群和Spark SQL CLI通過如下命令啟動(dòng)Spark集群和Spark SQL CLI: $cd /app/hadoop/spark-1.1.0 $sbin/start-all.sh $bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g 在集群監(jiān)控頁面可以看到啟動(dòng)了SparkSQL應(yīng)用程序: 這時(shí)就可以使用HQL語句對(duì)Hive數(shù)據(jù)進(jìn)行查詢,另外可以使用COMMAND,如使用set進(jìn)行設(shè)置參數(shù):默認(rèn)情況下,SparkSQL Shuffle的時(shí)候是200個(gè)partition,可以使用如下命令修改該參數(shù): SET spark.sql.shuffle.partitions=20; 運(yùn)行同一個(gè)查詢語句,參數(shù)改變后,Task(partition)的數(shù)量就由200變成了20。 3.2.4 命令參數(shù)通過bin/spark-sql --help可以查看CLI命令參數(shù): 其中[options] 是CLI啟動(dòng)一個(gè)SparkSQL應(yīng)用程序的參數(shù),如果不設(shè)置--master的話,將在啟動(dòng)spark-sql的機(jī)器以local方式運(yùn)行,只能通過http://機(jī)器名:4040進(jìn)行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應(yīng)用程序部署工具spark-submit 的參數(shù)。 [cli option]是CLI的參數(shù),通過這些參數(shù)CLI可以直接運(yùn)行SQL文件、進(jìn)入命令行運(yùn)行SQL命令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接,所以不能連接到ThriftServer;但可以配置conf/hive-site.xml連接到Hive的Metastore,然后對(duì)Hive數(shù)據(jù)進(jìn)行查詢。 3.3 實(shí)戰(zhàn)Spark SQL CLI3.3.1 獲取訂單每年的銷售單數(shù)、銷售總額第一步 設(shè)置任務(wù)個(gè)數(shù),在這里修改為20個(gè) spark-sql>SET spark.sql.shuffle.partitions=20; 第二步 運(yùn)行SQL語句 spark-sql>use hive; spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; 第三步 查看運(yùn)行結(jié)果 3.3.2 計(jì)算所有訂單每年的總金額第一步 執(zhí)行SQL語句 spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; 第二步 執(zhí)行結(jié)果 使用CLI執(zhí)行結(jié)果如下: 3.3.3 計(jì)算所有訂單每年最大金額訂單的銷售額spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear; 使用CLI執(zhí)行結(jié)果如下:
4、Spark Thrift ServerThriftServer是一個(gè)JDBC/ODBC接口,用戶可以通過JDBC/ODBC連接ThriftServer來訪問SparkSQL的數(shù)據(jù)。ThriftServer在啟動(dòng)的時(shí)候,會(huì)啟動(dòng)了一個(gè)SparkSQL的應(yīng)用程序,而通過JDBC/ODBC連接進(jìn)來的客戶端共同分享這個(gè)SparkSQL應(yīng)用程序的資源,也就是說不同的用戶之間可以共享數(shù)據(jù);ThriftServer啟動(dòng)時(shí)還開啟一個(gè)偵聽器,等待JDBC客戶端的連接和提交查詢。所以,在配置ThriftServer的時(shí)候,至少要配置ThriftServer的主機(jī)名和端口,如果要使用Hive數(shù)據(jù)的話,還要提供Hive Metastore的uris。 【注】Spark CLI和Spark Thrift Server實(shí)驗(yàn)環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建 4.1 配置并啟動(dòng)4.1.1 創(chuàng)建并配置hive-site.xml第一步 創(chuàng)建hive-site.xml配置文件 在$SPARK_HOME/conf目錄下修改hive-site.xml配置文件(如果在Spark SQL CLI中已經(jīng)添加,可以省略): $cd /app/hadoop/spark-1.1.0/conf $sudo vi hive-site.xml 第二步 修改配置文件 設(shè)置hadoop1為Metastore服務(wù)器,hadoop2為Thrift Server服務(wù)器,配置內(nèi)容如下: <configuration> <property> <name>hive.metastore.uris</name> <value>thrift://hadoop1:9083</value> <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description> </property>
<property> <name>hive.server2.thrift.min.worker.threads</name> <value>5</value> <description>Minimum number of Thrift worker threads</description> </property>
<property> <name>hive.server2.thrift.max.worker.threads</name> <value>500</value> <description>Maximum number of Thrift worker threads</description> </property>
<property> <name>hive.server2.thrift.port</name> <value>10000</value> <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description> </property>
<property> <name>hive.server2.thrift.bind.host</name> <value>hadoop2</value> <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description> </property> </configuration> 4.1.2 啟動(dòng)Hive在hadoop1節(jié)點(diǎn)中,在后臺(tái)啟動(dòng)Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動(dòng)Hadoop的HDFS): $nohup hive --service metastore > metastore.log 2>&1 & 4.1.3 啟動(dòng)Spark集群和Thrift Server在hadoop1節(jié)點(diǎn)啟動(dòng)Spark集群 $cd /app/hadoop/spark-1.1.0/sbin $./start-all.sh 在hadoop2節(jié)點(diǎn)上進(jìn)入SPARK_HOME/sbin目錄,使用如下命令啟動(dòng)Thrift Server $cd /app/hadoop/spark-1.1.0/sbin $./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g 注意:Thrift Server需要按照配置在hadoop2啟動(dòng)! 在集群監(jiān)控頁面可以看到啟動(dòng)了SparkSQL應(yīng)用程序: 4.1.4 命令參數(shù)使用sbin/start-thriftserver.sh --help可以查看ThriftServer的命令參數(shù): $sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options] Thrift server options: Use value for given property 其中[options] 是Thrift Server啟動(dòng)一個(gè)SparkSQL應(yīng)用程序的參數(shù),如果不設(shè)置--master的話,將在啟動(dòng)Thrift Server的機(jī)器以local方式運(yùn)行,只能通過http://機(jī)器名:4040進(jìn)行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應(yīng)用程序部署工具spark-submit 的參數(shù)。在集群中提供Thrift Server的話,一定要配置master、executor-memory等參數(shù)。 [thrift server options]是Thrift Server的參數(shù),可以使用-dproperty=value的格式來定義;在實(shí)際應(yīng)用上,因?yàn)閰?shù)比較多,通常使用conf/hive-site.xml配置。 4.2 實(shí)戰(zhàn)Thrift Server4.2.1 遠(yuǎn)程客戶端連接可以在任意節(jié)點(diǎn)啟動(dòng)bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,因?yàn)闆]有采用權(quán)限管理,所以用戶名用運(yùn)行bin/beeline的用戶hadoop,密碼為空: $cd /app/hadoop/spark-1.1.0/bin $./beeline beeline>!connect jdbc:hive2://hadoop2:10000 4.2.2 基本操作第一步 顯示hive數(shù)據(jù)庫所有表 beeline>show database; beeline>use hive; beeline>show tables; 第二步 創(chuàng)建表testThrift beeline>create table testThrift(field1 String , field2 Int); beeline>show tables; 第三步 把tbStockDetail表中金額大于3000插入到testThrift表中 beeline>insert into table testThrift select ordernumber,amount from tbStockDetail where amount>3000; beeline>select * from testThrift; 第四步 重新創(chuàng)建testThrift表中,把年度最大訂單插入該表中 beeline>drop table testThrift; beeline>create table testThrift (field1 String , field2 Int); beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear; beeline>select * from testThrift; 4.2.3 計(jì)算所有訂單每年的訂單數(shù)spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear; Stage監(jiān)控頁面: 查看Details for Stage 28 4.2.4 計(jì)算所有訂單月銷售額前十名spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10; Stage監(jiān)控頁面: 在其第一個(gè)Task中,從本地讀入數(shù)據(jù) 在后面的Task是從內(nèi)存中獲取數(shù)據(jù) 4.2.5 緩存表數(shù)據(jù)第一步 緩存數(shù)據(jù) beeline>cache table tbStock; beeline>select count(*) from tbStock; 第二步 運(yùn)行4.2.4中的“計(jì)算所有訂單月銷售額前十名” beeline>select count(*) from tbStock; 本次計(jì)算劃給11.233秒,查看webUI,數(shù)據(jù)已經(jīng)緩存,緩存率為100%: 第三步 在另外節(jié)點(diǎn)再次運(yùn)行 在hadoop3節(jié)點(diǎn)啟動(dòng)bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,然后直接運(yùn)行對(duì)tbStock計(jì)數(shù)(注意沒有進(jìn)行數(shù)據(jù)庫的切換): 用時(shí)0.343秒,再查看webUI中的stage: Locality Level是PROCESS,顯然是使用了緩存表。 從上可以看出,ThriftServer可以連接多個(gè)JDBC/ODBC客戶端,并相互之間可以共享數(shù)據(jù)。順便提一句,ThriftServer啟動(dòng)后處于監(jiān)聽狀態(tài),用戶可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。 4.2.6 在IDEA中JDBC訪問有了ThriftServer,開發(fā)人員可以非常方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個(gè)scala代碼,查詢表tbStockDetail,返回amount>3000的單據(jù)號(hào)和交易金額: 第一步 在IDEA創(chuàng)建class6包和類JDBCofSparkSQL 參見《Spark編程模型(下)--IDEA搭建及實(shí)戰(zhàn)》在IDEA中創(chuàng)建class6包并新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大于3000的訂單: package class6 import java.sql.DriverManager
object JDBCofSparkSQL { def main(args: Array[String]) { Class.forName("org.apache.hive.jdbc.HiveDriver") val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "") try { val statement = conn.createStatement val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail where amount>3000") while (rs.next) { val ordernumber = rs.getString("ordernumber") val amount = rs.getString("amount") println("ordernumber = %s, amount = %s".format(ordernumber, amount)) } } catch { case e: Exception => e.printStackTrace } conn.close } } 第二步 查看運(yùn)行結(jié)果 在IDEA中可以觀察到,在運(yùn)行日志窗口中沒有運(yùn)行過程的日志,只顯示查詢結(jié)果 第三步 查看監(jiān)控結(jié)果 從Spark監(jiān)控界面中觀察到,該Job有一個(gè)編號(hào)為6的Stage,該Stage有2個(gè)Task,分別運(yùn)行在hadoop1和hadoop2節(jié)點(diǎn),獲取數(shù)據(jù)為NODE_LOCAL方式。 在hadoop2中觀察Thrift Server運(yùn)行日志如下:
|
|
|