标签归档:inputformat

Hadoop : 一个目录下的数据只由一个map处理

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

/path/to/directory1
/path/to/directory2
/path/to/directory3

代码里面按行读取:


都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。

一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat

某日,接手了同事写的从Hadoop集群拷贝数据到另外一个集群的程序,该程序是运行在Hadoop集群上的job。这个job只有map阶段,读取hdfs目录下数据的数据,然后写入到另外一个集群。

显然,这个程序没有考虑大数据量的情况,如果输入目录下文件很多或数据量很大,就会导致map数很多。而实际上我们需要拷贝的一个数据源就有近6T,job启动起来有1w多个map,一下子整个queue的资源就占满了。虽然通过调整一些参数可以控制map数(也就是并发数),但是无法准确的控制map数,而且换个数据源又得重新配置参数。

第一个改进的版本是,加了Reduce过程,以期望通过设置Reduce数量来控制并发数。这样虽然能精确地控制并发数,但是增加了shuffle过程,实际运行中发现输入数据有倾斜(而partition的key由于业务需要无法更改),导致部分机器网络被打满,从而影响到了集群中的其他应用。即使通过 mapred.reduce.parallel.copies 参数来限制shuffle也是治标不治本。这个平白增加的shuffle过程实际上浪费了很多网络带宽和IO。

最理想的情况当然是只有map阶段,而且能够准确的控制并发数了。

于是,第二个优化版本诞生了。这个job只有map阶段,采用CombineFileInputFormat,它可以将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量map。通过 mapred.max.split.size 参数可以大概地控制并发数。本以为这样就能解决问题了,结果又发现了数据倾斜的问题。这种粗略地分splits的方式,导致有的map处理的数据少,有的map处理的数据多,并不均匀。几个拖后退的map就导致job的实际运行时间长了一倍多。

看来只有让每个map处理的数据量一样多,才能完美的解决这个问题了。

第三个版本也诞生了,这次是重写了CombineFileInputFormat,自己实现getSplits方法。由于输入数据为SequenceFile格式,因此需要一个SequenceFileRecordReaderWrapper类。

实现代码如下:
CustomCombineSequenceFileInputFormat.java

MultiFileInputFormat.java

通过 multifileinputformat.max_split_num 参数就可以较为准确的控制map数量,而且会发现每个map处理的数据量很均匀。至此,问题总算解决了。

mapreduce job让一个文件只由一个map来处理

有一批数据用hadoop mapreduce job处理时,业务特点要求一个文件对应一个map来处理,如果两个或多个map处理了同一个文件,可能会有问题。开始想通过设置 dfs.blocksize 或者 mapreduce.input.fileinputformat.split.minsize/maxsize 参数来控制map的个数,后来想到其实不用这么复杂,在自定义的InputFormat里面直接让文件不要进行split就可以了。

这样,输入文件有多少个,job就会启动多少个map了。

hadoop用MultipleInputs/MultiInputFormat实现一个mapreduce job中读取不同格式的文件

hadoop中提供了 MultiOutputFormat 能将结果数据输出到不同的目录,也提供了 FileInputFormat 来一次读取多个目录的数据,但是默认一个job只能使用 job.setInputFormatClass 设置使用一个inputfomat处理一种格式的数据。如果需要实现 在一个job中同时读取来自不同目录的不同格式文件 的功能,就需要自己实现一个 MultiInputFormat 来读取不同格式的文件了(原来已经提供了MultipleInputs)。

例如:有一个mapreduce job需要同时读取两种格式的数据,一种格式是普通的文本文件,用 LineRecordReader 一行一行读取;另外一种文件是伪XML文件,用自定义的AJoinRecordReader读取。

自己实现了一个简单的 MultiInputFormat 如下:

其实原理很简单,就是在 createRecordReader 的时候,通过 ((FileSplit) split).getPath().toString() 获取到当前要处理的文件名,然后根据特征匹配,选取对应的 RecordReader 即可。xml_prefix和text_prefix可以在程序启动时通过 -D 传给Configuration。

比如某次执行打印的值如下:

这里只是通过简单的文件路径和标示符匹配来做,也可以采用更复杂的方法,比如文件名、文件后缀等。

接着在map类中,也同样可以根据不同的文件名特征进行不同的处理:


这种方式太土了,原来hadoop里面已经提供了 MultipleInputs 来实现对一个目录指定一个inputformat和对应的map处理类。

hadoop mapreduce和hive中使用SequeceFile+lzo格式数据

一.  hadoop中常见的压缩和解压数据格式

随着处理的数据量越来越大,在HDFS中存储压缩数据,不仅能节省集群的存储空间,而且可以减少磁盘io。对于跨集群数据传输来说,更能节约网络带宽。常见的用于hadoop的压缩文件格式有:

压缩格式 工具 算法 文件扩展名 多文件 可分割性
DEFLATE* DEFLATE .deflate
Gzip gzip DEFLATE .gz
ZIP zip DEFLATE .zip 是,在文件范围内
bzip2 bzip2 bzip2 .bz2
LZO lzop LZO .lzo

gzip和ZIP是比较通用的压缩方式,在空间和时间的处理上比较平衡。

bzip2的压缩比gzip或ZIP更有效,但是速度较慢,会使job的瓶颈转移到CPU上。

lzo的压缩和解压速度比gzip或ZIP更快,但是压缩效率偏低。

总的来说,压缩比:bzip2 gzip lzo,速度:lzo gzip bzip2

除了采用这几种压缩格式,SequeceFile也是一种比较好的选择。SequeceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将对序列化到文件中。一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。这种文件格式有以下好处:… --> 阅读全文