Problems with a user-defined PathFilter after upgrading from Hadoop 0.19.2 to 0.20.2

Recently, after upgrading a Hadoop program from 0.19.2 to 0.20.2, I found that one job would no longer run: it kept reporting that the HDFS input path did not exist, even though the input path clearly did exist. I had assumed the Hadoop 0.20.2 API would be backward compatible, so old programs would run unchanged; in practice, the handling of some APIs, such as PathFilter, has changed.

The code defined a commonFilePathFilter like this:

public static class commonFilePathFilter implements PathFilter {

        public boolean accept(Path path) {

            System.out.println("path.toString() = " + path.toString());
            System.out.println("path.getName() = " + path.getName());

            // "|" is a regex metacharacter, so it must be escaped to split
            // on the literal pipe character
            String[] filters = accept_file_name.split("\\|");
            boolean result = false;
            for (int i = 0; i < filters.length; i++) {
                // accept as soon as the file name contains one of the filter strings
                if (path.getName().indexOf(filters[i]) != -1) {
                    result = true;
                    break;
                }
            }
            return result;
        }
    }
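A note on the split() call above: String.split() takes a regular expression, and "|" is a regex metacharacter, so it must be escaped as "\\|". An unescaped "|" matches the empty string at every position and chops the input into single characters. A quick standalone check (plain Java, nothing Hadoop-specific assumed; the class name SplitDemo is mine):

import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        String accept_file_name = "common|mall|bizorder";
        // unescaped: "|" is alternation between two empty patterns, so the
        // string is chopped into single-character tokens,
        // e.g. [c, o, m, m, o, n, |, ...] (with a leading "" on older JDKs)
        System.out.println(Arrays.toString(accept_file_name.split("|")));
        // escaped: splits on the literal pipe, as intended
        System.out.println(Arrays.toString(accept_file_name.split("\\|")));
        // => [common, mall, bizorder]
    }
}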

Under Hadoop 0.19.2, commonFilePathFilter produces this output:

path.toString() = hdfs://hdp001.sqa:9000/app/xml_inc/20110119113001/common-part-00003
path.getName() = common-part-00003

But under Hadoop 0.20.2, it fails with:

path.toString() = /app/xml_inc/20110119113001
path.getName() = 20110119113001
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hdp001.sqa:9000/test/input
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224)
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:241)
        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:885)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:779)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:432)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:447)
        at WordCount202.main(WordCount202.java:120)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

With the help of my colleague Fang Ming, I read the Hadoop source code and found that Hadoop 0.20.2 first applies the filter to the input directory itself. For this program, that means the job fails whenever the directory name does not contain any of the strings common, mall, or bizorder. Here is the relevant part of FileInputFormat.java:

/** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero items.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i=0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDir()) {
            for(FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              result.add(stat);
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

As the code shows, fs.globStatus(p, inputFilter) first checks the input directory itself against the filter; only if the directory passes does fs.listStatus(globStat.getPath(), inputFilter) then check every file under it against both hiddenFileFilter and the user-defined PathFilter. hiddenFileFilter filters out file names beginning with _ or ".".
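To watch the two phases directly, here is a minimal sketch (the class name FilterTrace and the command-line argument are mine; it assumes an HDFS reachable through the default Configuration) that mimics what listStatus() does and prints every path the filter is asked about:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilterTrace {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // a filter that accepts everything but logs each path it is asked about
        PathFilter tracingFilter = new PathFilter() {
            public boolean accept(Path path) {
                System.out.println("filter saw: " + path);
                return true;
            }
        };
        Path input = new Path(args[0]); // e.g. /app/xml_inc/20110119113001
        // phase 1: the input path itself is matched against the filter
        FileStatus[] matches = fs.globStatus(input, tracingFilter);
        if (matches != null) {
            for (FileStatus dirStat : matches) {
                if (dirStat.isDir()) {
                    // phase 2: every entry under the directory is matched too
                    for (FileStatus stat : fs.listStatus(dirStat.getPath(), tracingFilter)) {
                        System.out.println("selected: " + stat.getPath());
                    }
                }
            }
        }
    }
}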

Once this is clear, our own commonFilePathFilter can first use a FileStatus object to check whether the current path is a directory: if it is, return true so the directory passes the filter and its files get listed; otherwise, check as before whether the file name contains one of common, mall, or bizorder. The complete test code is as follows:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount202 {

    static String accept_file_name="common|mall|bizorder";

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {

             FileSplit fileSplit = (FileSplit)context.getInputSplit();
             String input_fname = fileSplit.getPath().toString();
             System.out.println("setup in map");
             System.out.println("setup in map input_file:" + input_fname);
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static class commonFilePathFilter implements PathFilter {

        static FileSystem fs;
        static Configuration conf;

        static void setConf(Configuration _conf){
            conf = _conf;
        }

        public boolean accept(Path path) {
            //System.out.println("path.toString() = " + path.toString());
            //System.out.println("path.getName() = " + path.getName());

            if (accept_file_name.equals("all") || accept_file_name.equals("")) {
                return true;
            }

            FileStatus fstatus = null;
            try {
                fs = FileSystem.get(conf);
                fstatus = fs.getFileStatus(path);
            } catch (IOException e) {
                e.printStackTrace();
            }
            // guard against a failed getFileStatus() call, which would
            // otherwise cause a NullPointerException below
            if (fstatus == null) {
                return false;
            }

            if (!fstatus.isDir()) {

                // "|" is a regex metacharacter, so it must be escaped to split
                // on the literal pipe character
                String[] filters = accept_file_name.split("\\|");
                boolean result = false;
                for (int i = 0; i < filters.length; i++) {
                    // accept as soon as the file name contains one of the filter strings
                    if (path.getName().indexOf(filters[i]) != -1) {
                        result = true;
                        break;
                    }
                }
                return result;
            }

            // directories are accepted so that listStatus() can descend into them
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf);
        job.setJarByClass(WordCount202.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setNumReduceTasks(6);

        commonFilePathFilter.setConf(job.getConfiguration());
        FileInputFormat.setInputPathFilter(job, commonFilePathFilter.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
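One design note on the code above: the static setConf() call works because listStatus() runs on the client, but it is clumsy. As far as I can tell from the 0.20.2 source, getInputPathFilter() instantiates the filter via ReflectionUtils.newInstance(), which injects the job Configuration into any instance that implements Configurable. Assuming that holds, a filter extending Configured (the class name ConfiguredFilePathFilter is mine) could replace commonFilePathFilter without the static field. A sketch, as a drop-in inner class of WordCount202:

    // requires: import org.apache.hadoop.conf.Configured; at the top of the file
    // a sketch only, assuming ReflectionUtils.newInstance() calls setConf()
    // on Configurable filters when FileInputFormat creates the instance
    public static class ConfiguredFilePathFilter extends Configured implements PathFilter {

        public boolean accept(Path path) {
            if (accept_file_name.equals("all") || accept_file_name.equals("")) {
                return true;
            }
            try {
                FileSystem fs = FileSystem.get(getConf()); // conf injected by the framework
                if (fs.getFileStatus(path).isDir()) {
                    return true; // directories must pass so their files get listed
                }
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            for (String filter : accept_file_name.split("\\|")) {
                if (path.getName().indexOf(filter) != -1) {
                    return true;
                }
            }
            return false;
        }
    }

With this, FileInputFormat.setInputPathFilter(job, ConfiguredFilePathFilter.class) alone should be enough, and the setConf() line in main() goes away.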

The takeaway: when you run into problems like this, it pays to read the source code.
