hadoop实例WordCount程序一步一步运行

虽说现在在Eclipse下开发hadoop程序已经很方便了,但命令行方式对于小程序的开发和验证同样很方便。这是初学hadoop时的笔记,记录下来以备查。

1. 经典的WordCount程序(WordCount.java),可参见 hadoop0.18文档
[code lang="java"]
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

/**
 * Mapper that tokenizes each input line and emits a (token, 1) pair per token.
 *
 * <p>Input key: the {@link LongWritable} key supplied by the input format (unused here).
 * Input value: one line of text. Output: {@code (Text word, IntWritable 1)}.
 */
public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  // Shared constant count; every token is emitted with a count of one.
  private final static IntWritable one = new IntWritable(1);
  // Reused across calls so a new Text is not allocated for every token.
  private Text word = new Text();

  /**
   * Splits {@code value} with {@link StringTokenizer}'s default delimiters and
   * collects {@code (token, 1)} for each token found.
   *
   * @param key input key (ignored)
   * @param value the line of text to tokenize
   * @param output collector receiving the (word, 1) pairs
   * @param reporter progress reporter (unused)
   * @throws IOException if the collector fails to accept a pair
   */
  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
    }
  }
}

/**
 * A reducer class that just emits the sum of the input values.
 *
 * <p>Input and output pairs are both {@code (Text, IntWritable)}; the driver in this
 * file registers this same class as the combiner and as the reducer.
 */
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  /**
   * Adds up every count received for {@code key} and emits {@code (key, total)}.
   *
   * @param key the word being counted
   * @param values the counts collected for this word
   * @param output collector receiving the (word, total) pair
   * @param reporter progress reporter (unused)
   * @throws IOException if the collector fails to accept the pair
   */
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

static int printUsage() {
System.out.println(“wordcount [-m ] [-r ] “);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}

/**
* The main driver for word count map/reduce program. Invoke this method to submit the map/reduce job.
*
* @throws IOException
* When there is communication problems with the job tracker.
*/
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName(“wordcount”);

// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.

class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List

other_args = new ArrayList();
for (int i = 0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i - 1]); return printUsage(); } } // Make sure there are exactly 2 parameters left. if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf, other_args.get(0)); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }[/code] 2. 保证hadoop集群是配置好了的,单机的也好。新建一个目录,比如 /home/admin/WordCount 编译WordCount.java程序。 [code lang="bash"]javac -classpath /home/admin/hadoop/hadoop-0.19.1-core.jar WordCount.java -d /home/admin/WordCount[/code] 3. 编译完后在/home/admin/WordCount目录会发现三个class文件 WordCount.class,WordCount$Map.class,WordCount$Reduce.class。 cd 进入 /home/admin/WordCount目录,然后执行: [code lang="bash"]jar cvf WordCount.jar *.class[/code] 就会生成 WordCount.jar 文件。 4. 构造一些输入数据 input1.txt和input2.txt的文件里面是一些单词。如下: [code lang="bash"][admin@host WordCount]$ cat input1.txt Hello, i love china are you ok? [admin@host WordCount]$ cat input2.txt hello, i love word You are ok[/code] 在hadoop上新建目录,和put程序运行所需要的输入文件: [code lang="bash"]hadoop fs -mkdir /tmp/input hadoop fs -mkdir /tmp/output hadoop fs -put input1.txt /tmp/input/ hadoop fs -put input2.txt /tmp/input/[/code] 5. 
运行程序,会显示job运行时的一些信息。
[code lang="bash"][admin@host WordCount]$ hadoop jar WordCount.jar WordCount /tmp/input /tmp/output
10/09/16 22:49:43 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10/09/16 22:49:43 INFO mapred.FileInputFormat: Total input paths to process : 2
10/09/16 22:49:43 INFO mapred.JobClient: Running job: job_201008171228_76165
10/09/16 22:49:44 INFO mapred.JobClient: map 0% reduce 0%
10/09/16 22:49:47 INFO mapred.JobClient: map 100% reduce 0%
10/09/16 22:49:54 INFO mapred.JobClient: map 100% reduce 100%
10/09/16 22:49:55 INFO mapred.JobClient: Job complete: job_201008171228_76165
10/09/16 22:49:55 INFO mapred.JobClient: Counters: 16
10/09/16 22:49:55 INFO mapred.JobClient: File Systems
10/09/16 22:49:55 INFO mapred.JobClient: HDFS bytes read=62
10/09/16 22:49:55 INFO mapred.JobClient: HDFS bytes written=73
10/09/16 22:49:55 INFO mapred.JobClient: Local bytes read=152
10/09/16 22:49:55 INFO mapred.JobClient: Local bytes written=366
10/09/16 22:49:55 INFO mapred.JobClient: Job Counters
10/09/16 22:49:55 INFO mapred.JobClient: Launched reduce tasks=1
10/09/16 22:49:55 INFO mapred.JobClient: Rack-local map tasks=2
10/09/16 22:49:55 INFO mapred.JobClient: Launched map tasks=2
10/09/16 22:49:55 INFO mapred.JobClient: Map-Reduce Framework
10/09/16 22:49:55 INFO mapred.JobClient: Reduce input groups=11
10/09/16 22:49:55 INFO mapred.JobClient: Combine output records=14
10/09/16 22:49:55 INFO mapred.JobClient: Map input records=4
10/09/16 22:49:55 INFO mapred.JobClient: Reduce output records=11
10/09/16 22:49:55 INFO mapred.JobClient: Map output bytes=118
10/09/16 22:49:55 INFO mapred.JobClient: Map input bytes=62
10/09/16 22:49:55 INFO mapred.JobClient: Combine input records=14
10/09/16 22:49:55 INFO mapred.JobClient: Map output records=14
10/09/16 22:49:55 INFO mapred.JobClient: Reduce input records=14
[/code]
6. 
查看运行结果
[code lang="bash"][admin@host WordCount]$ hadoop fs -ls /tmp/output/
Found 2 items
drwxr-x--- - admin admin 0 2010-09-16 22:43 /tmp/output/_logs
-rw-r----- 1 admin admin 102 2010-09-16 22:44 /tmp/output/part-00000
[admin@host WordCount]$ hadoop fs -cat /tmp/output/part-00000
Hello, 1
You 1
are 2
china 1
hello, 1
i 2
love 2
ok 1
ok? 1
word 1
you 1
[/code]

发表评论

电子邮件地址不会被公开。 必填项已用*标注