小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

Spark計(jì)算結(jié)果繼續(xù)追加在HDFS目錄下,不會(huì)覆蓋之前的文件

 jasonbetter 2019-08-06

https://blog.csdn.net/ZMC921/article/details/74948786


由于工作需要,我用scala實(shí)現(xiàn)在已將有的目錄下面繼續(xù)寫入文件。需要重寫MultipleTextOutputFormat這個(gè)類,具體的請(qǐng)看下面代碼,需要交流可以聯(lián)系我

import java.text.SimpleDateFormat

import java.util.Date

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

import org.apache.hadoop.mapred.{InvalidJobConfException, JobConf}

import org.apache.hadoop.mapreduce.security.TokenCache

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import org.apache.spark.rdd.RDD;

/**

  * 在HDFS目錄下繼續(xù)追加文件,不會(huì)覆蓋以前的文件

  * Created by csw on 2017/6/23.

  */

object MultipleTextOutput {

  def main(args: Array[String]) {

    val filePath = "hdfs://master:9000/csw/tmp/data";

    val savePath = "hdfs://master:9000/hxzj/mydata/tatol";

    val conf = new SparkConf().setAppName("Spark shell")

    val sc = new SparkContext(conf)

    //讀取文件后,不進(jìn)行split操作,直接將整行內(nèi)容看作key,

    val rdd: RDD[(String, String)] = sc.textFile(filePath).map(x => (x, ""))

    //rdd必須是(key,value)形式的

    RDD.rddToPairRDDFunctions(rdd).partitionBy(new HashPartitioner(1)).saveAsHadoopFile(savePath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

    sc.stop()

  }

  /**

    * 自定義一個(gè)輸出文件類

    */

  case class RDDMultipleTextOutputFormat() extends MultipleTextOutputFormat[Any, Any] {

    val currentTime: Date = new Date()

    val formatter = new SimpleDateFormat("yyyy-MM-dd-HHmmss");

    val dateString = formatter.format(currentTime);

    //自定義保存文件名

    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {

      //key 和 value就是rdd中的(key,value),name是part-00000默認(rèn)的文件名

      //保存的文件名稱,這里用字符串拼接系統(tǒng)生成的時(shí)間戳來區(qū)分文件名,可以自己定義

      "HTLXYFY" + dateString

    }

    override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {

      val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)

      var outDir: Path = if (name == null) null else new Path(name)

      //當(dāng)輸出任務(wù)不等于0 且輸出的路徑為空,則拋出異常

      if (outDir == null && job.getNumReduceTasks != 0) {

        throw new InvalidJobConfException("Output directory not set in JobConf.")

      }

      //當(dāng)有輸出任務(wù)和輸出路徑不為null時(shí)

      if (outDir != null) {

        val fs: FileSystem = outDir.getFileSystem(job)

        outDir = fs.makeQualified(outDir)

        outDir = new Path(job.getWorkingDirectory, outDir)

        job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outDir.toString)

        TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job)

        //下面的注釋掉,就不會(huì)出現(xiàn)這個(gè)目錄已經(jīng)存在的提示了

        /* if (fs.exists(outDir)) {

             throw new FileAlreadyExistsException("Outputdirectory"

                     + outDir + "alreadyexists");

         }

      }*/

      }

    }

  }

}

--------------------- 

原文鏈接:https://blog.csdn.net/ZMC921/article/details/74948786

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多