利用python进行MapReduce编程

BUG之神 381

Python利用mapreduce进行编程的方式很多,书中介绍的是在centos桌面版中安装pycarm,这种方式比较浪费内存资源。我们这里采用:在windows中编写好代码,上传到大数据集群中去运行。这种方式比较适合生产环境。

要在集群中运行pyhton代码,需要在每个节点都安装python环境

以master集群为例:

安装gcc:

[root@master ~]# yum -y install gcc

安装openssl:这是执行make源码编译所需的环境

 

[root@master ~]# yum list | grep openssl
[root@master ~]# yum -y install openssl.x86_64 openssl-devel.x86_64

安装python运行环境:

首先使用xftp工具将python包上传到master节点下的/root/app

利用python进行MapReduce编程

安装python:

[root@master app]# cd /root/app
[root@master app]# tar -zxvf Python-3.9.6.tgz
[root@master app]# ls
hadoop-2.7.7  jdk1.8.0_202  Python-3.9.6  Python-3.9.6.tgz
[root@master app]# rm -rf Python-3.9.6.tgz
[root@master app]# ls
hadoop-2.7.7  jdk1.8.0_202  Python-3.9.6
[root@master app]# cd Python-3.9.6/
[root@master Python-3.9.6]# ls
[root@master Python-3.9.6]# ./configure  #此命令用于解析python安装包 执行成功后会在当前目录中生产makefile文件

 

进入Modules目录,对Setup文件进行设置:

[root@master Python-3.9.6]# cd Modules/

需要在Setup中设置openssl的地址:

[root@master Modules]# openssl version -a

利用python进行MapReduce编程

其中openssl的地址为:/etc/pki/tls

设置ssl路径并取消注释:

[root@master Modules]# vim Setup
SSL=/etc/pki/tls

_ssl _ssl.c \

        -DUSE_SSL -I$(SSL)/include -I$(SSL)/include/openssl \

        -L$(SSL)/lib -lssl -lcrypto

利用python进行MapReduce编程

返回上一级命令,执行make命令编译源代码:

 

[root@master Modules]# cd ..
[root@master Python-3.9.6]# ls
[root@master Python-3.9.6]# make

使用make install安装编译好的源代码:

[root@master Python-3.9.6]# make install

使用whereis 验证python3是否安装成功,出现与python3相关的信息即安装成功:

[root@master Python-3.9.6]# whereis python3
python3: /usr/local/bin/python3.9 /usr/local/bin/python3.9-config /usr/local/bin/python3 /usr/local/lib/python3.9

 

配置python3环境变量,因为hadoop streaming工具提交mapreduce任务时,需要python3编译器的支持。

大数据集群默认去/usr/bin目录查找python编译器

[root@master Python-3.9.6]# cd /usr/bin
[root@master bin]# ls

 

利用python进行MapReduce编程

但是此目录中只有python2,因此在运行作业的时候可能出现语法的错误

需给python3执行命令添加软链接到此目录中,为/usr/local/bin/python3建立软链接到/usr/bin目录下并重命名为python3,命令如下:

[root@master bin]# ln -s /usr/local/bin/python3 /usr/bin/python3

此时pyton3就链接到了此目录中:

[root@master bin]# ls python*
python  python2  python2.7  python3

 

再将python3的pip工具链接到/usr/bin目录中

[root@master bin]# ln -s /usr/local/bin/pip3 /usr/bin/pip3
[root@master bin]# ls pip*
pip3

 

到此环境配置完成,请按照相同的方式完成其他三个slave节点python环境的安装


在master主机上上传以下文件到app中

利用python进行MapReduce编程

说明:为了避免程序运行出错,请直接在centos上创建并编写代码,否则可能出现文件格式导致的程序执行出错的问题:

进入/root/app目录:

cd /root/app

创建mapper.py reduce.py test.txt run.sh

1创建txt 添加内容:

[root@master ~]# vim test.txt
apple orange
apple banana
ppp nike sk

2创建mapper.py:

[root@master ~]# vim mapper.py
#!/usr/bin/python3 
#上方是设置解析器,已设置软链接,可省略
#coding:utf-8
#上方是设置编码为utf-8中文编码集
#如果不设置,本文件中的中文注释会导致mapreduce执行的过程中,中文注释不能解析的错误
import sys
# 使用sys.stdin(标准输入)获取数据集(从hdfs中一段一段的读取数据)
for line in sys.stdin:
    # 按空格分隔,返回分隔后的数据集
    words = line.split()
    # 循环数据集获取到每个词,为每个词加上标记用于Reduce统计计算
    for word in words:
        # 标准输出,写入到Reduce的输入中
        print("%s\t%s"%(word, 1))

3创建reduce.py:

[root@master ~]# vim reduce.py

 

