简介
mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。
一.MRJob简介
下面这个单词计数程序较之之前的更加完善,它统计了字符的个数(包含空格),单词的个数,以及文本的行数
我们来实现一下:
#!/usr/local/bin/python3 #coding=utf-8 from mrjob.job import MRJob class Count(MRJob): def mapper(self,_,line): yield "chars",len(line) yield "words",len(line.split()) yield "line",1 def reducer(self,key,values): yield key,sum(values) if __name__=='__main__': Count.run()
data.txt数据:
hadoop map task start
map apache
org hadoop input output
执行:
[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt
[root@slave3 mrjob]# cat 1.txt
"chars" 54
"line" 3
"words" 10
总结:
MRJob的使用一般涉及下方几个步骤
1写一个类继承MRJob
2重写mapper和reducer方法
3在main中调用MRJob.run()方法,开启整个流程
需注意:MRJob向集群提交的时候还是通过hadoop streming,即使用python写脚本,通过hadoop streming作业
二 自己实现一个单词计数程序
from mrjob.job import MRJob class Count(MRJob): def mapper(self,key,line): for word in line.split(): yield word,1 def reducer(self,word,count): yield word,sum(count) if __name__=='__main__': Count.run()
运行,三种运行方式详解:
使用hadoop集群方式运行:
[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt
[root@slave3 mrjob]# cat 1.txt
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
总结:
三MRJob案例:topN统计
统计数据中出现次数最多的前n个数据
比如:单词出现次数最多的前两个
分析:当作业比较简单(单词计数),只需要1个map,1个reduce。但是当作业复杂的时候,就需要多个map,多个reduce,即上次reduce的结果会作为后续map的输入。
这个时候如果使用hadoop streaming实现就很繁琐,因为一个mapreduce作业需要两个脚本,多个作业脚本的数量就会成倍的增加。
在mrjob中可以通过一个类来实现多个作业,它通过定义一个MRStep来完成多步作业,通过step()方法指定先走哪一步,再走哪一步。
完成topN案例,我们这里统计top2,即出现次数最多的前两个单词以及次数
代码:
from mrjob.job import MRJob from mrjob.job import MRStep import heapq class TopNWords(MRJob): #map过程 def mapper(self,_,line): if line.strip()!="": for word in line.strip().split(): yield word,1 #combiner介于mapper和reducer之间,用于临时将mapper输出的数据进行统计 #这里就完成了对单词个数的统计 def combiner(self,word,counts): yield word,sum(counts) #reducer_sum:将key设置为None,将word和count交换位置 #这里将单词及出现的次数调换位置后以元组的形式输出:(hadoop,2) 换为 (2,hadoop) #方便后续对次数进行排序 def reducer_sum(self,word,counts): #key为:None value为:(sum(counts),word) yield None,(sum(counts),word) #reducer #利用heapq对数据进行排序,并将最大的两个数取出 #相同的键聚合到同一个reducer处理,因为所有键都是None,所以所有数据都聚合到reducer中 #最后取出元组(单词个数,单词)中最多的前两个,再次颠倒顺序输出(单词,单词个数) def top_n_reducer(self,_,word_cnts): #取出word_cnts中最大的两个,之所以要reducer_sum要交换位置,是因为nlargest是按照key进行排序的 #排序用单词的计数来进行排序,不能用单词本身进行排序 #取出最大的两个之后,返回cnt,word并调换顺序输出 for cnt,word in heapq.nlargest(2,word_cnts): yield word,cnt #重写step(),用于指定自定义的mapper,combiner,reducer方法 #即指定执行的步骤 def steps(self): return[ #第一个mapreduce MRStep( mapper=self.mapper, combiner=self.combiner, reducer=self.reducer_sum ), #第二个mapreduce:只有reduce没有map 即只有聚合的部分 MRStep( reducer=self.top_n_reducer ) ] if __name__=="__main__": TopNWords.run()
执行:
[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt
[root@slave3 mrjob]# cat 1.txt
"map" 2
"hadoop" 2
其实这里可以不需要combiner,即可以将代码改为:
from mrjob.job import MRJob from mrjob.job import MRStep import heapq class TopNWords(MRJob): #map过程 def mapper(self,_,line): if line.strip()!="": for word in line.strip().split(): yield word,1 #reducer_sum:将key设置为None,将word和count交换位置 #这里将单词及出现的次数调换位置后以元组的形式输出:(hadoop,2) 换为 (2,hadoop) #方便后续对次数进行排序 def reducer_sum(self,word,counts): #key为:None value为:(sum(counts),word) yield None,(sum(counts),word) #reducer #利用heapq对数据进行排序,并将最大的两个数取出 #相同的键聚合到同一个reducer处理,因为所有键都是None,所以所有数据都聚合到reducer中 #最后取出元组(单词个数,单词)中最多的前两个,再次颠倒顺序输出(单词,单词个数) def top_n_reducer(self,_,word_cnts): #取出word_cnts中最大的两个,之所以要reducer_sum要交换位置,是因为nlargest是按照key进行排序的 #排序用单词的计数来进行排序,不能用单词本身进行排序 #取出最大的两个之后,返回cnt,word并调换顺序输出 for cnt,word in heapq.nlargest(2,word_cnts): yield word,cnt #重写step(),用于指定自定义的mapper,combiner,reducer方法 #即指定执行的步骤 def steps(self): return[ #第一个mapreduce MRStep( mapper=self.mapper, reducer=self.reducer_sum ), #第二个mapreduce:只有reduce没有map 即只有聚合的部分 MRStep( reducer=self.top_n_reducer ) ] if __name__=="__main__": TopNWords.run()
执行效果是一样的:
[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt
[root@slave3 mrjob]# cat 1.txt
"map" 2
"hadoop" 2
还可以使用下面这个官方示例输出最频的单词:
参考:https://www.jianshu.com/p/21c880ee93a9
from mrjob.job import MRJob from mrjob.step import MRStep import re WORD_RE = re.compile(r"[\w']+") class MRMostUsedWord(MRJob): def steps(self): return [ MRStep(mapper=self.mapper_get_words, combiner=self.combiner_count_words, reducer=self.reducer_count_words), MRStep(reducer=self.reducer_find_max_word) ] def mapper_get_words(self, _, line): # yield each word in the line for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner_count_words(self, word, counts): # optimization: count the words we have seen so far yield (word, sum(counts)) def reducer_count_words(self, word, counts): # send all (num_occupences, word) pairs to same reducer # use sum(num_occupences) to get the total num of occupences of each word yield None, (sum(counts), word) def reducer_find_max_word(self, _, word_count_pairs): # none key in this function because in reducer_count_words we discard the key # each item of word_count_pairs is (count, word), yield one result: the value(word) of max count yield max(word_count_pairs) # never forget if __name__ == '__main__': MRMostUsedWord.run()
运行:
[root@slave3 mrjob]# python3 count.py data.txt>1.txt
[root@slave3 mrjob]# ls
1.txt count.py data.txt
[root@slave3 mrjob]# cat 1.txt
2 "map"
四MRJob案例-文件join
在数据库中想合并两张表,可以根据uid来合并
在这里,我们尝试使用mrjob来合并两个文件
这个比较难不讲了
本文作者为BUG之神,转载请注明。