小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark入門實(shí)戰(zhàn)系列

 青葉i生活 2018-01-23

【注】該系列文章以及使用到安裝包/測試數(shù)據(jù) 可以在《傾情大奉送--Spark入門實(shí)戰(zhàn)系列》獲取

1、SparkSQL的發(fā)展歷程

1.1 Hive and Shark

SparkSQL的前身是Shark,給熟悉RDBMS但又不理解MapReduce的技術(shù)人員提供快速上手的工具,Hive應(yīng)運(yùn)而生,它是當(dāng)時(shí)唯一運(yùn)行在Hadoop上的SQL-on-Hadoop工具。但是MapReduce計(jì)算過程中大量的中間磁盤落地過程消耗了大量的I/O,降低的運(yùn)行效率,為了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具開始產(chǎn)生,其中表現(xiàn)較為突出的是:

l MapRDrill

l ClouderaImpala

l Shark

其中Shark是伯克利實(shí)驗(yàn)室Spark生態(tài)環(huán)境的組件之一,它修改了下圖所示的右下角的內(nèi)存管理、物理計(jì)劃、執(zhí)行三個(gè)模塊,并使之能運(yùn)行在Spark引擎上,從而使得SQL查詢的速度得到10-100倍的提升。

clip_image002

1.2 SharkSparkSQL 

但是,隨著Spark的發(fā)展,對(duì)于野心勃勃的Spark團(tuán)隊(duì)來說,Shark對(duì)于Hive的太多依賴(如采用Hive的語法解析器、查詢優(yōu)化器等等),制約了SparkOne Stack Rule Them All的既定方針,制約了Spark各個(gè)組件的相互集成,所以提出了SparkSQL項(xiàng)目。SparkSQL拋棄原有Shark的代碼,汲取了Shark的一些優(yōu)點(diǎn),如內(nèi)存列存儲(chǔ)(In-Memory Columnar Storage)、Hive兼容性等,重新開發(fā)了SparkSQL代碼;由于擺脫了對(duì)Hive的依賴性,SparkSQL無論在數(shù)據(jù)兼容、性能優(yōu)化、組件擴(kuò)展方面都得到了極大的方便,真可謂“退一步,海闊天空”。

l數(shù)據(jù)兼容方面  不但兼容Hive,還可以從RDD、parquet文件、JSON文件中獲取數(shù)據(jù),未來版本甚至支持獲取RDBMS數(shù)據(jù)以及cassandraNOSQL數(shù)據(jù);

l性能優(yōu)化方面  除了采取In-Memory Columnar Storagebyte-code generation等優(yōu)化技術(shù)外、將會(huì)引進(jìn)Cost Model對(duì)查詢進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等等;

l組件擴(kuò)展方面  無論是SQL的語法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。

201461Shark項(xiàng)目和SparkSQL項(xiàng)目的主持人Reynold Xin宣布:停止對(duì)Shark的開發(fā),團(tuán)隊(duì)將所有資源放SparkSQL項(xiàng)目上,至此,Shark的發(fā)展畫上了句話,但也因此發(fā)展出兩個(gè)直線:SparkSQLHive on Spark。

clip_image004

其中SparkSQL作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一個(gè)Hive的發(fā)展計(jì)劃,該計(jì)劃將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限于一個(gè)引擎,可以采用Map-Reduce、Tez、Spark等引擎。

1.3 SparkSQL的性能

Shark的出現(xiàn),使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高:

clip_image006

那么,擺脫了Hive的限制,SparkSQL的性能又有怎么樣的表現(xiàn)呢?雖然沒有Shark相對(duì)于Hive那樣矚目地性能提升,但也表現(xiàn)得非常優(yōu)異:

clip_image008

為什么SparkSQL的性能會(huì)得到怎么大的提升呢?主要SparkSQL在下面幾點(diǎn)做了優(yōu)化:

