标签归档:hadoop

Hadoop : 一个目录下的数据只由一个map处理

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

/path/to/directory1
/path/to/directory2
/path/to/directory3

代码里面按行读取:


都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。

一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat

某日,接手了同事写的从Hadoop集群拷贝数据到另外一个集群的程序,该程序是运行在Hadoop集群上的job。这个job只有map阶段,读取hdfs目录下数据的数据,然后写入到另外一个集群。

显然,这个程序没有考虑大数据量的情况,如果输入目录下文件很多或数据量很大,就会导致map数很多。而实际上我们需要拷贝的一个数据源就有近6T,job启动起来有1w多个map,一下子整个queue的资源就占满了。虽然通过调整一些参数可以控制map数(也就是并发数),但是无法准确的控制map数,而且换个数据源又得重新配置参数。

第一个改进的版本是,加了Reduce过程,以期望通过设置Reduce数量来控制并发数。这样虽然能精确地控制并发数,但是增加了shuffle过程,实际运行中发现输入数据有倾斜(而partition的key由于业务需要无法更改),导致部分机器网络被打满,从而影响到了集群中的其他应用。即使通过 mapred.reduce.parallel.copies 参数来限制shuffle也是治标不治本。这个平白增加的shuffle过程实际上浪费了很多网络带宽和IO。

最理想的情况当然是只有map阶段,而且能够准确的控制并发数了。

于是,第二个优化版本诞生了。这个job只有map阶段,采用CombineFileInputFormat,它可以将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量map。通过 mapred.max.split.size 参数可以大概地控制并发数。本以为这样就能解决问题了,结果又发现了数据倾斜的问题。这种粗略地分splits的方式,导致有的map处理的数据少,有的map处理的数据多,并不均匀。几个拖后退的map就导致job的实际运行时间长了一倍多。

看来只有让每个map处理的数据量一样多,才能完美的解决这个问题了。

第三个版本也诞生了,这次是重写了CombineFileInputFormat,自己实现getSplits方法。由于输入数据为SequenceFile格式,因此需要一个SequenceFileRecordReaderWrapper类。

实现代码如下:
CustomCombineSequenceFileInputFormat.java

MultiFileInputFormat.java

通过 multifileinputformat.max_split_num 参数就可以较为准确的控制map数量,而且会发现每个map处理的数据量很均匀。至此,问题总算解决了。

hadoop集群DataNode起不来:“DiskChecker$DiskErrorException: Invalid volume failure config value: 1”

最近把线上一个配置在拷贝到线下一台机器后,发现hadoop datanode起不来,总是报这个异常:

原因是:
dfs.datanode.failed.volumes.tolerated 这个参数直接拷贝了线上的配置为1,
其含义是:The number of volumes that are allowed to fail before a datanode stops offering service. By default any volume failure will cause a datanode to shutdown. 即datanode可以忍受的磁盘损坏的个数。

在hadoop集群中,经常会发生磁盘只读或者损坏的情况。datanode在启动时会使用dfs.datanode.data.dir下配置的文件夹(用来存储block),若是有一些不可以用且个数上面配置的值,DataNode就会启动失败。

在线上环境中fs.datanode.data.dir配置为10块盘,所以dfs.datanode.failed.volumes.tolerated设置为1,是允许有一块盘是坏的。而线下的只有一块盘,这volFailuresTolerated和volsConfigured的值都为1,所以会导致代码里面判断失败。

详见hadoop源码的FsDatasetImpl.java的182行:… --> 阅读全文

mapreduce job让一个文件只由一个map来处理

有一批数据用hadoop mapreduce job处理时,业务特点要求一个文件对应一个map来处理,如果两个或多个map处理了同一个文件,可能会有问题。开始想通过设置 dfs.blocksize 或者 mapreduce.input.fileinputformat.split.minsize/maxsize 参数来控制map的个数,后来想到其实不用这么复杂,在自定义的InputFormat里面直接让文件不要进行split就可以了。

这样,输入文件有多少个,job就会启动多少个map了。

hadoop用MultipleInputs/MultiInputFormat实现一个mapreduce job中读取不同格式的文件

hadoop中提供了 MultiOutputFormat 能将结果数据输出到不同的目录,也提供了 FileInputFormat 来一次读取多个目录的数据,但是默认一个job只能使用 job.setInputFormatClass 设置使用一个inputfomat处理一种格式的数据。如果需要实现 在一个job中同时读取来自不同目录的不同格式文件 的功能,就需要自己实现一个 MultiInputFormat 来读取不同格式的文件了(原来已经提供了MultipleInputs)。

例如:有一个mapreduce job需要同时读取两种格式的数据,一种格式是普通的文本文件,用 LineRecordReader 一行一行读取;另外一种文件是伪XML文件,用自定义的AJoinRecordReader读取。

自己实现了一个简单的 MultiInputFormat 如下:

其实原理很简单,就是在 createRecordReader 的时候,通过 ((FileSplit) split).getPath().toString() 获取到当前要处理的文件名,然后根据特征匹配,选取对应的 RecordReader 即可。xml_prefix和text_prefix可以在程序启动时通过 -D 传给Configuration。

比如某次执行打印的值如下:

这里只是通过简单的文件路径和标示符匹配来做,也可以采用更复杂的方法,比如文件名、文件后缀等。

接着在map类中,也同样可以根据不同的文件名特征进行不同的处理:


这种方式太土了,原来hadoop里面已经提供了 MultipleInputs 来实现对一个目录指定一个inputformat和对应的map处理类。