hadoop mapreduce和hive中使用SequeceFile+lzo格式数据

一.  hadoop中常见的压缩和解压数据格式

随着处理的数据量越来越大,在HDFS中存储压缩数据,不仅能节省集群的存储空间,而且可以减少磁盘io。对于跨集群数据传输来说,更能节约网络带宽。常见的用于hadoop的压缩文件格式有:

压缩格式 工具 算法 文件扩展名 多文件 可分割性
DEFLATE* DEFLATE .deflate
Gzip gzip DEFLATE .gz
ZIP zip DEFLATE .zip 是,在文件范围内
bzip2 bzip2 bzip2 .bz2
LZO lzop LZO .lzo

gzip和ZIP是比较通用的压缩方式,在空间和时间的处理上比较平衡。

bzip2的压缩比gzip或ZIP更有效,但是速度较慢,会使job的瓶颈转移到CPU上。

lzo的压缩和解压速度比gzip或ZIP更快,但是压缩效率偏低。

总的来说,压缩比:bzip2 > gzip > lzo,速度:lzo > gzip > bzip2

除了采用这几种压缩格式,SequeceFile也是一种比较好的选择。SequeceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将对序列化到文件中。一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。这种文件格式有以下好处:
1. 支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)
2. 可分割
3. 使用方便,因为是Hadoop框架提供的API。
4. 缺点是是需要一个合并文件的过程,且合并后的文件将不方便查看。

对于mapreduce任务来说,更关心的一点是数据的“可分割性(splitable)”。如果数据不能分割,则意味着一个map只能处理一个文件,而这个文件可能会较大,这样就无法利用整个集群的并行处理能力,还会损失job的本地性优势。

使用最快的速度压缩,还是最优的空间压缩,需要根据具体的业务,找到一个平衡点。如果业务更在意压缩和解压时间,对于压缩比不是那么在意,使用lzo是一种比较好的选择。虽然lzo(或gzip)等压缩算法本身是不支持分割的,但是sequence file是可以分块的,所以sequence file格式的文件,再配上lzo(或gzip)的压缩格式,就可实现lzo(或gzip)压缩文件方式的splitable。

由于开源协议的不同,在hadoop 0.20以后使用lzo,需要单独安装包。详见Hadoop-LZO项目主页:https://github.com/toddlipcon/hadoop-lzo

二.  hive中使用lzo压缩格式输出

下面介绍在hive的job中怎么使用lzo数据的压缩输出。

这个例子中是以aid为key,join两个表的数据。数据aid_pid和aid_spuid都是这种以t分割,n结束的格式:

10056889 -1073122434
10059859 -855371037
10067977 -485421398
10086787 -334639064

执行hive命令的shell脚本如下:

#!/bin/sh
export HIVE_HOME=/home/admin/hive
export PATH=$HIVE_HOME/bin:$PATH

hive -e " 
DROP TABLE test_aid_pid; 
CREATE EXTERNAL TABLE test_aid_pid (
     aid BIGINT,
     pid STRING )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
 stored as textfile
 location '/test/baoniu/aid_pid';
"

hive -e " 
DROP TABLE test_aid_spuid; 
CREATE EXTERNAL TABLE test_aid_spuid (
     aid BIGINT,
     spuid STRING )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
 stored as textfile
 location '/test/baoniu/aid_spuid';
"