A:內(nèi)存列存儲(chǔ)(In-Memory Columnar Storage

SparkSQL的表數(shù)據(jù)在內(nèi)存中存儲(chǔ)不是采用原生態(tài)的JVM對(duì)象存儲(chǔ)方式,而是采用內(nèi)存列存儲(chǔ),如下圖所示。

clip_image010

該存儲(chǔ)方式無論在空間占用量和讀取吞吐率上都占有很大優(yōu)勢。

對(duì)于原生態(tài)的JVM對(duì)象存儲(chǔ)方式,每個(gè)對(duì)象通常要增加12-16字節(jié)的額外開銷,對(duì)于一個(gè)270MBTPC-H lineitem table數(shù)據(jù),使用這種方式讀入內(nèi)存,要使用970MB左右的內(nèi)存空間(通常是25倍于原生數(shù)據(jù)空間);另外,使用這種方式,每個(gè)數(shù)據(jù)記錄產(chǎn)生一個(gè)JVM對(duì)象,如果是大小為200B的數(shù)據(jù)記錄,32G的堆棧將產(chǎn)生1.6億個(gè)對(duì)象,這么多的對(duì)象,對(duì)于GC來說,可能要消耗幾分鐘的時(shí)間來處理(JVM的垃圾收集時(shí)間與堆棧中的對(duì)象數(shù)量呈線性相關(guān))。顯然這種內(nèi)存存儲(chǔ)方式對(duì)于基于內(nèi)存計(jì)算的Spark來說,很昂貴也負(fù)擔(dān)不起。

對(duì)于內(nèi)存列存儲(chǔ)來說,將所有原生數(shù)據(jù)類型的列采用原生數(shù)組來存儲(chǔ),將Hive支持的復(fù)雜數(shù)據(jù)類型(如arraymap等)先序化后并接成一個(gè)字節(jié)數(shù)組來存儲(chǔ)。這樣,每個(gè)列創(chuàng)建一個(gè)JVM對(duì)象,從而導(dǎo)致可以快速的GC和緊湊的數(shù)據(jù)存儲(chǔ);額外的,還可以使用低廉CPU開銷的高效壓縮方法(如字典編碼、行長度編碼等壓縮方法)降低內(nèi)存開銷;更有趣的是,對(duì)于分析查詢中頻繁使用的聚合特定列,性能會(huì)得到很大的提高,原因就是這些列的數(shù)據(jù)放在一起,更容易讀入內(nèi)存進(jìn)行計(jì)算。

B:字節(jié)碼生成技術(shù)(bytecode generation,即CG

在數(shù)據(jù)庫查詢中有一個(gè)昂貴的操作是查詢語句中的表達(dá)式,主要是由于JVM的內(nèi)存模型引起的。比如如下一個(gè)查詢:

SELECT a + b FROM table

在這個(gè)查詢里,如果采用通用的SQL語法途徑去處理,會(huì)先生成一個(gè)表達(dá)式樹(有兩個(gè)節(jié)點(diǎn)的Add樹,參考后面章節(jié)),在物理處理這個(gè)表達(dá)式樹的時(shí)候,將會(huì)如圖所示的7個(gè)步驟:

1.  調(diào)用虛函數(shù)Add.eval(),需要確認(rèn)Add兩邊的數(shù)據(jù)類型

2.  調(diào)用虛函數(shù)a.eval(),需要確認(rèn)a的數(shù)據(jù)類型

3.  確定a的數(shù)據(jù)類型是Int,裝箱

4.  調(diào)用虛函數(shù)b.eval(),需要確認(rèn)b的數(shù)據(jù)類型

5.  確定b的數(shù)據(jù)類型是Int,裝箱

6.  調(diào)用Int類型的Add

7.  返回裝箱后的計(jì)算結(jié)果

其中多次涉及到虛函數(shù)的調(diào)用,虛函數(shù)的調(diào)用會(huì)打斷CPU的正常流水線處理,減緩執(zhí)行。

Spark1.1.0catalyst模塊的expressions增加了codegen模塊,如果使用動(dòng)態(tài)字節(jié)碼生成技術(shù)(配置spark.sql.codegen參數(shù)),SparkSQL在執(zhí)行物理計(jì)劃的時(shí)候,對(duì)匹配的表達(dá)式采用特定的代碼,動(dòng)態(tài)編譯,然后運(yùn)行。如上例子,匹配到Add方法:

clip_image012

然后,通過調(diào)用,最終調(diào)用:

clip_image014

最終實(shí)現(xiàn)效果類似如下偽代碼:

val a: Int = inputRow.getInt(0)

val b: Int = inputRow.getInt(1)

val result: Int = a + b

resultRow.setInt(0, result)

對(duì)于Spark1.1.0,對(duì)SQL表達(dá)式都作了CG優(yōu)化,具體可以參看codegen模塊。CG優(yōu)化的實(shí)現(xiàn)主要還是依靠scala2.10的運(yùn)行時(shí)放射機(jī)制(runtime reflection)。對(duì)于SQL查詢的CG優(yōu)化,可以簡單地用下圖來表示:

clip_image016

CScala代碼優(yōu)化

另外,SparkSQL在使用Scala編寫代碼的時(shí)候,盡量避免低效的、容易GC的代碼;盡管增加了編寫代碼的難度,但對(duì)于用戶來說,還是使用統(tǒng)一的接口,沒受到使用上的困難。下圖是一個(gè)Scala代碼優(yōu)化的示意圖:

clip_image018

2SparkSQL運(yùn)行架構(gòu)

類似于關(guān)系型數(shù)據(jù)庫,SparkSQL也是語句也是由Projectiona1,a2,a3)、Data SourcetableA)、Filtercondition)組成,分別對(duì)應(yīng)sql查詢過程中的Result、Data SourceOperation,也就是說SQL語句按Result-->Data Source-->Operation的次序來描述的。

