欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce入门

程序员文章站 2024-03-08 09:10:22
...

1.MapReduce简介

MapReduce是一种分布式计算框架,用于大规模数据集的并行计算。

2.MapReduce框架

  • MapReduce框架中主要包括两个函数: Map和Reduce,这两个函数也是程序猿需要进行编程的函数。
  • MapReduce采用 “分而治之” 思想,将一个分布式文件系统中的大规模数据集切分成许多独立的切片,然后这些切片可以由多个map、reduce任务进行处理
  • MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,原因是, 移动数据需要大量的网络传输开销

3.MapReduce编程模型

MapReduce由两个阶段组成:MapReduce

  • map()函数以key/value键值对作为输入,经过map函数处理后产生另外一系列key/value键值对作为中间输出存储写入到磁盘中。
  • reduce()函数以key及对应的value列表作为输入,经合并相同的key的value值后,产生另外一系列key/value对作为最终输出写入HDFS中
  • 在map()函数输出后到reduce()函数输入前,这期间的过程叫做shuffle阶段,这个过程是MapReduce计算框架主要优化的地方。在下文会详细介绍。
  • 指定三个组件分别是 InputFormat、Partitioner 和 OutputFormat, 它们均需要用户根据自己的应用需求配置
    1.InputFormat指定输入文件格式。将输入数据切分成若干个 split,且将每个 split 中的数据解析成一个个 map() 函数 要求的 key/value 对。
    2.Partitioner确定 map() 函数产生的每个 key/value 对发给哪个 Reduce 任务函数处理。 有几个reduce任务就会有几个Partition
    3.OutputFormat指定输出文件格式,即每个 key/value 对以何种形式保存到输出文件中。

4.MapReduce计算流程

MapReduce入门

输入分片(input split)

在进行map计算之前,MapReduce会根据输入文件计算输入分片(input split),每个输入分片(input split)由一个map任务处理,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切。
举例: 假如我们设定hdfs的DataNode的大小是128mb,如果我们输入有三个文件,大小分别是3mb、130mb和200mb,那么MapReduce会把3mb文件分为一个输入分片(input split),130mb则是两个输入分片(input split)而200mb也是两个输入分片(input split),换句话说我们如果不在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是MapReduce优化计算的一个关键点。

map阶段

map函数处理逻辑由程序猿编写;一般map操作都是本地化操作,也就是在数据存储节点上进行。

combiner阶段

combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如下文对文件里的单词频率做统计,map计算时候如果碰到一个单词就会记录为1,但是这篇文章里相同的单词可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是求中位值计算中使用combiner的话,最终的reduce计算结果就会出错。

shuffle阶段(包括Partition, Sort, Spill, Meger, Combiner,Copy, Memery, Disk……)
  1. MemoryBuffer:内存缓冲区,每个map的结果和partition处理的key/value结果都保存在缓存中。缓冲区默认大小为100M,溢写阈值为100M* 0.8=80M
  2. Spill:内存缓冲区达到阈值时,溢写spill线程锁住这80M
    的缓冲区,开始将数据写出到本地磁盘中,然后释放内存
    。每次溢写都生成一个数据文件。溢出的数据到磁盘前会对数据进行key排序sort以及合并combiner。发送相同Reduce的key数量,会拼接到一起,减少partition的索引数量。
  3. Partitioner:决定数据由哪个Reducer处理。可以采用hash法进行分区。
  4. Sort:缓冲区数据按照key进行排序。
  5. Disk:将数据写入磁盘中
reduce阶段

和map函数一样也是由程序猿编写的,最终结果是存储在hdfs上的。

5.HDFS block和MapReduce split之间的联系?

Block:HDFS中最小的数据存储单位,默认是128M;Split:MapReduce中最小的计算单元,默认与Block一一对应。
两者的对应关系是任意的,可有用户控制。

6.例子 WordCount(Python编写)

部分数据截图
MapReduce入门
map函数

import re

data_path = './data/The_man_of_property.txt'

p = re.compile(r'\w+')  # 正则表达式,确保切分的为一个正确的单词

with open(data_path, 'r', encoding='utf-8') as f:   # 打开文件
    for line in f.readlines():
        word_list = line.strip().split(' ')     # 按照空格切分单词,生成一个列表
        for word in word_list:      # 对每一个单词进行处理
            re_word = p.findall(word)   # 利用正则表达式对每一个单词进行处理,去除标点符号等
            if len(re_word)==0:
                continue
            word = re_word[0].lower()   # 将单词小写
            print("%s,%s" % (word, 1))  # 将每个单词以键值对的形式输出

reduce函数

data_path = './data/reduce_test'

cur_word = None
sum = 0

with open(data_path,'r',encoding='utf-8') as f:
    for line in f.readlines():
        word , val = line.strip().split(',')
        if cur_word == None:
            cur_word = word
        if cur_word != word:
            print("%s,%s" % (word, sum))
            sum = 0
            cur_word = word
        sum = sum + int(val)
    print("%s,%s"%(word,sum))	# 将每个单词计数

Python环境运行脚本

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/data/The_man_of_property.txt"
OUTPUT_PATH="/output/wc"

$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map_t.py" \
-reducer "python reducer.py" \
-file ./map_t.py \
-file ./reducer.py