## join
hive -e " 
DROP TABLE test_aid_pid_spuid; 
CREATE TABLE test_aid_pid_spuid (
     aid BIGINT,
     pid STRING,
     spuid STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
 stored as SEQUENCEFILE
 location '/test/baoniu/aid_pid_spuid';
"

hive -e "
set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.type=BLOCK;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec;

INSERT OVERWRITE TABLE test_aid_pid_spuid SELECT a.aid, a.pid, b.spuid
FROM test_aid_pid a 
JOIN test_aid_spuid b ON (a.aid = b.aid ) 
"

代码中stored as SEQUENCEFILE表示以sequence file 格式存储,mapred.output.compress是对输出结果进行压缩,mapred.output.compression.type指定sequence file的压缩类型,有BLOCK,RECORD,NONE三种,在mapred-default.xml中默认定义的是RECORD压缩。建议使用BLOCK方式压缩,压缩比更高。core-default.xml 中设置默认需要压缩的块大小为1M。

<property>
    <name>mapred.output.compression.type</name>
    <value>BLOCK</value>
</property>
<property>
    <name>io.seqfile.compress.blocksize</name>
    <value>1000000</value>
</property>

mapred.output.compression.codec参数指定了采用的压缩方式。这些压缩方式可以在core-site.xml中定义:

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress
.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo
.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>

用fs -text查看程序运行后的结果,可以看到启用lzo库解压的信息:

[admin@localhost hive-job]$ hadoop fs -ls /test/baoniu/aid_pid_spuid
Found 1 items
-rw-r--r--   3 admin admin        503 2012-01-09 16:48 /test/baoniu/aid_pid_spuid/000000_0
[admin@localhost hive-job]$ hadoop fs -text /test/baoniu/aid_pid_spuid/*
12/01/09 20:22:26 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
12/01/09 20:22:26 INFO lzo.LzoCodec: Successfully loaded &amp; initialized native-lzo library [hadoop-lzo rev 2bd0d5b7bdbdc3c466d73cb9fa75e49c69ea1366]
12/01/09 20:22:26 INFO compress.LzoCodec: Bridging org.apache.hadoop.io.compress.LzoCodec to com.hadoop.compression.lzo.LzoCodec.
12/01/09 20:22:26 INFO compress.CodecPool: Got brand-new decompressor
12/01/09 20:22:26 INFO compress.CodecPool: Got brand-new decompressor
12/01/09 20:22:26 INFO compress.CodecPool: Got brand-new decompressor
12/01/09 20:22:26 INFO compress.CodecPool: Got brand-new decompressor
        10086787     -334639064      aaaaa
        10088569     -1241000785     bbbbbbbb
        10091899     -1157344890     c
        10095859     -334639064      d
        100111210     -351609022      hello
        100149787     -1425277667     world
        100156879     -166047558      ggg
        100168957     -33307697       123hhj
        100177867     -217247212      9999hhhh

现在发现一个问题:第一列有分隔符“t”

查看源代码发现,在org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.java中是把输出空值作为key,以整行为value,所以会在第一列多一列分隔符。如果想去掉这列分隔符,需要自己重新个SequenceFileOutputFormat。我写了个简单的MyHiveSequenceFileOutputFormat,如下:

public class MyHiveSequenceFileOutputFormat extends SequenceFileOutputFormat
    implements HiveOutputFormat<WritableComparable, Writable> {

  @Override
  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress) throws IOException {

    FileSystem fs = finalOutPath.getFileSystem(jc);
    final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
        fs, finalOutPath, Text.class, valueClass, isCompressed);

    return new RecordWriter() {
      public void write(Writable r) throws IOException {
    	  Text text = (Text)r;
    	  //示例,文中数据隔符为t,只有两列
    	  String[] kv = text.toString().split("t");
    	  outStream.append(new Text(kv[0]), new Text(kv[1]));
      }

      public void close(boolean abort) throws IOException {
        outStream.close();
      }
    };
  }
}

三. hive中的input format和output format

hive中的outputformat和inputformat都是相对于使用者的,inputformat指的是读取这张表的格式;outputformat指的是往这张hive表里面写数据的格式

上面程序中使用了stored as SEQUENCEFILE表示使用sequence file格式存储,默认调用的是hive中的HiveSequenceFileOutputFormat,如果想正常读取这个表的数据,需要使用SequenceFileInputFormat。所以下面的代码与上述代码中对test_aid_pid_spuid表的定义是等价的:

## join
hive -e "
add jar /home/admin/baoniu/hive-job/WordCount.jar;
DROP TABLE test_aid_pid_spuid; 
CREATE TABLE test_aid_pid_spuid (
     aid STRING,
     pid STRING,
     spuid STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
 stored as INPUTFORMAT 'org.apache.hadoop.mapred.SequenceFileInputFormat'
           outputformat 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
 location '/test/baoniu/aid_pid_spuid';
"

用hadoop fs -text 可以看到结果,并且一列是分隔符”t”

用select * from test_aid_pid_spuid;可以正常查看数据。

注意这个地方使用的是hadoop 0.19版的旧API:org.apache.hadoop.mapred.SequenceFileInputFormat,不能使用hadoop 0.20.2以后版本中的org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat,否则,查询hive是会报错:

hive> select * from test_aid_pid_spuid;
FAILED: Error in semantic analysis: line 1:14 Input Format must implement InputFormat test_aid_pid_spuid

这是因为hive 0.7中还是使用的hadoop 0.19的API。

如果将代码中的INPUTFORMAT改为 ‘org.apache.hadoop.mapred.TextInputFormat’,那么显然读取hive表会是乱码数据,因为TextInputFormat根本无法解析sequence file。

发现hive中有个HiveNullValueSequenceFileOutputFormat,是否可以解决(二)中提到的第一列为分隔符的情况的?HiveNullValueSequenceFileOutputFormat是将整行的值作为key,vaule为空,并且输出的key为byte格式。输出数据如下:

[admin@localhost hive-job]$ hadoop fs -text /test/baoniu/aid_pid_spuid/*
12/01/11 14:32:01 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
12/01/11 14:32:01 INFO lzo.LzoCodec: Successfully loaded &amp; initialized native-lzo library [hadoop-lzo rev 2bd0d5b7bdbdc3c466d73cb9fa75e49c69ea1366]
12/01/11 14:32:01 INFO compress.LzoCodec: Bridging org.apache.hadoop.io.compress.LzoCodec to com.hadoop.compression.lzo.LzoCodec.
12/01/11 14:32:01 INFO compress.CodecPool: Got brand-new decompressor
12/01/11 14:32:01 INFO compress.CodecPool: Got brand-new decompressor
12/01/11 14:32:01 INFO compress.CodecPool: Got brand-new decompressor
12/01/11 14:32:01 INFO compress.CodecPool: Got brand-new decompressor
31 30 30 30 30 30 38 36 37 38 37 09 2d 33 33 34 36 33 39 30 36 34 09 61 61 61 61 61     (null)
31 30 30 30 30 30 38 38 35 36 39 09 2d 31 32 34 31 30 30 30 37 38 35 09 62 62 62 62 62 62 62 62 (null)
31 30 30 30 30 30 39 31 38 39 39 09 2d 31 31 35 37 33 34 34 38 39 30 09 63      (null)
31 30 30 30 30 30 39 35 38 35 39 09 2d 33 33 34 36 33 39 30 36 34 09 64 (null)

四.  hive中读取sequence file + lzo数据

下面这个测试程序就是读取前面生成的lzo压缩数据到另一个hive表中。

#!/bin/bash

export HIVE_HOME=/home/admin/hive
export PATH=$HIVE_HOME/bin:$PATH

hive -e " 
DROP TABLE test_lzo; 
CREATE EXTERNAL TABLE test_lzo (
        aid STRING,
        pid STRING,
        spuid STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
stored as sequencefile
location '/test/baoniu/aid_pid_spuid';

DROP TABLE test_lzo_2; 
create table test_lzo_2 (
        aid STRING,
        pid STRING,
        spuid STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
location '/test/baoniu/test_lzo_2';

insert overwrite table test_lzo_2
select * from test_lzo ;
"

hive -e "
 select * from test_lzo;
 select * from test_lzo_2;
"

需要注意的是,这里读取的数据/test/baoniu/aid_pid_spuid,第一列是有分隔符t。如果第一列没有t分隔符,那么就无法正常读取到hive表中,需要自己重写SequenceFileInputFormat。可见:用hive生成的sequence file数据默认第一列是分隔符,这种数据可以被hive正常读取用来建表处理等。如果对数据生成格式有要求,且该数据不再需要用hive处理,那么可以用MyHiveSequenceFileOutputFormat去掉第一列分隔符。

五. mapreduce中使用SequeceFile + lzo数据

使用SequenceFileInputFormat就可以正常读取SequeceFile + lzo格式数据,因为sequence file文件的头信息会记录压缩类名,因此总能找到适合的类来解码。

job.setInputFormatClass(SequenceFileInputFormat.class);

SequenceFileInputFormat会将读取到的行,以分隔符split,第一列的值为key,后面所有列的值为value。由于我们的数据中第一列是分隔符t, 这样在Mapper中得到得key为空,value为整行的内容。

更多的关于SequeceFile文件的读写可以参考:http://www.coder4.com/archives/2035

发表评论

电子邮件地址不会被公开。 必填项已用*标注