Hadoop MapReduce实现单词计数(Word Count)

hadoop mapreduce实现单词计数(word count)

 

1.map与reduce过程

1.1 map过程

首先,hadoop会把输入数据划分成等长的输入分片(input split)或分片发送到mapreduce。hadoop为每个分片创建一个map任务,由它来运行用户自定义的map函数以分析每个分片中的记录。在我们的单词计数例子中,输入是多个文件,一般一个文件对应一个分片,如果文件太大则会划分为多个分片。map函数的输入以<key, value>形式做为输入,value为文件的每一行,key为该行在文件中的偏移量(一般我们会忽视)。这里map函数起到的作用为将每一行进行分词为多个word,并在context中写入<word, 1>以代表该单词出现一次。

map过程的示意图如下:

mapper代码编写如下:

public static class tokenizermapper
      extends mapper<object, text, text, intwritable> {
  private final static intwritable one = new intwritable(1);
  private text word = new text();
  public void map(object key, text value, context context) throws ioexception, interruptedexception {
      //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
      stringtokenizer iter = new stringtokenizer(value.tostring());
      while (iter.hasmoretokens()) {
          word.set(iter.nexttoken());
          // 向context中写入<word, 1>
          context.write(word, one);
          system.out.println(word);
      }
  }
}

如果我们能够并行处理分片(不一定是完全并行),且分片是小块的数据,那么处理过程将会有一个好的负载平衡。但是如果分片太小,那么管理分片与map任务创建将会耗费太多时间。对于大多数作业,理想分片大小为一个hdfs块的大小,默认是64mb。

map任务的执行节点和输入数据的存储节点相同时,hadoop的性能能达到最佳,这就是计算机系统中所谓的data locality optimization(数据局部性优化)。而最佳分片大小与块大小相同的原因就在于,它能够保证一个分片存储在单个节点上,再大就不能了。

1.2 reduce过程

接下来我们看reducer的编写。reduce任务的多少并不是由输入大小来决定,而是需要人工单独指定的(默认为1个)。和上面map不同的是,reduce任务不再具有本地读取的优势————一个reduce任务的输入往往来自于所有mapper的输出,因此map和reduce之间的数据流被称为shuffle(洗牌)。hadoop会先按照key-value对进行排序,然后将排序好的map的输出通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。

reduce 函数示意图如下:

reducer代码编写如下:

 public static class intsumreducer
          extends reducer<text, intwritable, text, intwritable>{
      private intwritable result = new intwritable();
      public void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception{
          int sum = 0;
          for (intwritable val : values) {
              sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
      }
  }

 

2.完整代码

2.1 项目架构

关于vscode+java+maven+hadoop开发环境搭建,可以参见我的博客《vscode+maven+hadoop开发环境搭建》,此处不再赘述。这里展示我们的项目架构如下:

word-count-hadoop
├─ input
│ ├─ file1
│ ├─ file2
│ └─ file3
├─ output
├─ pom.xml
├─ src
│ └─ main
│ └─ java
│ └─ wordcount.java
└─ target

wordcount.java代码如下:

import java.io.ioexception;
import java.util.stringtokenizer;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
public class wordcount{
  public static class tokenizermapper
          extends mapper<object, text, text, intwritable> {
      private final static intwritable one = new intwritable(1);
      private text word = new text();

      public void map(object key, text value, context context) throws ioexception, interruptedexception {
      //每次处理一行,一个mapper里的value为一行,key为该行在文件中的偏移量
          stringtokenizer iter = new stringtokenizer(value.tostring());
          while (iter.hasmoretokens()) {
              word.set(iter.nexttoken());
              // 向context中写入<word, 1>
              context.write(word, one);
          }
      }
  }

  public static class intsumreducer
          extends reducer<text, intwritable, text, intwritable>{
      private intwritable result = new intwritable();
      public void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception{
          int sum = 0;
          for (intwritable val : values) {
              sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
      }
  }

  public static void main(string[] args) throws exception{
      configuration conf = new configuration();
      job job = job.getinstance(conf, "word_count");

      job.setjarbyclass(wordcount.class);

      job.setmapperclass(tokenizermapper.class);
      //此处的combine操作意为即第每个mapper工作完了先局部reduce一下,最后再全局reduce
      job.setcombinerclass(intsumreducer.class);
      job.setreducerclass(intsumreducer.class);

      job.setoutputkeyclass(text.class);
      job.setoutputvalueclass(intwritable.class);

      //第0个参数是输入目录,第1个参数是输出目录
      //先判断output path是否存在,如果存在则删除
      path path = new path(args[1]);// 
      filesystem filesystem = path.getfilesystem(conf);
      if (filesystem.exists(path)) {
          filesystem.delete(path, true);
      }

      //设置输入目录和输出目录
      fileinputformat.addinputpath(job, new path(args[0]));
      fileoutputformat.setoutputpath(job, new path(args[1]));
      system.exit(job.waitforcompletion(true)?0:1);
  }
}

pom.xml中记得配置hadoop的依赖环境:

    ...
<!-- 集中定义版本号 -->
<properties>
  <project.build.sourceencoding>utf-8</project.build.sourceencoding>
  <maven.compiler.source>17</maven.compiler.source>
  <maven.compiler.target>17</maven.compiler.target>
  <hadoop.version>3.3.1</hadoop.version>
</properties>
<dependencies>
  <dependency>
    <groupid>junit</groupid>
    <artifactid>junit</artifactid>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <!-- 导入hadoop依赖环境 -->
  <dependency>
      <groupid>org.apache.hadoop</groupid>
      <artifactid>hadoop-common</artifactid>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupid>org.apache.hadoop</groupid>
      <artifactid>hadoop-hdfs</artifactid>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupid>org.apache.hadoop</groupid>
      <artifactid>hadoop-mapreduce-client-core</artifactid>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupid>org.apache.hadoop</groupid>
      <artifactid>hadoop-client</artifactid>
      <version>${hadoop.version}</version>
  </dependency>
  <dependency>
      <groupid>org.apache.hadoop</groupid>
      <artifactid>hadoop-yarn-api</artifactid>
      <version>${hadoop.version}</version>
  </dependency>
</dependencies>
...
</project>

此外,因为我们的程序自带输入参数,我们还需要在vscode的launch.json中配置输入参数intput(代表输入目录)和output(代表输出目录):

...
"args": [
  "input",
  "output"
],
...

编译运行完毕后,可以查看output文件夹下的part-r-00000文件:

david 1
goodbye 1
hello 3
tom 1
world 2

可见我们的程序正确地完成了单词计数的功能。

以上就是hadoop mapreduce实现单词计数(word count)的详细内容,更多关于hadoop mapreduce的资料请关注硕编程其它相关文章!

下一节:jdbc用idea连接sqlserver数据库的超实用教程

java编程技术

相关文章