mrjob详解教程

BUG之神 68

简介

mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。


一.MRJob简介

mrjob详解教程

下面这个单词计数程序较之之前的更加完善,它统计了字符的个数(包含空格),单词的个数,以及文本的行数

mrjob详解教程

我们来实现一下:

#!/usr/local/bin/python3
#coding=utf-8
from mrjob.job import MRJob
class Count(MRJob):
    def mapper(self,_,line):
        yield "chars",len(line)
        yield "words",len(line.split())
        yield "line",1
    def reducer(self,key,values):
        yield key,sum(values)
if __name__=='__main__':
    Count.run()

data.txt数据:

hadoop map task start

map apache

org hadoop input output

执行:

[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt

[root@slave3 mrjob]# cat 1.txt

"chars" 54

"line" 3

"words" 10

 

总结:

MRJob的使用一般涉及下方几个步骤

1写一个类继承MRJob

2重写mapper和reducer方法

3在main中调用MRJob.run()方法,开启整个流程

 

需注意:MRJob向集群提交的时候还是通过hadoop streming,即使用python写脚本,通过hadoop streming作业


二 自己实现一个单词计数程序

from mrjob.job import MRJob
class Count(MRJob):
    def mapper(self,key,line):
        for word in line.split():
            yield word,1
    def reducer(self,word,count):
        yield word,sum(count)
if __name__=='__main__':
    Count.run()

运行,三种运行方式详解:

mrjob详解教程

使用hadoop集群方式运行:

[root@slave3 mrjob]# python3 count.py -r hadoop  data.txt>1.txt

[root@slave3 mrjob]# cat 1.txt

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

总结:

mrjob详解教程


三MRJob案例:topN统计

统计数据中出现次数最多的前n个数据

比如:单词出现次数最多的前两个

 

分析:当作业比较简单(单词计数),只需要1个map,1个reduce。但是当作业复杂的时候,就需要多个map,多个reduce,即上次reduce的结果会作为后续map的输入。

 

这个时候如果使用hadoop streaming实现就很繁琐,因为一个mapreduce作业需要两个脚本,多个作业脚本的数量就会成倍的增加。

 

在mrjob中可以通过一个类来实现多个作业,它通过定义一个MRStep来完成多步作业,通过step()方法指定先走哪一步,再走哪一步。

 

完成topN案例,我们这里统计top2,即出现次数最多的前两个单词以及次数

代码:

from mrjob.job import MRJob
from mrjob.job import  MRStep
import heapq
class TopNWords(MRJob):
    #map过程
    def mapper(self,_,line):
        if line.strip()!="":
            for word in line.strip().split():
                yield word,1

    #combiner介于mapper和reducer之间,用于临时将mapper输出的数据进行统计
    #这里就完成了对单词个数的统计
    def combiner(self,word,counts):
        yield word,sum(counts)

    #reducer_sum:将key设置为None,将word和count交换位置
    #这里将单词及出现的次数调换位置后以元组的形式输出:(hadoop,2) 换为 (2,hadoop)
    #方便后续对次数进行排序
    def reducer_sum(self,word,counts):
        #key为:None value为:(sum(counts),word)
        yield None,(sum(counts),word)

    #reducer
    #利用heapq对数据进行排序,并将最大的两个数取出
    #相同的键聚合到同一个reducer处理,因为所有键都是None,所以所有数据都聚合到reducer中
    #最后取出元组(单词个数,单词)中最多的前两个,再次颠倒顺序输出(单词,单词个数)
    def top_n_reducer(self,_,word_cnts):
        #取出word_cnts中最大的两个,之所以要reducer_sum要交换位置,是因为nlargest是按照key进行排序的
        #排序用单词的计数来进行排序,不能用单词本身进行排序
        #取出最大的两个之后,返回cnt,word并调换顺序输出
        for cnt,word in heapq.nlargest(2,word_cnts):
            yield word,cnt

    #重写step(),用于指定自定义的mapper,combiner,reducer方法
    #即指定执行的步骤
    def steps(self):
        return[
            #第一个mapreduce
            MRStep(
                mapper=self.mapper,
                combiner=self.combiner,
                reducer=self.reducer_sum
            ),
            #第二个mapreduce:只有reduce没有map 即只有聚合的部分
            MRStep(
                reducer=self.top_n_reducer
            )
        ]
if __name__=="__main__":
    TopNWords.run()

执行:

[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt

[root@slave3 mrjob]# cat 1.txt

"map" 2

"hadoop" 2

其实这里可以不需要combiner,即可以将代码改为:

from mrjob.job import MRJob
from mrjob.job import  MRStep
import heapq
class TopNWords(MRJob):
    #map过程
    def mapper(self,_,line):
        if line.strip()!="":
            for word in line.strip().split():
                yield word,1
    #reducer_sum:将key设置为None,将word和count交换位置
    #这里将单词及出现的次数调换位置后以元组的形式输出:(hadoop,2) 换为 (2,hadoop)
    #方便后续对次数进行排序
    def reducer_sum(self,word,counts):
        #key为:None value为:(sum(counts),word)
        yield None,(sum(counts),word)

    #reducer
    #利用heapq对数据进行排序,并将最大的两个数取出
    #相同的键聚合到同一个reducer处理,因为所有键都是None,所以所有数据都聚合到reducer中
    #最后取出元组(单词个数,单词)中最多的前两个,再次颠倒顺序输出(单词,单词个数)
    def top_n_reducer(self,_,word_cnts):
        #取出word_cnts中最大的两个,之所以要reducer_sum要交换位置,是因为nlargest是按照key进行排序的
        #排序用单词的计数来进行排序,不能用单词本身进行排序
        #取出最大的两个之后,返回cnt,word并调换顺序输出
        for cnt,word in heapq.nlargest(2,word_cnts):
            yield word,cnt

    #重写step(),用于指定自定义的mapper,combiner,reducer方法
    #即指定执行的步骤
    def steps(self):
        return[
            #第一个mapreduce
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer_sum
            ),
            #第二个mapreduce:只有reduce没有map 即只有聚合的部分
            MRStep(
                reducer=self.top_n_reducer
            )
        ]
if __name__=="__main__":
    TopNWords.run()

执行效果是一样的:

[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt

[root@slave3 mrjob]# cat 1.txt

"map" 2

"hadoop" 2

 

还可以使用下面这个官方示例输出最频的单词:

参考:https://www.jianshu.com/p/21c880ee93a9

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: count the words we have seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occupences, word) pairs to same reducer
        # use sum(num_occupences) to get the total num of occupences of each word
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        # none key in this function because in reducer_count_words we discard the key
        # each item of word_count_pairs is (count, word), yield one result: the value(word) of max count
        yield max(word_count_pairs)


# never forget
if __name__ == '__main__':
    MRMostUsedWord.run()

运行:

[root@slave3 mrjob]# python3 count.py data.txt>1.txt

[root@slave3 mrjob]# ls

1.txt  count.py  data.txt

[root@slave3 mrjob]# cat 1.txt

2 "map"

 


四MRJob案例-文件join

在数据库中想合并两张表,可以根据uid来合并

在这里,我们尝试使用mrjob来合并两个文件

这个比较难不讲了

mrjob详解教程 mrjob详解教程 mrjob详解教程 mrjob详解教程 mrjob详解教程 mrjob详解教程

分享