刚来公司的时候就发现大家都在使用Hadoop Streaming 来执行任务,当时还在想名字里有Streaming,Hadoop也拿来做实时计算工具了吗?后来发现原来只是我学得太少,对编程工具的理解太片面了。

那么接下来就来看看什么是Hadoop Streaming,为什么要使用Hadoop Streaming、Hadoop Streaming的原理以及怎么使用Hadoop Streaming。

什么是Hadoop Streaming

Hadoop Streaming是Hadoop提供的一个编程工具,由Hadoop提供。

为什么使用Hadoop Streaming

之前习惯了使用Hadoop框架来进行数据处理,Hadoop框架是用Java语言写的,也就是说如果想用Hadoop框架来执行MR任务,那么需要开发者会Java编程语言,这样子让不会Java语言的开发者怎么办?重新学习一门新语言吗,好像也不是不行(-,但是太麻烦啦,时间成本很高,因此Hadoop Streaming就出现啦!

Hadoop Streaming允许用户使用任何程序语言来编写mapreduce里的Mapper 和 Reducer函数,无论你会不会Java,只要会某种编程语言,都能通过Hadoop Streaming来编写MR程序(公司用的都是Python,为了合群只能先学习Python!)

Hadoop Streaming 局限性

标准输入输出

上面说到Hadoop Streaming支持任何语言来编写MR程序,这固然对开发者是十分友好的,但是它也有一定的局限性,即map/reduce函数的数据流必须遵循相应编程语言的标准输入输出(stdin、stdout),用什么编程语言实现业务逻辑,就必须要通过该语言的标准输入stdin读取数据,通过该语言的标准输出stdout输出数据。比方说如下的几种编程语言的输入输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# python标准输入输出
import sys
for line in sys.stdin:
后续操作

print (输出内容)

# shell标准输入输出
while read LINE; do
后续操作

echo "输出内容"

# c语言标准输入输出
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
后续操作
}

printf(输出内容)

...

这种规定标准的输入输出,不能方便地处理多路输出

处理文本数据

Hadoop Streaming默认只能处理文本数据Textfile,而对于二进制数据,较好的方法是将二进制的key、value进行base64编码,转化为文本在进行操作。

多余的开销

用Java编写的MR程序直接处理框架从输入数据中得到的key/value对,而在Hadoop Streaming中Java程序不直接处理kv对,而是通过管道写到mapper的标准输入,mapper程序再从kv中解析出kv对,这个过程多了两次数据拷贝和解析(分割),这会带来一定的开销。同理,对于reducer也一样。

Hadoop Streaming 原理

image
image

Streaming原理是用Java实现一个包装用户程序的MR程序,该程序负责调用MapReduce Java接口获取key/value键值对输入,创建一个新的进程启动包装的用户程序,将数据通过管道传递给包装的用户程序处理,然后调用MapReduce Java接口将用户程序的输出切分成kv对输出。

如上图所示,Streaming Java Mapper通过管道将key/value输入传递给用户mapper的标准输入,并获取mapper的标准输出;Streaming Java Reducer调用Java接口通过InputFormat从HDFS获取输入数据,从管道将kv传递给用户reducer程序的标准输入,获取reducer的标准输出并调用Java接口通过OutputFormat输出数据。

shuffle和sort阶段,和一般的MapReduce作业流程一样,经过此阶段的操作然后到达Reducer。

怎么使用Hadoop Streaming

参数说明:

  • -input :指定作业输入,path可以是文件或者目录,可以使用*通配符,-input选项可以使用多次指定多个文件或目录作为输入
  • -output :指定作业输出目录,path必须不存在,而且执行作业的用户必须有创建该目录的权限,-output只能使用一次
  • -mapper:指定mapper可执行程序,必须指定且唯一
  • -reducer:指定reducer可执行程序,必须指定且唯一
  • -file, -cacheFile, -cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件
  • -numReduceTasks:指定reducer的个数,如果设置-numReduceTasks 0或者-reducer NONE则没有reducer程序,mapper的输出直接作为整个作业的输出
  • -combiner:指定combiner Java类,对应的Java类文件打包成jar文件后用-file分发
  • -inputformat, -outputformat:指定inputformat和outputformat Java类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat
  • -partitioner:指定partitioner Java类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner
  • -cmdenv NAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值
  • -mapdebug, -reducedebug:分别指定mapper和reducer程序失败时运行的debug程序
  • -verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试
  • -jobconf || -D NAME=VALUE :指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。作业参数详解:
配置参数 参数详情
mapred.job.name 作业名
mapred.job.priority 作业优先级
mapred.job.map.capacity 最多同时运行map任务数
mapred.job.reduce.capacity 最多同时运行reduce任务数
hadoop.job.ugi 作业执行权限
mapred.map.tasks map任务个数
mapred.reduce.tasks reduce任务个数
mapred.job.groups 作业可运行的计算节点分组
mapred.task.timeout 任务没有响应(输入输出)的最大时间
mapred.compress.map.output map的输出是否压缩
mapred.map.output.compression.codec map的输出压缩方式
mapred.output.compress reduce的输出是否压缩
mapred.output.compression.codec reduce的输出压缩方式
stream.map.output.field.separator map输出分隔符

这其中特别建议用-jobconf mapred.job.name=’My Job Name’设置作业名,使用-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW设置作业优先级,使用-jobconf mapred.job.map.capacity=M设置同时最多运行M个map任务,使用-jobconf mapred.job.reduce.capacity=N设置同时最多运行N个reduce任务。