在编写PySpark脚本时,有些列运算在spark当中并没有符合需求的方法,需要使用UDF。通过调用jar中的UDF,可以减少python与JVM的交互;用牺牲UDF部分的开发时间,尽量提高性能。本文将简述如何在PySpark脚本中调用Scala编写的UDF(以一个简单的函数为例)。
关于scala语法、spark&scala的内容可以参考:
本文依赖的开发环境为:Scala==2.11.7 / Spark & PySpark==2.1.2 / JDK 1.8
1. 安装配置Scala
虽然IDEA里可以下载安装Scala,但是可能因为服务器在国外所以速度比较慢。建议先直接手动安装。
下载解压Scala 2.11.7 for MacOS (链接:https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz ) , 重命名为scala后移动到系统目录。
1 | mv scala-2.11.7 scala # 重命名 Scala 目录 |
sudo vim ~/.bash_profile
修改系统环境变量,在文件最后添加:
1 | export PATH="$PATH:/usr/local/share/scala/bin" |
保存退出,重启终端后输入scala,如果安装正常应该输出如下信息:
2. 将Scala object打包为可执行的jar文件(使用IntelliJ IDEA + Maven)
Maven是一种管理和构建Java项目的工具,由于Scala源于Java并可以运行在Java虚拟机之上,所以Scala的class文件自然也可以使用Maven进行管理和构建。Maven可以在终端中使用(brew install maven),本文主要介绍更为简便的使用IDE+Maven的方法。
2.1. 安装IDEA的Scala插件
打开IntelliJ IDEA,首先需要安装Scala的plugin。在启动窗口的configurations或者项目窗口左上角的IDEA-Preferences里找到Plugin,搜索scala然后安装如图插件。
2.2. 新建Maven项目
新建一个项目,项目类型选择Maven。下一步-输入项目名称-Finish。
项目结构如图所示。
2.3. 配置Scala Library
任务栏菜单File-Project Structure,对话框中选择Global Libraries,点击加号-Scala SDK,然后Browse选择scala文件夹。
(小tips:MacOS找不到系统隐藏文件/目录可以command+shift+G直接输入路径。)
2.4. 修改Maven项目相关设置
在项目名称处右键选择“Add Framework Support”,在弹出的对话框中选择Scala,确认版本无误后点确定。
分别右键src文件夹中main和test目录下的“java”文件夹,右键refactor-rename,勾选两个复选框,分别修改名字为scala
找到project目录下的pom.xml文件,接下来需要进行一些修改:
打开网页https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11/2.1.2 , 找到下图文本框中的部分,全选复制。
在pom.xml中的<version>
属性下面新建一个<dependencies></dependencies>
属性,把复制的代码粘贴进去,也可以直接复制下面代码块粘贴到version后面。
1 | <dependencies> |
最终的pom文件应该长这样:
如果org.apache.spark有问题的话可以参考文章1和文章2想办法自己导入一下相关的jar包。简单来说就是:
-
下载https://archive.apache.org/dist/spark/spark-2.1.2/spark-2.1.2-bin-hadoop2.7.tgz 并解压
-
在IDEA的菜单File-Project Structure-Modules里点击右下角的加号-JARs or directory,导入刚才解压的文件夹/jar/目录下面的所有jar包,应该就不会报错了。
2.5. 新建Scala Object
我们以一个小写字母转大写的函数为例。在目录下的src-main-scala右键,新建名为com.spark.utilities的package。在新建的package右键New-Scala class,类型选择object,命名为MyUpper。
代码如下:
1 | package com.spark.utilities |
2.6. 编译.jar文件
在File-Program Structure-Artifacts点击加号-JAR-From modules with dependencies。在弹出的对话框直接OK。
此时右边应该会出现这样的一大堆东西。
我们把左边这一列上面的内容都删了,只留最下面的xxxxx output,最后应该是下面这样
点ok或者apply保存设置,回到主界面,Build-Build Artifacts-xxx:jar-Build,然后应该就开始构建jar文件了
最后在目录下面应该出现了out-artifact-xxx.jar文件,说明操作正确。
3. 在PySpark脚本中调用udf
示例代码(注意修改jar文件的路径):
1 | from pyspark.sql import SparkSession, SQLContext |
输出正确即证明环境配置、代码调用都准确无误,之后就可以用scala实现一些udf啦。