|
spark中可以通過spark sql 直接查詢hive或impala中的數(shù)據(jù),
一、啟動方法 /data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
注:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項
--master MASTER_URL 指定master url --executor-memory MEM 每個executor的內(nèi)存,默認(rèn)為1G --total-executor-cores NUM 所有executor的總核數(shù) -e <quoted-query-string> 直接執(zhí)行查詢SQL
-f <filename> 以文件方式批量執(zhí)行SQL
二、Spark sql對hive支持的功能
1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY 2、hive操作運算: 1) 關(guān)系運算:= ==, <>, <, >, >=, <= 2) 算術(shù)運算:+, -, *, /, % 3) 邏輯運算:AND, &&, OR, || 4) 復(fù)雜的數(shù)據(jù)結(jié)構(gòu) 5) 數(shù)學(xué)函數(shù):(sign, ln, cos, etc) 6) 字符串函數(shù): 3、 UDF 4、 UDAF
5、 用戶定義的序列化格式 6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN 7、 unions操作: 8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2 9、Sampling 10、 Explain 11、 分區(qū)表 12、 視圖 13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的數(shù)據(jù)類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客戶端編程方式進行查詢數(shù)據(jù) 1、啟動spark-shell ./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2 2、編寫程序 val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("../examples/src/main/resources/people.json") 查看所有數(shù)據(jù):df.show() 查看表結(jié)構(gòu):df.printSchema() 只看name列:df.select("name").show() 對數(shù)據(jù)運算:df.select(df("name"), df("age") + 1).show() 過濾數(shù)據(jù):df.filter(df("age") > 21).show()
分組統(tǒng)計:df.groupBy("age").count().show()
1、查詢txt數(shù)據(jù) import sqlContext.implicits._ case class Person(name: String, age: Int) val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") 2、parquet文件 val df = sqlContext.read.load("../examples/src/main/resources/users.parquet") 3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet") 4、保存查詢結(jié)果數(shù)據(jù) val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)
四、Spark sql性能調(diào)優(yōu)
緩存數(shù)據(jù)表:sqlContext.cacheTable("tableName")
取消緩存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue 當(dāng)設(shè)置為true時,Spark SQL將為基于數(shù)據(jù)統(tǒng)計信息的每列自動選擇一個壓縮算法。 spark.sql.inMemoryColumnarStorage.batchSize 10000 柱狀緩存的批數(shù)據(jù)大小。更大的批數(shù)據(jù)可以提高內(nèi)存的利用率以及壓縮效率,但有OOMs的風(fēng)險
|
|
|