clip_image020

 

當(dāng)執(zhí)行SparkSQL語句的順序?yàn)椋?/span>

1.對(duì)讀入的SQL語句進(jìn)行解析(Parse),分辨出SQL語句中哪些詞是關(guān)鍵詞(如SELECT、FROMWHERE),哪些是表達(dá)式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規(guī)范;

2.SQL語句和數(shù)據(jù)庫的數(shù)據(jù)字典(列、表、視圖等等)進(jìn)行綁定(Bind),如果相關(guān)的Projection、Data Source等都是存在的話,就表示這個(gè)SQL語句是可以執(zhí)行的;

3.一般的數(shù)據(jù)庫會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃(Optimize);

4.計(jì)劃執(zhí)行(Execute),按Operation-->Data Source-->Result的次序來進(jìn)行的,在執(zhí)行過程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過的SQL語句,可能直接從數(shù)據(jù)庫的緩沖池中獲取返回結(jié)果。

2.1 TreeRule

SparkSQL對(duì)SQL語句的處理和關(guān)系型數(shù)據(jù)庫對(duì)SQL語句的處理采用了類似的方法,首先會(huì)將SQL語句進(jìn)行解析(Parse),然后形成一個(gè)Tree,在后續(xù)的如綁定、優(yōu)化等處理過程都是對(duì)Tree的操作,而操作的方法是采用Rule,通過模式匹配,對(duì)不同類型的節(jié)點(diǎn)采用不同的操作。在整個(gè)sql語句的處理過程中,TreeRule相互配合,完成了解析、綁定(在SparkSQL中稱為Analysis)、優(yōu)化、物理計(jì)劃等過程,最終生成可以執(zhí)行的物理計(jì)劃。

2.1.1 Tree

l  Tree的相關(guān)代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees

l  Logical Plans、Expressions、Physical Operators都可以使用Tree表示

l  Tree的具體操作是通過TreeNode來實(shí)現(xiàn)的

?  SparkSQL定義了catalyst.trees的日志,通過這個(gè)日志可以形象的表示出樹的結(jié)構(gòu)

?  TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)進(jìn)行操作

?  有了TreeNode,通過Tree中各個(gè)TreeNode之間的關(guān)系,可以對(duì)Tree進(jìn)行遍歷操作,如使用transformDown、transformUpRule應(yīng)用到給定的樹段,然后用結(jié)果替代舊的樹段;也可以使用transformChildrenDown、transformChildrenUp對(duì)一個(gè)給定的節(jié)點(diǎn)進(jìn)行操作,通過迭代將Rule應(yīng)用到該節(jié)點(diǎn)以及子節(jié)點(diǎn)。

l  TreeNode可以細(xì)分成三種類型的Node

?  UnaryNode 一元節(jié)點(diǎn),即只有一個(gè)子節(jié)點(diǎn)。如Limit、Filter操作

?  BinaryNode 二元節(jié)點(diǎn),即有左右子節(jié)點(diǎn)的二叉節(jié)點(diǎn)。如JionUnion操作

