python用于mapreduce处理的mrjob模块

BUG之神 264

Hadoop为Java以外的其他语言,提供了一个友好的实现MapReduce的框架,即HadoopStreaming

 

Hadoop Streaming只遵循从标准输入(stdin)读取数据,使用标准输出写出数据,它还提供了丰富的参数控制来实现许多 MapReduce的复杂操作。

 

但是如果要实现MapReduce的复杂操作,使用Hadoop Streaming来实现就比较复杂。而MRJob模块可以很好的解决这个问题。

 

MRJob模块就是对Hadoop Streaming进行封装的Python模块,它是一个编写MapReduce任务的开源Python框架,因此可以不使用Hadoop Streaming命令操作MapReduce作业,更轻松、快速地编写MapReduce任务。

 

MRJob模块就是对Hadoop Streaming进行封装的Python模块,它具有以下特点:

代码简洁,map及reduce函数通过一个Python文件就可以搞定;

支持多步骤的MapReduce任务工作流;

支持多种运行方式,包括内嵌方式、本地环境、Hadoop、远程亚马逊;

支持亚马逊网络数据分析服务ElasticMapReduce(EMR);

调试方便,无须任何支持环境。

 

在每个节点都安装MRjob模块:

pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple/

pip3 install mrjob

创建MRJobWordCount.py,编写代码如下:

#!/usr/local/bin/python3
#coding=utf-8
from mrjob.job import MRJob
class WordCount(MRJob):#继承MRJob类
    #1.先运行mapper()
    #数据源将从mapper()的key和value进入,因为数据源没有key,所以只对value进行处理
    def mapper(self,key,value):
        #str(value)将value转换为字符串
        words=str(value).split(" ")#将字符串分割成数组
        for word in words:
            yield word,1 #使用yield方法将获取到的每个元素和标记写入到临时的hdfs中
    #2.再运行reducer(),在运行此函数之前,大数据集群会进行清洗操作
    # 将相同的key去重,每个key对应的值做成一个数据集,然后传入到reducer函数中
    #因此reducr()的key,values是一个key对应一组值,然后对相同key的值进行汇总sum(values)
    #最后使用yield方法将每个key以及这个key汇总的值写入到hdfs中
    def reducer(self,key,values):
        yield key,sum(values)

if __name__=="__main__":
    # run()函数会分别调用被重写的mapper()和reducer()函数
    WordCount.run()

创建数据文件data.txt内容:

python用于mapreduce处理的mrjob模块

hadoop map task start
map apache
org hadoop input output

将此两个文件上传到slave3 /root/app/mrjob目录中:

python用于mapreduce处理的mrjob模块

(一)首先使用内嵌的方式运行试试:

[root@slave3 mrjob]# python3 MRJobWordCount.py  -r inline data.txt>output

[root@slave3 mrjob]# ls

data.txt  MRJobWordCount.py  output

[root@slave3 mrjob]# cat output

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

 

注意也可以将-r inline去掉,因为默认的就是内嵌方式

[root@slave3 mrjob]# python3 MRJobWordCount.py data.txt>output

[root@slave3 mrjob]# cat output

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

 

 

  • 使用本地模式运行:就是说在本地启用多线程来模拟Hadoop

[root@slave3 mrjob]# python3 ./MRJobWordCount.py -r local data.txt>output

[root@slave3 mrjob]# cat output

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

 

本地模式还可以直接这样运行:

[root@slave3 mrjob]# python3 ./MRJobWordCount.py data.txt

No configs found; falling back on auto-configuration

No configs specified for inline runner

Creating temp directory /tmp/MRJobWordCount.root.20230430.223650.119117

Running step 1 of 1...

job output is in /tmp/MRJobWordCount.root.20230430.223650.119117/output

Streaming final output from /tmp/MRJobWordCount.root.20230430.223650.119117/output...

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

Removing temp directory /tmp/MRJobWordCount.root.20230430.223650.119117...

 

(三)使用集群方式运行

将data.txt文件上传到hdfs /input目录下:

[root@slave3 ~]# cd /root/app/mrjob

[root@slave3 mrjob]# hadoop fs -mkdir /input

[root@slave3 mrjob]# hadoop fs -put data.txt /input

[root@slave3 mrjob]# hadoop fs -ls /input

Found 1 items

-rw-r--r--   3 root supergroup         58 2023-05-01 06:05 /input/data.txt

运行词频统计文件:

[root@slave3 mrjob]#python3 ./MRJobWordCount.py -r hadoop -D stream.non.zero.exit.is.failure=false  hdfs:///input/data.txt -o hdfs:///output2

参数说明:

-r hadoop 表示运行模式是在集群中运行

hdfs:///input/data.txt 输入的数据源路径

-o 表示输出路径

也可以采用以下命令运行:mrjob会自动将文件上传到hadoop集群

python3 count.py -r hadoop text.txt>1.txt

但是这样运行会报错:(应该是shell语法混乱导致的)

报错:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2

returned non-zero exit status 256.

查看userlogs日志会提示:

/bin/sh: run_prestart: line 1: syntax error: unexpected end of file

/bin/sh:行1,意料之外的文件结尾

 

解决办法:

https://benjamincongdon.me/blog/2018/02/02/MapReduce-on-Python-is-better-with-MRJob-and-EMR/

在当前用户目录/root创建一个隐藏文件.mrjob.conf 添加如下代码:

runners:
  hadoop:
    setup:
      - 'set -e'
sh_bin: '/bin/bash -x'

[root@slave3 mrjob]# vim ~/.mrjob.conf

python用于mapreduce处理的mrjob模块

然后再运行:

[root@slave3 mrjob]# python3 count.py -r hadoop data.txt>1.txt

python用于mapreduce处理的mrjob模块

[root@slave3 mrjob]# ll

总用量 12

-rw-r--r-- 1 root root  78 5月   2 01:18 1.txt

-rwxrwxrwx 1 root root 368 5月   2 00:54 count.py

-rw-r--r-- 1 root root  58 5月   2 00:06 data.txt

[root@slave3 mrjob]# cat 1.txt

"apache" 1

"hadoop" 2

"input" 1

"map" 2

"org" 1

"output" 1

"start" 1

"task" 1

 

分享