python,from mrjob.job import MRJob,,class MRWordCount(MRJob):,, def mapper(self, _, line):, for word in line.split():, yield (word, 1),, def reducer(self, key, values):, yield (key, sum(values)),,if __name__ == '__main__':, MRWordCount.run(),
`,,这个代码定义了一个MapReduce作业,mapper
函数将输入行拆分成单词,并为每个单词生成一个键值对(单词, 1)。reducer
函数则将所有相同单词的值相加,得到每个单词的总出现次数。MapReduce是一种编程模型,用于处理和生成大规模数据集,它由Google提出,并在Hadoop等分布式计算框架中得到了广泛应用,MapReduce的核心思想是将任务分成两个阶段:Map阶段和Reduce阶段,在Map阶段,输入数据被分割成小块并分配给多个Mapper进行处理;在Reduce阶段,Mapper的输出结果被汇总并传递给Reducer进行最终处理。
本文将通过一个具体的MapReduce统计样例代码来展示如何使用MapReduce进行数据处理,假设我们有一个文本文件,其中每一行包含一个单词,我们希望统计每个单词出现的次数。
MapReduce统计样例代码
1.1 Mapper类
Mapper类负责读取输入数据并将其转换为键值对形式,在这个例子中,我们将每行文本中的单词作为键,值为1。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\\s+"); for (String w : words) { word.set(w); context.write(word, one); } } }
1.2 Reducer类
Reducer类负责接收Mapper的输出,并对相同键的值进行汇总,在这个例子中,我们将相同单词的出现次数累加。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
1.3 Driver类
Driver类负责配置作业并启动MapReduce任务,在这个例子中,我们将设置输入路径、输出路径以及Mapper和Reducer类。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Word Count"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); // Optional: use combiner to reduce network traffic job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
运行MapReduce任务
要运行这个MapReduce任务,我们需要将代码编译打包成一个JAR文件,然后使用Hadoop命令提交作业,假设我们已经将上述代码保存为WordCountDriver.java
,并且已经编译打包成wordcount.jar
。
hadoop jar wordcount.jar WordCountDriver /input/path /output/path
结果分析
运行完成后,我们可以查看输出目录中的文件,通常是一个名为part-r-00000
的文件,里面包含了每个单词及其出现的次数。
apple 2 banana 3 orange 1
相关问答FAQs
Q1: MapReduce中的Mapper和Reducer分别是什么?
A1: Mapper是MapReduce任务的第一个阶段,负责读取输入数据并将其转换为键值对形式,Reducer是MapReduce任务的第二个阶段,负责接收Mapper的输出,并对相同键的值进行汇总或处理。
Q2: 为什么需要Combiner?
A2: Combiner是一个可选的组件,它在Mapper和Reducer之间运行,用于减少网络传输的数据量,Combiner可以对Mapper的输出进行局部汇总,从而减少发送到Reducer的数据量,提高整体性能。
小伙伴们,上文介绍了“mapreduce mapper的key_MapReduce统计样例代码”的内容,你了解清楚吗?希望对你有所帮助,任何问题可以给我留言,让我们下期再见吧。