?  LeafNode 葉子節(jié)點(diǎn),沒有子節(jié)點(diǎn)的節(jié)點(diǎn)。主要用戶命令類操作,如SetCommand

 

2.1.2 Rule

l  Rule的相關(guān)代碼定義在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules

l  RuleSparkSQLAnalyzerOptimizer、SparkPlan等各個(gè)組件中都有應(yīng)用到

l  Rule是一個(gè)抽象類,具體的Rule實(shí)現(xiàn)是通過RuleExecutor完成

l  Rule通過定義batchbatchs,可以簡便的、模塊化地對(duì)Tree進(jìn)行transform操作

l  Rule通過定義OnceFixedPoint,可以對(duì)Tree進(jìn)行一次操作或多次操作(如對(duì)某些Tree進(jìn)行多次迭代操作的時(shí)候,達(dá)到FixedPoint次數(shù)迭代或達(dá)到前后兩次的樹結(jié)構(gòu)沒變化才停止操作,具體參看RuleExecutor.apply

2.2 sqlContexthiveContext的運(yùn)行過程

SparkSQL有兩個(gè)分支,sqlContexthiveContext,sqlContext現(xiàn)在只支持SQL語法解析器(SQL-92語法);hiveContext現(xiàn)在支持SQL語法解析器和hivesql語法解析器,默認(rèn)為hiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運(yùn)行hiveSQL不支持的語法,

2.2.1 sqlContext的運(yùn)行過程

sqlContext總的一個(gè)過程如下圖所示:

1.SQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan

2.使用analyzer結(jié)合數(shù)據(jù)數(shù)據(jù)字典(catalog)進(jìn)行綁定,生成resolvedLogicalPlan;

3.使用optimizer對(duì)resolvedLogicalPlan進(jìn)行優(yōu)化,生成optimizedLogicalPlan;

4.使用SparkPlanLogicalPlan轉(zhuǎn)換成PhysicalPlan

5.使用prepareForExecution()PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計(jì)劃;

6.使用execute()執(zhí)行可執(zhí)行物理計(jì)劃;

7.生成SchemaRDD。

在整個(gè)運(yùn)行過程中涉及到多個(gè)SparkSQL的組件,如SqlParse、analyzer、optimizer、SparkPlan等等

clip_image022

2.2.2hiveContext的運(yùn)行過程

hiveContext總的一個(gè)過程如下圖所示:

1.SQL語句經(jīng)過HiveQl.parseSql解析成Unresolved LogicalPlan,在這個(gè)解析過程中對(duì)hiveql語句使用getAst()獲取AST樹,然后再進(jìn)行解析;

2.使用analyzer結(jié)合數(shù)據(jù)hive源數(shù)據(jù)Metastore(新的catalog)進(jìn)行綁定,生成resolved LogicalPlan;

3.使用optimizer對(duì)resolved LogicalPlan進(jìn)行優(yōu)化,生成optimized LogicalPlan,優(yōu)化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))進(jìn)行預(yù)處理;

4.使用hivePlannerLogicalPlan轉(zhuǎn)換成PhysicalPlan;

5.使用prepareForExecution()PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計(jì)劃;

6.使用execute()執(zhí)行可執(zhí)行物理計(jì)劃;

7.執(zhí)行后,使用map(_.copy)將結(jié)果導(dǎo)入SchemaRDD。

clip_image024

2.3 catalyst優(yōu)化器

SparkSQL1.1總體上由四個(gè)模塊組成:core、catalysthive、hive-Thriftserver

l  core處理數(shù)據(jù)的輸入輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù)(RDD、Parquetjson等),將查詢結(jié)果輸出成schemaRDD;

l  catalyst處理查詢語句的整個(gè)處理過程,包括解析、綁定、優(yōu)化、物理計(jì)劃等,說其是優(yōu)化器,還不如說是查詢引擎;

l  hive對(duì)hive數(shù)據(jù)的處理

l  hive-ThriftServer提供CLIJDBC/ODBC接口

在這四個(gè)模塊中,catalyst處于最核心的部分,其性能優(yōu)劣將影響整體的性能。由于發(fā)展時(shí)間尚短,還有很多不足的地方,但其插件式的設(shè)計(jì),為未來的發(fā)展留下了很大的空間。下面是catalyst的一個(gè)設(shè)計(jì)圖:

clip_image026

 

