小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

3000門(mén)徒內(nèi)部訓(xùn)練絕密視頻(泄密版)第10課:徹底實(shí)戰(zhàn)詳解使用Java開(kāi)發(fā)Spark程序

 看風(fēng)景D人 2019-02-24

第一步:下載Eclipse IDE for JAVA Developer
第二步:解壓并啟動(dòng)Eclipse
第三步:創(chuàng)建Maven工程
第四步:使用maven-archetype-quickstart,設(shè)定一些包名
第五步:通過(guò)BuildPath把默認(rèn)的J2EE 1.5變成Java1.8
第六步:配置pom.xml,添加程序開(kāi)發(fā)時(shí)的相關(guān)依賴,并配置具體build打包的信息

POM.xml
有各種依賴的支持

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>

http://maven./org.apache.spark

package com.tom.spark.SparkApps.cores;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * 使用Java的方式開(kāi)發(fā)本地測(cè)試Spark的WordCount程序
 * @author 
 *
 */
public class WordCount {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        /**
         * 第一步:創(chuàng)建Spark的配置對(duì)象,SparkConf,設(shè)置Spark程序的運(yùn)行時(shí)的配置信息
         * 例如通過(guò)setMaster來(lái)設(shè)置程序要連接的spark集群的Master的URL,如果設(shè)置
         * 為local,則代表Spark程序在本地運(yùn)行,特別適合于機(jī)器配置條件非常差的初學(xué)者
         * 
         */
        SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");
        /**
         * 第二步:創(chuàng)建SparkContext對(duì)象
         * SparkContext是Spark程序所有功能的唯一入口,無(wú)論采用Scala、Java、Python、R等都必須有一個(gè)SparkContext(不同的語(yǔ)言具體的類(lèi)名稱不同,Java則為JavaSparkContext)
         * SparkContext核心作用:初始化Spark應(yīng)用程序運(yùn)行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackEnd
         * 同時(shí)還會(huì)負(fù)責(zé)Spark程序往Master注冊(cè)程序等
         * SparkContext是整個(gè)Spark應(yīng)用程序中最為至關(guān)重要的一個(gè)對(duì)象 
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        /**
         * 第三步:根據(jù)具體的數(shù)據(jù)來(lái)源(HDFS、HBase、Local FS、DB、S3等)通過(guò)SparkContext來(lái)創(chuàng)建JavaRDD
         * JavaRDD的創(chuàng)建基本有三種方式:根據(jù)外部的數(shù)據(jù)來(lái)源(例如HDFS)、根據(jù)Scala集合、由其他JavaRDD操作
         * 數(shù)據(jù)會(huì)被JavaRDD劃分成為一系列的Partitions,分配到每個(gè)Partition的數(shù)據(jù)屬于一個(gè)Task的處理范疇
         */
        JavaRDD<String> lines = sc.textFile("F:/channel.txt",1);
        /**
         * 第四步:對(duì)初始的JavaRDD進(jìn)行Transformation級(jí)別的處理,例如map、filter等高階函數(shù)等的編程,來(lái)進(jìn)行具體的數(shù)據(jù)計(jì)算
         * 第4.1步:將每一行的字符串拆分成單個(gè)的單詞
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ //如果shiScala,由于SAM轉(zhuǎn)換,所以可以寫(xiě)成val words = lines.flatMap(_.split(" ")) 

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        /**
         * 第4.2步:在單詞拆分的基礎(chǔ)上對(duì)每個(gè)單詞實(shí)例計(jì)數(shù)為1,也就是word => (word, 1)
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer> (word, 1);
            }

        });
     /**
     * 第4.3步:在單詞實(shí)例計(jì)數(shù)為1基礎(chǔ)上,統(tǒng)計(jì)每個(gè)單詞在文件中出現(xiàn)的總次數(shù)
     */
        JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
            //對(duì)相同的key,進(jìn)行Value的累加(包括Local和Reducer級(jí)別同時(shí)Reduce)
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1 + v2;
            }

        });


        wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){

            @Override
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(pairs._1 + " : " + pairs._2);
            }
        });

        sc.close();
    }

}

作業(yè):放在集群上跑

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類(lèi)似文章 更多