https://blog.csdn.net/ZMC921/article/details/74948786 由于工作需要,我用scala實(shí)現(xiàn)在已將有的目錄下面繼續(xù)寫入文件。需要重寫MultipleTextOutputFormat這個(gè)類,具體的請(qǐng)看下面代碼,需要交流可以聯(lián)系我 import java.text.SimpleDateFormat import java.util.Date import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.hadoop.mapred.{InvalidJobConfException, JobConf} import org.apache.hadoop.mapreduce.security.TokenCache import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} import org.apache.spark.rdd.RDD; /** * 在HDFS目錄下繼續(xù)追加文件,不會(huì)覆蓋以前的文件 * Created by csw on 2017/6/23. */ object MultipleTextOutput { def main(args: Array[String]) { val filePath = "hdfs://master:9000/csw/tmp/data"; val savePath = "hdfs://master:9000/hxzj/mydata/tatol"; val conf = new SparkConf().setAppName("Spark shell") val sc = new SparkContext(conf) //讀取文件后,不進(jìn)行split操作,直接將整行內(nèi)容看作key, val rdd: RDD[(String, String)] = sc.textFile(filePath).map(x => (x, "")) //rdd必須是(key,value)形式的 RDD.rddToPairRDDFunctions(rdd).partitionBy(new HashPartitioner(1)).saveAsHadoopFile(savePath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) sc.stop() } /** * 自定義一個(gè)輸出文件類 */ case class RDDMultipleTextOutputFormat() extends MultipleTextOutputFormat[Any, Any] { val currentTime: Date = new Date() val formatter = new SimpleDateFormat("yyyy-MM-dd-HHmmss"); val dateString = formatter.format(currentTime); //自定義保存文件名 override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = { //key 和 value就是rdd中的(key,value),name是part-00000默認(rèn)的文件名 //保存的文件名稱,這里用字符串拼接系統(tǒng)生成的時(shí)間戳來區(qū)分文件名,可以自己定義 "HTLXYFY" + dateString } override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = { val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR) var outDir: Path = if (name == null) null else new Path(name) //當(dāng)輸出任務(wù)不等于0 且輸出的路徑為空,則拋出異常 if (outDir == null && job.getNumReduceTasks != 0) { throw new InvalidJobConfException("Output directory not set in JobConf.") } //當(dāng)有輸出任務(wù)和輸出路徑不為null時(shí) if (outDir != null) { val fs: FileSystem = outDir.getFileSystem(job) outDir = fs.makeQualified(outDir) outDir = new Path(job.getWorkingDirectory, outDir) job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outDir.toString) TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job) //下面的注釋掉,就不會(huì)出現(xiàn)這個(gè)目錄已經(jīng)存在的提示了 /* if (fs.exists(outDir)) { throw new FileAlreadyExistsException("Outputdirectory" + outDir + "alreadyexists"); } }*/ } } } } --------------------- 原文鏈接:https://blog.csdn.net/ZMC921/article/details/74948786 |
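To check that repeated runs really accumulate files instead of overwriting them, one can list the output directory between runs. A small sketch using the Hadoop `FileSystem` API (the NameNode URI and path are copied from the job above; the `ListOutput` name is mine):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListOutput {
  def main(args: Array[String]): Unit = {
    // Connect to the same NameNode the job writes to and list the save path.
    val fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration())
    fs.listStatus(new Path("/hxzj/mydata/tatol"))
      .foreach(s => println(s"${s.getPath.getName}\t${s.getLen} bytes"))
    fs.close()
  }
}
```

After two runs you should see two `HTLXYFY`-prefixed files with different timestamps side by side.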
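For a quick smoke test without a cluster, the same pipeline can be pointed at the local filesystem (a sketch; the `local[1]` master, `file://` paths, and `LocalAppendTest` name are my assumptions, not from the post):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LocalAppendTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("append-test").setMaster("local[1]"))
    // Same pipeline as the job above, but reading and writing local files.
    val rdd = sc.textFile("file:///tmp/append-test/input.txt").map(x => (x, ""))
    RDD.rddToPairRDDFunctions(rdd)
      .partitionBy(new HashPartitioner(1))
      .saveAsHadoopFile("file:///tmp/append-test/out", classOf[String], classOf[String],
        classOf[MultipleTextOutput.RDDMultipleTextOutputFormat])
    sc.stop()
  }
}
```

Run it twice: the output directory should end up holding two timestamped files rather than failing with `FileAlreadyExistsException`.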