其中虛線部分是以后版本要實(shí)現(xiàn)的功能,實(shí)線部分是已經(jīng)實(shí)現(xiàn)的功能。從上圖看,catalyst主要的實(shí)現(xiàn)組件有:

lsqlParse,完成sql語句的語法解析功能,目前只提供了一個(gè)簡單的sql解析器;

lAnalyzer,主要完成綁定工作,將不同來源的Unresolved LogicalPlan和數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog)進(jìn)行綁定,生成resolved LogicalPlan

loptimizer對(duì)resolved LogicalPlan進(jìn)行優(yōu)化,生成optimized LogicalPlan;

l PlannerLogicalPlan轉(zhuǎn)換成PhysicalPlan

l CostModel,主要根據(jù)過去的性能統(tǒng)計(jì)數(shù)據(jù),選擇最佳的物理執(zhí)行計(jì)劃

這些組件的基本實(shí)現(xiàn)方法:

l 先將sql語句通過解析生成Tree,然后在不同階段使用不同的Rule應(yīng)用到Tree上,通過轉(zhuǎn)換完成各個(gè)組件的功能。

l Analyzer使用Analysis Rules,配合數(shù)據(jù)元數(shù)據(jù)(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉(zhuǎn)換成resolved LogicalPlan;

l optimizer使用Optimization Rules,對(duì)resolved LogicalPlan進(jìn)行合并、列裁剪、過濾器下推等優(yōu)化作業(yè)而轉(zhuǎn)換成optimized LogicalPlan;

l Planner使用Planning Strategies,對(duì)optimized LogicalPlan

3、SparkSQL CLI

CLICommand-Line Interface,命令行界面)是指可在用戶提示符下鍵入可執(zhí)行指令的界面,它通常不支持鼠標(biāo),用戶通過鍵盤輸入指令,計(jì)算機(jī)接收到指令后予以執(zhí)行。Spark CLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過程和最終的結(jié)果。

Spark1.1相較于Spark1.0最大的差別就在于Spark1.1增加了Spark SQL CLIThriftServer,使得Hive用戶還有用慣了命令行的RDBMS數(shù)據(jù)庫管理員較容易地上手,真正意義上進(jìn)入了SQL時(shí)代。

【注】Spark CLISpark Thrift Server實(shí)驗(yàn)環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

3.1  運(yùn)行環(huán)境說明

3.1.1 硬軟件環(huán)境

l  主機(jī)操作系統(tǒng):Windows 64位,雙核4線程,主頻2.2G10G內(nèi)存

l  虛擬軟件:VMware? Workstation 9.0.0 build-812388

l  虛擬機(jī)操作系統(tǒng):CentOS 64位,單核

l  虛擬機(jī)運(yùn)行環(huán)境:

?  JDK1.7.0_55 64

?  Hadoop2.2.0(需要編譯為64位)

?  Scala2.11.4

?  Spark1.1.0(需要編譯)

?  Hive0.13.1

3.1.2 機(jī)器網(wǎng)絡(luò)環(huán)境

集群包含三個(gè)節(jié)點(diǎn),節(jié)點(diǎn)之間可以免密碼SSH訪問,節(jié)點(diǎn)IP地址和主機(jī)名分布如下:

序號(hào)

IP地址

機(jī)器名

類型

核數(shù)/內(nèi)存

用戶名

目錄

1

192.168.0.61

hadoop1

NN/DN/RM

Master/Worker

1/3G

hadoop

/app 程序所在路徑

/app/scala-...

/app/hadoop

/app/complied

2

192.168.0.62

hadoop2

DN/NM/Worker

1/2G

hadoop

3

192.168.0.63

hadoop3

DN/NM/Worker

1/2G

hadoop

3.2 配置并啟動(dòng)

3.2.1 創(chuàng)建并配置hive-site.xml

在運(yùn)行Spark SQL CLI中需要使用到Hive Metastore,故需要在Spark中添加其uris。具體方法是在SPARK_HOME/conf目錄下創(chuàng)建hive-site.xml文件,然后在該配置文件中,添加hive.metastore.uris屬性,具體如下:

<configuration> 

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

</configuration>

clip_image028

3.2.2 啟動(dòng)Hive

