小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

[MapReduce]如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

 猩猩喜歡吃香蕉 2016-01-15
 本文主要講解三個(gè)問題:
      1 使用Java編寫MapReduce程序時(shí),如何向map、reduce函數(shù)傳遞參數(shù)。
      2 使用Streaming編寫MapReduce程序(C/C++, Shell, Python)時(shí),如何向map、reduce腳本傳遞參數(shù)。
      3 使用Streaming編寫MapReduce程序(C/C++, Shell, Python)時(shí),如何向map、reduce腳本傳遞文件或文件夾。
         (1) streaming 加載本地單個(gè)文件
         (2) streaming 加載本地多個(gè)文件
         (3) streaming 加載本地目錄
         (4) streaming編程時(shí)在mapreduce腳本中讀 hdfs 文件
         (5) streaming編程時(shí)在mapreduce腳本中讀 hdfs 目錄

1.  Java編寫MapReduce程序時(shí),如何向map、reduce函數(shù)傳遞參數(shù)
我開始使用如下方式進(jìn)行傳遞.
在主類中聲明兩個(gè)靜態(tài)變量, 然后在 main 函數(shù)中給變量賦值, 試圖在 map和reduce函數(shù)中獲得變量的值。
代碼結(jié)構(gòu)類似如下:
[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》
提交到集群運(yùn)行發(fā)現(xiàn)在 map 和 reduce函數(shù)中, 靜態(tài)變量MaxScore的值始終是初值1。
于是試圖在主類的靜態(tài)區(qū)中給變量賦值 (因?yàn)殪o態(tài)區(qū)中的代碼比main中的代碼要先執(zhí)行), 仍是不成功, MaxScore的值始終是初值1。
將上述代碼在 單機(jī)hadoop上運(yùn)行, 結(jié)果正常, map 函數(shù)中能獲得變量的值。
思考是這個(gè)原因: 在提交作業(yè)到hadoop集群后,mapper類和reducer類就到各個(gè) tasktracker上去運(yùn)行了, 與主類獨(dú)立, 不能交互。
因此,上述往 map 和 reduce 函數(shù)傳參數(shù)的方法實(shí)在太天真。
于是想到其它一些方法: 例如將參數(shù)寫入hdfs文件中, 然后在 mapper 和 reducer 類的 run方法中讀取文件, 并將值讀到相應(yīng)變量,這是可行的,但是方法較復(fù)雜,代碼如下:
[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》
上述方法盡管可用, 但是不是常規(guī)方法, 下面介紹常用的方法:
(1) 通過 Configuration 來傳遞參數(shù)
在main函數(shù)中調(diào)用set方法設(shè)置參數(shù), 例如:
[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》
在mapper中通過上下文context來獲取當(dāng)前作業(yè)的配置, 并獲取參數(shù), 例如:
[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》
注: context 很有用, 能獲取當(dāng)前作業(yè)的大量信息,例如上面就獲取了任務(wù)ID.

(2) 利用org.apache.hadoop.io.DefaultStringifier類

示例:

main中:

    Configuration conf = new Configuration();

    Text maxscore = new Text("12989");

    DefaultStringifier.store(conf, maxscore ,"maxscore");

這樣,Text對象maxscore就以“maxscore”作為key存儲在conf對象中了,然后在map和reduce函數(shù)中調(diào)用load的方法便可以把對象讀出。

mapper獲取:

    Configuration conf = context.getConfiguration()

    Text out = DefaultStringifier.load(conf, "maxscore", Text.class);

   需要說明的是,這個(gè)需要傳遞的對象必須要先實(shí)現(xiàn)序列化的接口,Hadoop的序列化是通過Writable接口來實(shí)現(xiàn)的

(2) 參考自:http://blog.sina.com.cn/s/blog_6b7cf18f0100x9jg.html


2.  編寫 Streaming 程序時(shí),如何向map、reduce函數(shù)傳遞參數(shù)

可以通過 streaming 的 cmdenv 選項(xiàng)設(shè)置環(huán)境變量,然后在 map 和 reduce 腳本中獲取環(huán)境變量。

 

可參考 << hadoop streaming 高級編程 >>

http:///mapreduce/hadoop-streaming-advanced-programming/

 

(0) 作業(yè)提交腳本:

#!/usr/bin/env bash

max_read_count=${array[0]}

min_read_count=${array[1]}

max_write_count=${array[2]}

min_write_count=${array[3]}


hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar \

  -D mapred.reduce.tasks=1   

  -input $input \

  -output $output  \

  -mapper $mapper_script \

  -file $map_file  \

  -reducer $reducer_script \

  -file $reduce_file \

  -cmdenv "max_read_count=${array[0]}" \      # 設(shè)置環(huán)境變量   max_read_count .   

  -cmdenv "min_read_count=${array[1]}" \      多個(gè)變量時(shí)請多次使用 -cmdenv   

  -cmdenv "max_write_count=${array[2]}" \

  -cmdenv "min_write_count=${array[3]}"  

(1) Python mapper.py

#!/usr/bin/env python

import sys

import os


min_r_count = float(os.environ.get('min_read_count')) # get environment variables.

max_r_count = float(os.environ.get('max_read_count'))

min_w_count = float(os.environ.get('min_write_count'))

max_w_count = float(os.environ.get('max_write_count'))


(2)Shell mapper.sh

 

#!/usr/bin/env bash 

while read line  # 讀入行

do

   a=$line  

done

echo $min_read_count $max_read_count  get environment variables.


(3)C/C++ mapper.c

 

#include

#include

int main(int argc, char *argv[], char *env[])

{

  double min_r_count;

  int i = 0;

  for (i = 0; env[i] != NULL; i++) // env[i] 存儲了環(huán)境變量, 每項(xiàng)的值為此種形式: PATH=******, 所以需要截取變量值

 

     if( strstr(env[i], "PATH=") ) {

       char *p =NULL;

       p = strstr(env[i], "=");

       if( (p-env[i]) == 4 )

         printf("%s\n", ++p); // 獲取 PATH 環(huán)境變量

     }

      if( strstr(env[i], "min_write_count=") ) {

 

       char *p =NULL;

       p = strstr(env[i], "=");

       if( (p-env[i]) == strlen("min_write_count") )

         printf("%s\n", ++p); // 獲取  min_write_count  環(huán)境變量

     }

 

  }

  char eachLine[200]={0};

  while(fgets(eachLine, 199, stdin)) // read line from stdin

  {

     printf("%s", eachLine);

  }

}

 

 


注意:
    Hadoop執(zhí)行命令時(shí)的選項(xiàng)有順序的, 順序是 bin/hadoop command [genericOptions] [commandOptions]. 
    對于streaming, -D 屬于genericOptions, 即hadoop的通用選項(xiàng), 所以必須寫在前面.
    Streaming 的所有選項(xiàng)可以參考:  
    hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -info

3.  編寫 Streaming 程序時(shí),如何向map、reduce函數(shù)傳遞文件或文件夾。

(1) streaming 加載本地單個(gè)文件

streaming 支持 -file 選項(xiàng), 可以把 -file 后面的本地文件(注意是本地文件)打包成作業(yè)提交的一部分, 即打包到作業(yè)的jar文件當(dāng)中, 這樣在mapreduce腳本中就可以像訪問本地文件一樣訪問打包的文件了.

實(shí)例:

作業(yè)提交文件 run.sh

 

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

mapper.py

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

注意:在提交作業(yè)時(shí)使用的是 -file logs/wbscoretest.log 指定需要加載的文件. 在 map 腳本中只需要直接讀取文件 wbscoretest.log 即可, 不需要寫 logs/wbscoretest.log, 因?yàn)橹患虞d了文件 wbscoretest.log, 而不會加載 logs 目錄和 

wbscoretest.log 文件.

(2) streaming 加載本地多個(gè)文件 

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

 

(3) streaming 加載本地目錄 ( 若加載多個(gè)目錄,用逗號隔開,-files dir1, dir2, dir3 )

使用streaming的 -file 選項(xiàng)不能加載本地目錄, 我實(shí)驗(yàn)是如此.

我們可以使用 hadoop 的通用選項(xiàng) -files 來加載本地目錄, 加載成功后在mapreduce腳本中可以像訪問本地目錄一樣訪問加載的目錄.

實(shí)際應(yīng)用中,我們在編寫 分詞MapReduce作業(yè)時(shí)需要加載分詞詞典,就使用該方法.

作業(yè)提交腳本:  

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

map 腳本: 讀取目錄下的文件.

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》


加載多個(gè)目錄:

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》


注意:多個(gè)目錄之間用逗號隔開,且不能有空格,否則會出錯(cuò),這個(gè)限制太蛋疼了。

例如:[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

 

(4) streaming編程時(shí)在mapreduce腳本中讀 hdfs 文件

使用 -files 選項(xiàng), 后面跟需要讀的 hdfs 文件路徑. 這樣在 mapreduce 腳本中就可以直接通過文件名來訪問該文件.

作業(yè)提交腳本:

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

map腳本: 

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

如果需要加載大文件, 我們可以將文件先上傳到 hdfs 中, 然后在 mapreduce 腳本中讀取 hdfs 文件.

 

 

 

(5) streaming編程時(shí)在mapreduce腳本中讀 hdfs 目錄

使用 -files 選項(xiàng), 后面跟需要讀的 hdfs 目錄. 這樣在 mapreduce 腳本中就可以像訪問本地目錄一樣訪問該目錄.

作業(yè)提交腳本:

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

map腳本:  直接讀取 tmp_kentzhan 目錄.

[MapReduce] <wbr>如何向map和reduce腳本傳遞參數(shù),加載文件和目錄《轉(zhuǎn)載》

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多