上一章说到,Hadoop Streaming能使用任何编程语言编写MR程序,使MR计算框架不再拘泥于Java这一单独的语言,但是这也有一定的局限性,比如说Hadoop Streaming需要有该编程语言的标准输入输出,对多路输出不够友好。一般reduce输出的文件格式为:part-00000,part-00001…,文件个数为reduce的任务个数。

但是有时候我们需要用到多路输出的需求,比如输出的数据可能一部分要作为下一个MR任务的输入文件,另一部分直接供下游任务抓取。那么就一定不能实现reduce多路输出吗,答案是否定的,我们可以使用 -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat 或者 -outputformat
org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat
指定使用带多输出的OutputFormat,前者对应于文本输入,后者于二进制输入。

简介

reduce输出文件格式一般为part-xxxxx-X文件,其中X是A-Z的字母之一,使用方式是在输出key,value对的时候,在value的后面追加”#X”两个字节后缀,后缀不同的key,value输出到不同的part-xxxxx-X文件,value的后缀”#X”在kv输入文件时会自动删除。

此时,需要特别注意的是,由于value之后 #X 才会识别为输出文件标示,而reduce的输出默认以”\t”分割key和value,因此,如果reduce的输出是一行中间没有”\t”,我们就需要在#X后缀前面添加”\t”使它变成value的一部分而不是key的一部分,也可以通过 -D stream.reduce.output.field.separator指定reduce输出分割符的类型。

题外话:-D stream.reduce.output.field.separator 和 -jobconf suffix.multiple.outputformat.separator 的区别

参数 含义
-D stream.reduce.output.field.separator reduce中key与value的分隔符
-jobconf suffix.multiple.outputformat.separator value与文件名的分割符,默认为“#”,如果value本身含有“#”,则可以通过该参数设置其他的分隔符

实例应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# run.sh
${HADOOP_BIN} streaming \
-input "${INPUT}" \
-output "${OUT_DIR}" \
-cacheArchive "${TOOL_DIR}/python2.7.2.tgz""#." \
-file "mapper_worker.sh" \
-file "reducer_worker.py" \
-mapper "python2.7.2/bin/python mapper_worker.sh" \
-reducer "python2.7.2/bin/python reducer_worker.py" \
-inputformat "org.apache.hadoop.mapred.TextInputFormat" \
-outputformat "org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat" \
-jobconf mapred.job.priority="NORMAL" \
-jobconf mapred.job.name="${TASK_NAME}" \
-jobconf mapred.map.tasks="${MAP_NUM}" \
-jobconf mapred.reduce.tasks="${REDUCE_NUM}" \
-jobconf mapred.max.split.size=134217728 \
-jobconf mapred.map.memory.limit="800" \
-jobconf mapred.reduce.memory.limit="500" \
-jobconf mapred.job.map.capacity="3500" \
-jobconf mapred.job.reduce.capacity="2000" \
-jobconf mapred.job.keep.files.hours=12 \
-jobconf mapred.max.map.failures.percent=1 \
-jobconf mapred.reduce.tasks.speculative.execution="false"
1
2
3
4
5
# mapper_worker.py
import sys

for line in sys.stdin:
print line
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# reducer_worker.py
import sys

for line in sys.stdin:
record = line.strip()
fields = record.split('\t')
if len(fields) != 7:
continue
vcpurl, playurl, title, poster, duration, pubtime, accept = fields
duration = int(duration)
pubtime = int(pubtime)
accept = int(accept)
if duration < 60:
print '\t'.join(record) + "#A"
# sys.stdout.write('%s#A\n' %(record))
elif duration < 300:
print '\t'.join(record) + "#B"
# sys.stdout.write('%s#B\n' %(record))
else:
print '\t'.join(record) + "#C"
# sys.stdout.write('%s#C\n' %(record))

参考文献:Hadoop Streaming实战: 多路输出