在使用Spark SQL CLI之前需要啟動(dòng)Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動(dòng)HadoopHDFS),使用如下命令可以使Hive Metastore啟動(dòng)后運(yùn)行在后臺(tái),可以通過jobs查詢:

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image030

3.2.3 啟動(dòng)Spark集群和Spark SQL CLI

通過如下命令啟動(dòng)Spark集群和Spark SQL CLI

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

在集群監(jiān)控頁面可以看到啟動(dòng)了SparkSQL應(yīng)用程序:

clip_image032

這時(shí)就可以使用HQL語句對(duì)Hive數(shù)據(jù)進(jìn)行查詢,另外可以使用COMMAND,如使用set進(jìn)行設(shè)置參數(shù):默認(rèn)情況下,SparkSQL Shuffle的時(shí)候是200個(gè)partition,可以使用如下命令修改該參數(shù):

SET spark.sql.shuffle.partitions=20;

運(yùn)行同一個(gè)查詢語句,參數(shù)改變后,Taskpartition)的數(shù)量就由200變成了20。

clip_image034

3.2.4 命令參數(shù)

通過bin/spark-sql --help可以查看CLI命令參數(shù):

clip_image036

clip_image038

其中[options] CLI啟動(dòng)一個(gè)SparkSQL應(yīng)用程序的參數(shù),如果不設(shè)置--master的話,將在啟動(dòng)spark-sql的機(jī)器以local方式運(yùn)行,只能通過http://機(jī)器名:4040進(jìn)行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應(yīng)用程序部署工具spark-submit 的參數(shù)。

[cli option]CLI的參數(shù),通過這些參數(shù)CLI可以直接運(yùn)行SQL文件、進(jìn)入命令行運(yùn)行SQL命令等等,類似以前的Shark的用法。需要注意的是CLI不是使用JDBC連接,所以不能連接到ThriftServer;但可以配置conf/hive-site.xml連接到HiveMetastore,然后對(duì)Hive數(shù)據(jù)進(jìn)行查詢。

3.3 實(shí)戰(zhàn)Spark SQL CLI

3.3.1 獲取訂單每年的銷售單數(shù)、銷售總額

第一步   設(shè)置任務(wù)個(gè)數(shù),在這里修改為20個(gè)

spark-sql>SET spark.sql.shuffle.partitions=20;

clip_image040

第二步   運(yùn)行SQL語句

spark-sql>use hive;

clip_image042

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image044

第三步   查看運(yùn)行結(jié)果

clip_image046

clip_image048

3.3.2 計(jì)算所有訂單每年的總金額

第一步   執(zhí)行SQL語句

spark-sql>select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

clip_image050

第二步   執(zhí)行結(jié)果

使用CLI執(zhí)行結(jié)果如下:

clip_image052

clip_image054

3.3.3 計(jì)算所有訂單每年最大金額訂單的銷售額

第一步   執(zhí)行SQL語句

spark-sql>select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

clip_image056

第二步   執(zhí)行結(jié)果

使用CLI執(zhí)行結(jié)果如下:

clip_image058

 

clip_image060

4、Spark Thrift Server

ThriftServer是一個(gè)JDBC/ODBC接口,用戶可以通過JDBC/ODBC連接ThriftServer來訪問SparkSQL的數(shù)據(jù)。ThriftServer在啟動(dòng)的時(shí)候,會(huì)啟動(dòng)了一個(gè)SparkSQL的應(yīng)用程序,而通過JDBC/ODBC連接進(jìn)來的客戶端共同分享這個(gè)SparkSQL應(yīng)用程序的資源,也就是說不同的用戶之間可以共享數(shù)據(jù);ThriftServer啟動(dòng)時(shí)還開啟一個(gè)偵聽器,等待JDBC客戶端的連接和提交查詢。所以,在配置ThriftServer的時(shí)候,至少要配置ThriftServer的主機(jī)名和端口,如果要使用Hive數(shù)據(jù)的話,還要提供Hive Metastoreuris。

【注】Spark CLISpark Thrift Server實(shí)驗(yàn)環(huán)境為第二課《Spark編譯與部署(下)--Spark編譯安裝》所搭建

4.1 配置并啟動(dòng)

4.1.1 創(chuàng)建并配置hive-site.xml

第一步   創(chuàng)建hive-site.xml配置文件

