This post covers two topics:
1. Developing DataFrames in Java
2. Developing DataFrames in Scala

A DataFrame can be created from an existing RDD, from a Hive table, or from another data source such as a JSON file. SQLContext supports only the plain SQL dialect, while HiveContext supports HiveQL in addition to SQL; the dialect can be switched through configuration.

The sample data file:

//F:\sparkData\people.json
{"name":"Michael"}
{"name":"Andy","age":31}
{"name":"Justin","age":20}
1. Developing DataFrames in Java

package com.tom.spark.SparkApps.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
/**
 * Basic DataFrame operations: read a JSON file and run simple DSL queries on it.
 */
public class DataFrameOps {
/**
* @param args
*/
public static void main(String[] args) {
//Create SparkConf to read system configuration and name the application
SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
//Create the JavaSparkContext instance that serves as the core of the Driver
JavaSparkContext sc = new JavaSparkContext(conf);
//Set the log level to WARN
sc.setLogLevel("WARN");
//Create the SQLContext used to run SQL analysis
SQLContext sqlContext = new SQLContext(sc);
//Create a DataFrame; a DataFrame can be thought of simply as a table
DataFrame df = sqlContext.read().json("F:\\sparkData\\people.json");
//select * from table
df.show();
//desc table
df.printSchema();
//select name from table
df.select(df.col("name")).show();
//select name, age+10 from table
df.select(df.col("name"), df.col("age").plus(10)).show();
//select * from table where age > 21
df.filter(df.col("age").gt(21)).show();
//select age, count(1) from table group by age
df.groupBy("age").count().show(); //df.groupBy(df.col("age")).count().show();
}
}
Program output:

+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 31| Andy|
| 20| Justin|
+----+-------+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
+-------+----------+
| name|(age + 10)|
+-------+----------+
|Michael| null|
| Andy| 41|
| Justin| 30|
+-------+----------+
+---+----+
|age|name|
+---+----+
| 31|Andy|
+---+----+
+----+-----+
| age|count|
+----+-----+
| 31| 1|
|null| 1|
| 20| 1|
+----+-----+
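Each commented line above pairs a DSL call with the SQL it mirrors. The same queries can also be run as real SQL by registering the DataFrame as a temporary table. A minimal sketch (shown in Scala like the other sketches; the table name people is an assumption):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SqlOnDataFrame {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SqlOnDataFrame").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("F:\\sparkData\\people.json")
    // Expose the DataFrame to SQL under the name "people"
    df.registerTempTable("people")
    // Equivalent to df.filter(df("age") > 21).show() in the DSL
    sqlContext.sql("SELECT * FROM people WHERE age > 21").show()
  }
}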
2. Developing DataFrames in Scala

package com.tom.spark.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Scala version of the same DataFrame operations: read the JSON file, then run DSL queries.
 */
object DataFrameOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameOps").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("F:\\sparkData\\people.json")
df.show()
df.printSchema()
df.select("name").show()
df.select(df("name"),df("age")+10).show()
df.filter(df("age")>21).show()
df.groupBy("age").count().show()
}
}
The program output is identical to that of the Java version above.
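The introduction noted that a DataFrame can also be built from an existing RDD. A minimal sketch of that path (the Person case class and the sample rows are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical case class; its fields become the DataFrame schema via reflection
case class Person(name: String, age: Int)

object RddToDataFrame {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RddToDataFrame").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings rdd.toDF() into scope
    val df = sc.parallelize(Seq(Person("Michael", 29), Person("Andy", 31))).toDF()
    df.printSchema()
    df.show()
  }
}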
When submitting to a cluster, spark-submit accepts a --files argument, which can be used to ship the hive-site.xml referenced by the Hive installation along with the job:

spark-submit --class com.tom.spark.sql.DataFrameOps \
--files /usr/local/hive/apache-hive-1.2.1-bin/conf/hive-site.xml \
--driver-class-path /usr/local/hive/apache-hive-1.2.1-bin/mysql-connector-java-5.1.35-bin.jar \
--master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar
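With hive-site.xml shipped this way, a HiveContext on the cluster can resolve tables through the shared Hive metastore. A minimal sketch (the table name src is a placeholder for whatever exists in your metastore):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveTableQuery {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HiveTableQuery"))
    // HiveContext picks up hive-site.xml from the classpath and talks to the metastore
    val hiveContext = new HiveContext(sc)
    // "src" is a placeholder; any table registered in the metastore can be queried
    hiveContext.sql("SELECT * FROM src LIMIT 10").show()
  }
}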