小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

大數據IMF傳奇行動絕密課程第19課:Spark高級排序徹底解密

 看風景D人 2019-02-24

基礎排序算法實戰
二次排序算法實戰
更高級排序算法
排序算法內幕解密

// Set the log level of the SparkContext `sc` to WARN.
sc.setLogLevel("WARN")

基礎排序算法:

// Word count ordered by frequency:
//   1. split every line on single spaces and pair each word with 1;
//   2. sum the counts per word with reduceByKey (second argument 1 = one partition);
//   3. swap each pair to (count, word), sort by key with sortByKey(false) (descending),
//      then swap back to (word, count);
//   4. collect the result.
// NOTE(review): textFile() is written without a path here — presumably elided in these
// notes; supply the input path before running.
sc.textFile().flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect

所謂二次排序,就是指,排序的時候考慮兩個維度

2 3
4 1
3 2
4 3
9 7
2 1

構造器要有val,因為要做個成員
Scala實(shí)現(xiàn)

package com.tom.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Composite key for secondary sorting: ordered by `first`, and by `second`
 * when the `first` values are equal.
 *
 * Both constructor parameters are declared `val` so they are members of the
 * class and can be read from another instance inside `compare`.
 *
 * @param first  primary sort dimension
 * @param second secondary sort dimension, used only to break ties on `first`
 */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  /**
   * Compares this key with `other`.
   *
   * @return a negative value, zero, or a positive value when this key sorts
   *         before, equal to, or after `other`
   */
  override def compare(other: SecondarySortKey): Int = {
    // Integer.compare instead of subtraction: `a - b` overflows for operands far
    // apart (e.g. Int.MinValue - 1) and would report the wrong sign.
    val byFirst = Integer.compare(this.first, other.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, other.second)
  }
}

/** Driver that sorts the lines of a text file by their first two integer columns. */
object SecondarySortKey {
  /**
   * Reads "F:/helloSpark2.txt", keys every line by a [[SecondarySortKey]] built from
   * its first two space-separated integers, sorts by that key in ascending order and
   * prints the sorted lines.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortKey").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("F:/helloSpark2.txt")
      val pairWithSortKey = lines.map { line =>
        // Split once and reuse the fields instead of re-splitting per column.
        val fields = line.split(" ")
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      val sorted = pairWithSortKey.sortByKey()

      // Drop the sort key and keep only the original line text.
      val sortedResult = sorted.map(pair => pair._2)

      sortedResult.collect().foreach(println)
    } finally {
      // Stop the context even when the job fails.
      sc.stop()
    }
  }
}

Java實現

/**
 * SecondarySortKey.java
 */
package com.tom.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * 自定義二次排序的Key
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable{

    //需要二次排序的Key
    private int first;
    private int second;

    //二次排序的公開構(gòu)造器
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }


    public int getFirst() {
        return first;
    }


    public void setFirst(int first) {
        this.first = first;
    }


    public int getSecond() {
        return second;
    }


    public void setSecond(int second) {
        this.second = second;
    }


    public boolean $greater(SecondarySortKey other) {
        // TODO Auto-generated method stub
        if(this.first > other.getSecond())
            return true;
        else if(this.first == other.getFirst() && this.second > other.getSecond())
            return true;
        else return false;
    }
    public boolean $greater$eq(SecondarySortKey other) {
        // TODO Auto-generated method stub
        if($greater(other))
            return true;
        else if ( this.first == other.getFirst() && this.second == other.second)
            return true;
        else return false;
    }
    public boolean $less(SecondarySortKey other) {
        // TODO Auto-generated method stub
        return !$greater$eq(other);
    }
    public boolean $less$eq(SecondarySortKey other) {
        // TODO Auto-generated method stub
        return !$greater(other);
    }
    public int compare(SecondarySortKey other) {
        // TODO Auto-generated method stub
        if(this.first != other.getFirst())
            return this.first - other.getFirst();
        else return this.second - other.getSecond();
    }
    public int compareTo(SecondarySortKey other) {
        // TODO Auto-generated method stub
        if(this.first != other.getFirst())
            return this.first - other.getFirst();
        else return this.second - other.getSecond();
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        SecondarySortKey other = (SecondarySortKey) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }


    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub

    }
}
/**
 * SecondarySortKeyApp.java
 */
package com.tom.spark.SparkApps;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import com.tom.spark.SparkApps.cores.SecondarySortKey;

/**
 * Secondary sort driver. Steps:
 * <ol>
 *   <li>Implement a custom sort key using the Ordered and Serializable interfaces.</li>
 *   <li>Load the file to be sorted into a {@code <Key, Value>} pair RDD.</li>
 *   <li>Run sortByKey on the custom key to perform the secondary sort.</li>
 *   <li>Drop the sort key and keep only the sorted lines.</li>
 * </ol>
 */
public class SecondarySortKeyApp {

    /**
     * Reads "F:/helloSpark2.txt", keys every line by a {@link SecondarySortKey} built
     * from its first two space-separated integers, sorts by that key with
     * {@code sortByKey(false)} (descending) and prints each sorted line.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> line = sc.textFile("F:/helloSpark2.txt",1);
            JavaPairRDD<SecondarySortKey, String> pairs = line.mapToPair(new PairFunction<String, SecondarySortKey, String>() {

                public Tuple2<SecondarySortKey, String> call(String line)
                        throws Exception {
                    // Split once and reuse the fields; parseInt avoids boxing.
                    String[] fields = line.split(" ");
                    SecondarySortKey key = new SecondarySortKey(
                            Integer.parseInt(fields[0]), Integer.parseInt(fields[1]));
                    return new Tuple2<SecondarySortKey, String>(key, line);
                }
            });
            // Perform the secondary sort on the custom key.
            JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(false);

            // Drop the custom key, keeping only the sorted line text.
            JavaRDD<String> values = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() {

                public String call(Tuple2<SecondarySortKey, String> pair)
                        throws Exception {
                    return pair._2;
                }
            });
            values.foreach(new VoidFunction<String>() {

                public void call(String line) throws Exception {
                    System.out.println(line);
                }
            });
        } finally {
            // Close the context even when the job throws.
            sc.close();
        }
    }
}

    本站是提供個人知識管理的網絡存儲空間,所有內容均由用戶發布,不代表本站觀點。請注意甄別內容中的聯系方式、誘導購買等信息,謹防詐騙。如發現有害或侵權內容,請點擊一鍵舉報。
    轉藏 分享 獻花(0)

    0條評論

    發表

    請遵守用戶 評論公約

    類似文章 更多