$SPARK_HOME/conf目錄下修改hive-site.xml配置文件(如果在Spark SQL CLI中已經(jīng)添加,可以省略):

$cd /app/hadoop/spark-1.1.0/conf

$sudo vi hive-site.xml

clip_image062

第二步   修改配置文件

設(shè)置hadoop1Metastore服務(wù)器,hadoop2Thrift Server服務(wù)器,配置內(nèi)容如下:

<configuration>

  <property>

    <name>hive.metastore.uris</name>

    <value>thrift://hadoop1:9083</value>

    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.min.worker.threads</name>

    <value>5</value>

    <description>Minimum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.max.worker.threads</name>

    <value>500</value>

    <description>Maximum number of Thrift worker threads</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.port</name>

    <value>10000</value>

    <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>

  </property>

 

  <property>

    <name>hive.server2.thrift.bind.host</name>

    <value>hadoop2</value>

    <description>Bind host on which to run the HiveServer2 Thrift interface.Can be overridden by setting$HIVE_SERVER2_THRIFT_BIND_HOST</description>

  </property>

</configuration>

clip_image064

4.1.2 啟動(dòng)Hive

hadoop1節(jié)點(diǎn)中,在后臺(tái)啟動(dòng)Hive Metastore(如果數(shù)據(jù)存放在HDFS文件系統(tǒng),還需要啟動(dòng)HadoopHDFS):

$nohup hive --service metastore > metastore.log 2>&1 &

clip_image066

4.1.3 啟動(dòng)Spark集群和Thrift Server

hadoop1節(jié)點(diǎn)啟動(dòng)Spark集群

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

hadoop2節(jié)點(diǎn)上進(jìn)入SPARK_HOME/sbin目錄,使用如下命令啟動(dòng)Thrift Server

$cd /app/hadoop/spark-1.1.0/sbin

$./start-thriftserver.sh --master spark://hadoop1:7077 --executor-memory 1g

clip_image068

注意Thrift Server需要按照配置在hadoop2啟動(dòng)!

在集群監(jiān)控頁面可以看到啟動(dòng)了SparkSQL應(yīng)用程序:

clip_image070

4.1.4 命令參數(shù)

使用sbin/start-thriftserver.sh --help可以查看ThriftServer的命令參數(shù):

$sbin/start-thriftserver.sh --help Usage: ./sbin/start-thriftserver [options] [thrift server options]

        Thrift server options: Use value for given property

clip_image072

clip_image074

其中[options] Thrift Server啟動(dòng)一個(gè)SparkSQL應(yīng)用程序的參數(shù),如果不設(shè)置--master的話,將在啟動(dòng)Thrift Server的機(jī)器以local方式運(yùn)行,只能通過http://機(jī)器名:4040進(jìn)行監(jiān)控;這部分參數(shù),可以參照Spark1.0.0 應(yīng)用程序部署工具spark-submit 的參數(shù)。在集群中提供Thrift Server的話,一定要配置master、executor-memory等參數(shù)。

[thrift server options]Thrift Server的參數(shù),可以使用-dproperty=value的格式來定義;在實(shí)際應(yīng)用上,因?yàn)閰?shù)比較多,通常使用conf/hive-site.xml配置。

4.2 實(shí)戰(zhàn)Thrift Server

4.2.1 遠(yuǎn)程客戶端連接

可以在任意節(jié)點(diǎn)啟動(dòng)bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,因?yàn)闆]有采用權(quán)限管理,所以用戶名用運(yùn)行bin/beeline的用戶hadoop,密碼為空:

$cd /app/hadoop/spark-1.1.0/bin

$./beeline

beeline>!connect jdbc:hive2://hadoop2:10000

clip_image076

4.2.2 基本操作

第一步   顯示hive數(shù)據(jù)庫所有表

beeline>show database;

beeline>use hive;

beeline>show tables;

clip_image078

第二步   創(chuàng)建表testThrift

beeline>create table testThrift(field1 String , field2 Int);

beeline>show tables;

clip_image080

第三步   tbStockDetail表中金額大于3000插入到testThrift表中

beeline>insert into table testThrift select ordernumber,amount from tbStockDetail  where amount>3000;

beeline>select * from testThrift;

clip_image082

第四步   重新創(chuàng)建testThrift表中,把年度最大訂單插入該表中

beeline>drop table testThrift;

