Hadoop为Java以外的其他语言,提供了一个友好的实现MapReduce的框架,即HadoopStreaming
Hadoop Streaming只遵循从标准输入(stdin)读取数据,使用标准输出写出数据,它还提供了丰富的参数控制来实现许多 MapReduce的复杂操作。
但是如果要实现MapReduce的复杂操作,使用Hadoop Streaming来实现就比较复杂。而MRJob模块可以很好的解决这个问题。
MRJob模块就是对Hadoop Streaming进行封装的Python模块,它是一个编写MapReduce任务的开源Python框架,因此可以不使用Hadoop Streaming命令操作MapReduce作业,更轻松、快速地编写MapReduce任务。
MRJob模块就是对Hadoop Streaming进行封装的Python模块,它具有以下特点:
代码简洁,map及reduce函数通过一个Python文件就可以搞定;
支持多步骤的MapReduce任务工作流;
支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;
支持亚马逊网络数据分析服务ElasticMapReduce(EMR);
调试方便,无须任何支持环境。
在每个节点都安装MRjob模块:
pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/
pip3 install mrjob
创建MRJobWordCount.py,编写代码如下:
#!/usr/local/bin/python3 #coding=utf-8 from mrjob.job import MRJob class WordCount(MRJob):#继承MRJob类 #1.先运行mapper() #数据源将从mapper()的key和value进入,因为数据源没有key,所以只对value进行处理 def mapper(self,key,value): #str(value)将value转换为字符串 words=str(value).split(" ")#将字符串分割成数组 for word in words: yield word,1 #使用yield方法将获取到的每个元素和标记写入到临时的hdfs中 #2.再运行reducer(),在运行此函数之前,大数据集群会进行清洗操作 # 将相同的key去重,每个key对应的值做成一个数据集,然后传入到reducer函数中 #因此reducr()的key,values是一个key对应一组值,然后对相同key的值进行汇总sum(values) #最后使用yield方法将每个key以及这个key汇总的值写入到hdfs中 def reducer(self,key,values): yield key,sum(values) if __name__=="__main__": # run()函数会分别调用被重写的mapper()和reducer()函数 WordCount.run()
创建数据文件data.txt内容:
hadoop map task start
map apache
org hadoop input output
将此两个文件上传到slave3 /root/app/mrjob目录中:
(一)首先使用内嵌的方式运行试试:
[root@slave3 mrjob]# python3 MRJobWordCount.py -r inline data.txt>output
[root@slave3 mrjob]# ls
data.txt MRJobWordCount.py output
[root@slave3 mrjob]# cat output
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
注意也可以将-r inline去掉,因为默认的就是内嵌方式
[root@slave3 mrjob]# python3 MRJobWordCount.py data.txt>output
[root@slave3 mrjob]# cat output
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
- 使用本地模式运行:就是说在本地启用多线程来模拟Hadoop
[root@slave3 mrjob]# python3 ./MRJobWordCount.py -r local data.txt>output
[root@slave3 mrjob]# cat output
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
本地模式还可以直接这样运行:
[root@slave3 mrjob]# python3 ./MRJobWordCount.py data.txt
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/MRJobWordCount.root.20230430.223650.119117
Running step 1 of 1...
job output is in /tmp/MRJobWordCount.root.20230430.223650.119117/output
Streaming final output from /tmp/MRJobWordCount.root.20230430.223650.119117/output...
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
Removing temp directory /tmp/MRJobWordCount.root.20230430.223650.119117...
(三)使用集群方式运行
将data.txt文件上传到hdfs /input目录下:
[root@slave3 ~]# cd /root/app/mrjob
[root@slave3 mrjob]# hadoop fs -mkdir /input
[root@slave3 mrjob]# hadoop fs -put data.txt /input
[root@slave3 mrjob]# hadoop fs -ls /input
Found 1 items
-rw-r--r-- 3 root supergroup 58 2023-05-01 06:05 /input/data.txt
运行词频统计文件:
[root@slave3 mrjob]#python3 ./MRJobWordCount.py -r hadoop -D stream.non.zero.exit.is.failure=false hdfs:///input/data.txt -o hdfs:///output2
参数说明:
-r hadoop 表示运行模式是在集群中运行
hdfs:///input/data.txt 输入的数据源路径
-o 表示输出路径
也可以采用以下命令运行:mrjob会自动将文件上传到hadoop集群
python3 count.py -r hadoop text.txt>1.txt
但是这样运行会报错:(应该是shell语法混乱导致的)
报错:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
returned non-zero exit status 256.
查看userlogs日志会提示:
/bin/sh: run_prestart: line 1: syntax error: unexpected end of file
/bin/sh:行1,意料之外的文件结尾
解决办法:
https://benjamincongdon.me/blog/2018/02/02/MapReduce-on-Python-is-better-with-MRJob-and-EMR/
在当前用户目录/root创建一个隐藏文件.mrjob.conf 添加如下代码:
runners: hadoop: setup: - 'set -e' sh_bin: '/bin/bash -x'
[root@slave3 mrjob]# vim ~/.mrjob.conf
然后再运行:
[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt
[root@slave3 mrjob]# ll
总用量 12
-rw-r--r-- 1 root root 78 5月 2 01:18 1.txt
-rwxrwxrwx 1 root root 368 5月 2 00:54 count.py
-rw-r--r-- 1 root root 58 5月 2 00:06 data.txt
[root@slave3 mrjob]# cat 1.txt
"apache" 1
"hadoop" 2
"input" 1
"map" 2
"org" 1
"output" 1
"start" 1
"task" 1
本文作者为BUG之神,转载请注明。