MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用通过使用Map和Reduce函数,再定义输入输出就能得到我们想要的结果。不过一般Map Reduce的输出只包含一个可视化输出文件(如下图part-r-00000文件),那么我们如果需要将这一个输出文件分为多个输出文件该怎么办呢?

一个输出
一个输出

这里就用到了MR框架中的MultipleOutputs函数(MultipleOutputs是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。
先来看段完整代码熟悉一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class dateFormat {
static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
String time = simpleDateFormat.format(new Date());
String line = value.toString().trim();
for (int i =0; i< line.length(); i++){
char newChar = line.charAt(i);
context.write(new Text(time), new Text(String.valueOf(newChar)));
}
}
}

static class MyReducer extends Reducer<Text,Text,Text,Text>{
private MultipleOutputs<Text, Text> multipleOutputs;

protected void setup(Context context){
multipleOutputs = new MultipleOutputs<Text, Text>(context);
}
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
for (Text value : values) {
multipleOutputs.write((Text) null, value, key.toString());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException{
multipleOutputs.close();
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(dateFormat.class);

job.setJobName("MultipleOutputTest");

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// MultipleOutputs.addNamedOutput(job, TextOutputFormat.class, NullWritable.class, Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

job.waitForCompletion(true);
}
}

multipleOutputs.write的输出规范如下:
multipleOutputs.write((namedOutput,) key, value, baseOutputPath)
这里需要注意的一点在是map中的输出值会作为reduce中输出文件的文件命名(map.key-r-00000),即baseOutputPath的值确定了分组输出文件的命名规范。

namedOutput为可选项,为当前输出Job的名称,如果有多于一个multipleOutputs.write时,需指定namedOutput名称,并在Driver里添加 MultipleOutputs.addNamedOutput(job, “namedOutput” , TextOutputFormat.class, NullWritable.class, Text.class);作为作业的入口。

上述的代码段是我根据获取系统时间作为reducer的输入key值,命名时也根据map的执行时间进行命名,输出到同一文件夹之下,具体结果如下图:

分组输出到同一文件下
分组输出到同一文件下

如果需要将这些文件分别输出到不同的文件夹下,那么只需要将 multipleOutputs.write((Text) null, value, key.toString());中的key.toString()改为key+”/“,即改为multipleOutputs.write((Text) null, value, key+”/“); 即可,输出结果如下:

分组输出到不同文件夹下
分组输出到不同文件夹下