beeline>create table testThrift (field1 String , field2 Int);

beeline>insert into table testThrift select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear;

beeline>select * from testThrift;

clip_image084

4.2.3 計(jì)算所有訂單每年的訂單數(shù)

第一步   執(zhí)行SQL語句

spark-sql>select c.theyear, count(distinct a.ordernumber) from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear;

第二步   執(zhí)行結(jié)果

clip_image086

Stage監(jiān)控頁面:

clip_image088

查看Details for Stage 28

clip_image090

4.2.4 計(jì)算所有訂單月銷售額前十名

第一步   執(zhí)行SQL語句

spark-sql>select c.theyear,c.themonth,sum(b.amount) as sumofamount from tbStock a join tbStockDetail  b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,c.themonth order by sumofamount desc limit 10;

第二步   執(zhí)行結(jié)果

clip_image092

Stage監(jiān)控頁面:

clip_image094

在其第一個(gè)Task中,從本地讀入數(shù)據(jù)

clip_image096

在后面的Task是從內(nèi)存中獲取數(shù)據(jù)

clip_image098

4.2.5 緩存表數(shù)據(jù)

第一步   緩存數(shù)據(jù)

beeline>cache table tbStock;

beeline>select count(*) from tbStock;

clip_image100

第二步   運(yùn)行4.2.4中的“計(jì)算所有訂單月銷售額前十名”

beeline>select count(*) from tbStock;

clip_image102

本次計(jì)算劃給11.233秒,查看webUI,數(shù)據(jù)已經(jīng)緩存,緩存率為100%

clip_image104

第三步   在另外節(jié)點(diǎn)再次運(yùn)行

hadoop3節(jié)點(diǎn)啟動(dòng)bin/beeline,用!connect jdbc:hive2://hadoop2:10000連接ThriftServer,然后直接運(yùn)行對(duì)tbStock計(jì)數(shù)(注意沒有進(jìn)行數(shù)據(jù)庫的切換):

clip_image106

用時(shí)0.343秒,再查看webUI中的stage

clip_image108

Locality LevelPROCESS,顯然是使用了緩存表。

從上可以看出,ThriftServer可以連接多個(gè)JDBC/ODBC客戶端,并相互之間可以共享數(shù)據(jù)。順便提一句,ThriftServer啟動(dòng)后處于監(jiān)聽狀態(tài),用戶可以使用ctrl+c退出ThriftServer;而beeline的退出使用!q命令。

4.2.6 IDEAJDBC訪問

有了ThriftServer,開發(fā)人員可以非常方便的使用JDBC/ODBC來訪問SparkSQL。下面是一個(gè)scala代碼,查詢表tbStockDetail,返回amount>3000的單據(jù)號(hào)和交易金額:

第一步   IDEA創(chuàng)建class6包和類JDBCofSparkSQL

參見《Spark編程模型(下)--IDEA搭建及實(shí)戰(zhàn)》在IDEA中創(chuàng)建class6包并新建類JDBCofSparkSQL。該類中查詢tbStockDetail金額大于3000的訂單:

package class6

import java.sql.DriverManager

 

object JDBCofSparkSQL {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://hadoop2:10000/hive", "hadoop", "")

    try {

      val statement = conn.createStatement

val rs = statement.executeQuery("select ordernumber,amount from tbStockDetail  where amount>3000")

      while (rs.next) {

        val ordernumber = rs.getString("ordernumber")

        val amount = rs.getString("amount")

        println("ordernumber = %s, amount = %s".format(ordernumber, amount))

      }

    } catch {

      case e: Exception => e.printStackTrace

    }

    conn.close

  }

}

第二步   查看運(yùn)行結(jié)果

IDEA中可以觀察到,在運(yùn)行日志窗口中沒有運(yùn)行過程的日志,只顯示查詢結(jié)果

clip_image110

第三步   查看監(jiān)控結(jié)果

Spark監(jiān)控界面中觀察到,該Job有一個(gè)編號(hào)為6Stage,該Stage2個(gè)Task,分別運(yùn)行在hadoop1hadoop2節(jié)點(diǎn),獲取數(shù)據(jù)為NODE_LOCAL方式。

clip_image112

clip_image114

clip_image116

hadoop2中觀察Thrift Server運(yùn)行日志如下:

clip_image118

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多