上一章说到,Hadoop Streaming能使用任何编程语言编写MR程序,使MR计算框架不再拘泥于Java这一单独的语言,但是这也有一定的局限性,比如说Hadoop Streaming需要有该编程语言的标准输入输出,对多路输出不够友好。一般reduce输出的文件格式为:part-00000,part-00001…,文件个数为reduce的任务个数。
但是有时候我们需要用到多路输出的需求,比如输出的数据可能一部分要作为下一个MR任务的输入文件,另一部分直接供下游任务抓取。那么就一定不能实现reduce多路输出吗,答案是否定的,我们可以使用 -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat 或者 -outputformat
org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat指定使用带多输出的OutputFormat,前者对应于文本输入,后者于二进制输入。
简介
reduce输出文件格式一般为part-xxxxx-X文件,其中X是A-Z的字母之一,使用方式是在输出key,value对的时候,在value的后面追加”#X”两个字节后缀,后缀不同的key,value输出到不同的part-xxxxx-X文件,value的后缀”#X”在kv输入文件时会自动删除。
此时,需要特别注意的是,由于value之后 #X 才会识别为输出文件标示,而reduce的输出默认以”\t”分割key和value,因此,如果reduce的输出是一行中间没有”\t”,我们就需要在#X后缀前面添加”\t”使它变成value的一部分而不是key的一部分,也可以通过 -D stream.reduce.output.field.separator指定reduce输出分割符的类型。
题外话:-D stream.reduce.output.field.separator 和 -jobconf suffix.multiple.outputformat.separator 的区别
参数 |
含义 |
-D stream.reduce.output.field.separator |
reduce中key与value的分隔符 |
-jobconf suffix.multiple.outputformat.separator |
value与文件名的分割符,默认为“#”,如果value本身含有“#”,则可以通过该参数设置其他的分隔符 |
实例应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| # run.sh ${HADOOP_BIN} streaming \ -input "${INPUT}" \ -output "${OUT_DIR}" \ -cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \ -file "mapper_worker.sh" \ -file "reducer_worker.py" \ -mapper "python2.7.2/bin/python mapper_worker.sh" \ -reducer "python2.7.2/bin/python reducer_worker.py" \ -inputformat "org.apache.hadoop.mapred.TextInputFormat" \ -outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \ -jobconf mapred.job.priority="NORMAL" \ -jobconf mapred.job.name="${TASK_NAME}" \ -jobconf mapred.map.tasks="${MAP_NUM}" \ -jobconf mapred.reduce.tasks="${REDUCE_NUM}" \ -jobconf mapred.max.split.size=134217728 \ -jobconf mapred.map.memory.limit="800" \ -jobconf mapred.reduce.memory.limit="500" \ -jobconf mapred.job.map.capacity="3500" \ -jobconf mapred.job.reduce.capacity="2000" \ -jobconf mapred.job.keep.files.hours=12 \ -jobconf mapred.max.map.failures.percent=1 \ -jobconf mapred.reduce.tasks.speculative.execution="false"
|
1 2 3 4 5
| # mapper_worker.py import sys
for line in sys.stdin: print line
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| # reducer_worker.py import sys
for line in sys.stdin: record = line.strip() fields = record.split('\t') if len(fields) != 7: continue vcpurl, playurl, title, poster, duration, pubtime, accept = fields duration = int(duration) pubtime = int(pubtime) accept = int(accept) if duration < 60: print '\t'.join(record) + "#A" # sys.stdout.write('%s#A\n' %(record)) elif duration < 300: print '\t'.join(record) + "#B" # sys.stdout.write('%s#B\n' %(record)) else: print '\t'.join(record) + "#C" # sys.stdout.write('%s#C\n' %(record))
|
参考文献:Hadoop Streaming实战: 多路输出
最后更新时间:
转载联系1571170555@qq.com