
Hadoop Learning (3): Map/Reduce Programming


WordCount is a simple application that reads text files and counts how often each word occurs. The input is a text file and the output is also a text file, each line of which contains a word and the number of times it appears, separated by a tab. It is the introductory Map/Reduce example from the Hadoop Map/Reduce Tutorial, essentially the "Hello, World" of Map/Reduce programming.

First find any English text file, rename it a01.dat, and upload it to the DFS via Upload files to DFS.
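If you prefer to upload the file programmatically instead of through the Eclipse plugin, here is a minimal sketch using the HDFS FileSystem API. It assumes the NameNode listens on hdfs://localhost:9000 (the same address used in the driver below); the class name Upload is only for illustration.

        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class Upload {
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // Connect to the same NameNode the WordCount driver uses.
                FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
                // Copy the local file into the root directory of HDFS.
                fs.copyFromLocalFile(new Path("a01.dat"), new Path("/a01.dat"));
                fs.close();
            }
        }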

In the New Project wizard, create a Map/Reduce project. The project contains three main source files: the Mapper, the Reducer, and the driver class with the main method. The source code is as follows:

Map.java

        import java.io.IOException;
        import java.util.StringTokenizer;
        import org.apache.hadoop.io.*;
        import org.apache.hadoop.mapreduce.Mapper;

        public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            // key is the byte offset of the line in the file, value is the line itself.
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                // Emit a <word, 1> pair for every whitespace-separated token.
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

Reduce.java

        import java.io.IOException;
        import org.apache.hadoop.io.*;
        import org.apache.hadoop.mapreduce.*;

        public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            // Called once per word; values holds all the 1s emitted by the mappers for that word.
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                // Write the final <word, count> pair.
                context.write(key, new IntWritable(sum));
            }
        }

WordCount.java

        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.conf.*;
        import org.apache.hadoop.io.*;
        import org.apache.hadoop.mapreduce.*;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
                
        public class WordCount {

          public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "wordcount");
            // Tell Hadoop which jar contains the Mapper/Reducer classes
            // (required when the job is submitted to a real cluster).
            job.setJarByClass(WordCount.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // The input file uploaded earlier; the output directory must not already exist.
            FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/a01.dat"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
            job.waitForCompletion(true);
          }
        }
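One optional refinement, not in the original post: because the reduce step is a plain sum (associative and commutative), the same Reduce class can also be registered as a combiner in the driver, so map output is pre-aggregated locally before being shuffled across the network. A one-line sketch of the addition to main:

            // Optional: pre-aggregate <word, 1> pairs on each map node before the shuffle.
            job.setCombinerClass(Reduce.class);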

Select Run As - Run on Hadoop to submit the job. When it completes, the results are written to the /output directory specified in the driver, as part-r-* files.

The program takes the text file as input and, through the map function, turns it into a set of <key, value> pairs. These are then grouped by key into <key, (value1, value2, ...)>, and the reduce function sums the values for each key, computing how many times each word occurs and emitting a new <key, sum> pair as output.
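For example, with a made-up input line "the cat sat on the mat", the intermediate data looks roughly like this:

        map output:      <the,1> <cat,1> <sat,1> <on,1> <the,1> <mat,1>
        grouped by key:  <cat,(1)> <mat,(1)> <on,(1)> <sat,(1)> <the,(1,1)>
        reduce output:   cat 1   mat 1   on 1   sat 1   the 2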

I happened to have a mailing list containing tens of thousands of email addresses, so I modified the map function to count how many addresses belong to each mail provider (everything after the @). The modified map is:

        public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
            // Split the address at the @ and emit <domain, 1>.
            String[] sarray = value.toString().split("@");
            if (sarray.length > 1) {   // skip malformed lines without an @
                word.set(sarray[1]);
                context.write(word, one);
            }
        }
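For a hypothetical input line such as alice@example.com, split("@") yields ["alice", "example.com"], so the mapper emits <example.com, 1>; the Reduce class is left unchanged and simply sums the ones per domain.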

Running the job produced the following results:

        [results: one line per mail domain with its address count; 26 domains in total, with counts ranging from 129 to 35928]