Using DistributedCache and Symbolic Links in Hadoop 0.20.2

Hadoop's distributed cache mechanism lets every map or reduce task of a job access the same set of files. When a job is submitted, Hadoop copies the files specified by the -files and -archives options to HDFS (the JobTracker's file system). Before a task runs, the TaskTracker copies these files from the JobTracker's file system to its local disk as a cache, so the task can access them; the job itself does not care where the files came from. When using DistributedCache, the localized files are usually accessed through symbolic links, which is more convenient: the file identified by the URI hdfs://namenode/test/input/file1#myfile is symlinked as myfile in the task's current working directory, so the job can open it simply as myfile without caring about the file's actual local path.
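As a rough command-line illustration (the jar name, class name, and paths here are hypothetical, not part of the example program below), the same link can be requested through GenericOptionsParser's -files option; the -D flag switches on symlink creation, which is what DistributedCache.createSymlink(conf) does programmatically:

hadoop jar myjob.jar MyJob -D mapred.create.symlink=yes -files hdfs://namenode/test/input/file1#myfile /test/input /test/output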

The example below illustrates this:
In this program we create a symbolic link, god.txt, that points to the file /test/file/file.1 on HDFS. The program can then open god.txt directly for reading, without caring about the actual path of the localized copy of /test/file/file.1.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount202 {

    public static void UseDistributedCacheBySymbolicLink() throws Exception {
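        // Read the cached file through its symlink name "god.txt"; the actual
        // localized path of /test/file/file.1 on the TaskTracker stays hidden
        // behind the link.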
        FileReader reader = new FileReader("god.txt");
        BufferedReader br = new BufferedReader(reader);
        String s1 = null;
        while ((s1 = br.readLine()) != null) {
            System.out.println(s1);
        }
        br.close();
        reader.close();
    }

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        protected void setup(Context context) throws IOException,
                InterruptedException {
            System.out.println("Now, use the distributed cache and syslink");
            try {
                UseDistributedCacheBySymbolicLink();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

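        // Enable symlink creation in the task working directory and register the
        // cache file; the "#god.txt" fragment is the name the symlink will get.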
        DistributedCache.createSymlink(conf);
        String path = "/test/file/file.1";
        Path filePath = new Path(path);
        String uriWithLink = filePath.toUri().toString() + "#" + "god.txt";
        DistributedCache.addCacheFile(new URI(uriWithLink), conf);

        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount202.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

After the job runs, the printed contents of /test/file/file.1 can be seen in the task logs (for example, through the JobTracker web UI).

If a program needs to use many small files, symbolic links make accessing them very convenient.
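For instance, each small file can be given its own link name. A minimal driver-side sketch, using made-up file names, placed before the Job is created just as in the example above:

DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new URI("/test/file/dict.txt#dict"), conf);
DistributedCache.addCacheFile(new URI("/test/file/stopwords.txt#stopwords"), conf);
// Tasks can then open the files simply as new FileReader("dict") and new FileReader("stopwords").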


A simpler alternative is to submit the job with the Tool helper class and pass -files file1,file2 on the command line. The named files are copied from the local file system to HDFS, and the MapReduce tasks can then read them from the task working directory.
Example program:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main extends Configured implements Tool {

    public static class ConfMapper extends
            Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text inValue, Context context)
                throws IOException, InterruptedException {
            // Read test.conf, which -files has placed in the task working directory
            BufferedReader br = new BufferedReader(new FileReader("test.conf"));
            String line;
            while ((line = br.readLine()) != null) {
                // ... use each line of the configuration file
            }
            br.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration jobConf = getConf();
        Job job = new Job(jobConf);
        job.setJarByClass(Main.class);
        job.setMapperClass(ConfMapper.class);
        job.setNumReduceTasks(0);
        // ... set input/output formats, paths, and any remaining job configuration

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new Main(), args);
        System.exit(res);
    }
}

Submitting the job:

hadoop jar test.jar Main -files /tmp/test.conf -libjars 1.jar,2.jar