#!/usr/bin/python3
#coding:utf-8
import sys
result = {}
# 定义字典,用于装key/value对数据
for line in sys.stdin:
    # 使用制表符分隔字符串,用于找出key/values
    q = line.split("\t")
    key = q[0]
    values = int(q[1])    
    
    # 判断result字典中是否有数据,如果有则加1,如果没有将新数据插入到字典
    if key in result:
        result[key] += 1
    else:
        result[key] = 1

# 将统计结果使用标准输出写入到HDFS
for k, v in result.items():
    print("%s\t%s"%(k, v))

4创建run.sh

[root@master ~]# vim run.sh

hadoop jar /root/app/hadoop-2.7.7/share/hadoop/tools/lib/hadoop-streaming-2.7.7.jar -D stream.non.zero.exit.is.failure=false  -files ./mapper.py,./reduce.py -mapper "python3 ./mapper.py" -reducer "python3 ./reduce.py" -input /input/test.txt -output /output

 

5 添加可执行权限:

[root@master app]# chmod -R 777 mapper.py
[root@master app]# chmod -R 777 reduce.py
[root@master app]# chmod -R 777 test.txt
[root@master app]# chmod -R 777 run.sh

 

6 运行mapper.py程序

[root@master app]# more test.txt | python3 ./mapper.py

apple 1

orange 1

apple 1

banana 1

ppp 1

nike 1

sk 1

 

排序运行

[root@master app]# more test.txt | python3 ./mapper.py | sort

apple 1

apple 1

banana 1

nike 1

orange 1

ppp 1

sk 1

 

 

 

[root@master app]# more test.txt | python3 ./mapper.py | sort -k1,1

apple 1

apple 1

banana 1

nike 1

orange 1

ppp 1

sk 1

Linux sort命令参考:

https://www.cnblogs.com/liujiaxin2018/p/16490021.html

https://www.11meigui.com/2021/linux-sort.html

https://www.jianshu.com/p/c4d159a98dd8

7 mapper和reduce同时运行

[root@master app]# more test.txt | python3 ./mapper.py | ./reduce.py
apple 2
orange 1
banana 1
ppp 1
nike 1
sk 1

 

修改test.txt为如下:

利用python进行MapReduce编程

再次执行:

 

[root@master app]# more test.txt | python3 ./mapper.py | python3 ./reduce.py

apple 2

orange 1

banana 1

ppp 1

nike 1

sk 4

执行后升序排序:

[root@master app]# more test.txt | python3 ./mapper.py | python3 ./reduce.py | sort -k2

banana 1

nike 1

orange 1

ppp 1

apple 2

sk 4

 

执行后降序排序:

[root@master app]# more test.txt | python3 ./mapper.py | python3 ./reduce.py | sort -r -k2

sk 4

apple 2

ppp 1

orange 1

nike 1

banana 1

[root@master app]#

 

开启hadoop集群,在集群根目录创建input目录,并将test.txt文件上传到input目录,然后运行:

[root@master app]# start-all.sh

[root@master app]# hadoop fs -mkdir /input

[root@master app]# hadoop fs -put test.txt /input

[root@master app]# ./run.sh

[root@master app]# hadoop fs -ls /output

Found 2 items

-rw-r--r--   3 root supergroup          0 2023-02-25 21:40 /output/_SUCCESS

-rw-r--r--   3 root supergroup         44 2023-02-25 21:40 /output/part-00000

[root@master app]# hadoop fs -cat /output/part-00000

apple 2

banana 1

nike 1

orange 1

ppp 1

sk 4


hadoop运行测试命令常见错误解决:

1 错误Name node is in safe mode的解决方法

将本地文件拷贝到hdfs上去,结果上错误:Name node is in safe mode

这是因为在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。

可以通过以下命令来手动离开安全模式:

hadoop dfsadmin -safemode leave

用户可以通过dfsadmin -safemode value 来操作安全模式,参数value的说明如下:
enter - 进入安全模式
leave - 强制NameNode离开安全模式
get - 返回安全模式是否开启的信息
wait - 等待,一直到安全模式结束。

2 Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads()

1、streaming默认的情况下,mapper和reducer的返回值不是0,被认为异常任务,将被再次执行,默认尝试4次都不是0,整个job都将失败

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed wi
th code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.j
ava:311)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java
:545)
at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:130)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519
)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInforma
tion.java:1093)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

解决办法:

执行hadoop命令时,加上参数:-D stream.non.zero.exit.is.failure=false

hadoop jar hadoop-streaming*.jar -D stream.non.zero.exit.is.failure=false

2、在执行streaming任务时,出现:Environment variable CLASSPATH not set!
解决方法:

在执行streaming时,加上选项:

-cmdenv CLASSPATH=$CLASSPATH

3 在调用HDFS的C接口时,出现:Call to JNI_CreateJavaVM failed with error: -1

原因:貌似是因为在编译的时候加上了libjvm,然后动态链接库那也加了。

解决方法,编译的时候去掉libjvmm的链接就可以了。


 

分享