在编写PySpark脚本时,有些列运算在spark当中并没有符合需求的方法,需要使用UDF。通过调用jar中的UDF,可以减少python与JVM的交互;用牺牲UDF部分的开发时间,尽量提高性能。本文将简述如何在PySpark脚本中调用Scala编写的UDF(以一个简单的函数为例)。

关于scala语法、spark&scala的内容可以参考:

本文依赖的开发环境为:Scala==2.11.7 / Spark & PySpark==2.1.2 / JDK 1.8

1. 安装配置Scala

虽然IDEA里可以下载安装Scala,但是可能因为服务器在国外所以速度比较慢。建议先直接手动安装。

下载解压Scala 2.11.7 for MacOS (链接:https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz ) , 重命名为scala后移动到系统目录。

1
2
mv scala-2.11.7 scala     # 重命名 Scala 目录
mv scala /usr/local/share # 移动到系统文件夹

sudo vim ~/.bash_profile修改系统环境变量,在文件最后添加:

1
export PATH="$PATH:/usr/local/share/scala/bin"

保存退出,重启终端后输入scala,如果安装正常应该输出如下信息:

2. 将Scala object打包为可执行的jar文件(使用IntelliJ IDEA + Maven)

Maven是一种管理和构建Java项目的工具,由于Scala源于Java并可以运行在Java虚拟机之上,所以Scala的class文件自然也可以使用Maven进行管理和构建。Maven可以在终端中使用(brew install maven),本文主要介绍更为简便的使用IDE+Maven的方法。

2.1. 安装IDEA的Scala插件

打开IntelliJ IDEA,首先需要安装Scala的plugin。在启动窗口的configurations或者项目窗口左上角的IDEA-Preferences里找到Plugin,搜索scala然后安装如图插件。

2.2. 新建Maven项目

新建一个项目,项目类型选择Maven。下一步-输入项目名称-Finish。

项目结构如图所示。

2.3. 配置Scala Library

任务栏菜单File-Project Structure,对话框中选择Global Libraries,点击加号-Scala SDK,然后Browse选择scala文件夹。

(小tips:MacOS找不到系统隐藏文件/目录可以command+shift+G直接输入路径。)

2.4. 修改Maven项目相关设置

在项目名称处右键选择“Add Framework Support”,在弹出的对话框中选择Scala,确认版本无误后点确定。

分别右键src文件夹中main和test目录下的“java”文件夹,右键refactor-rename,勾选两个复选框,分别修改名字为scala

找到project目录下的pom.xml文件,接下来需要进行一些修改:

打开网页https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11/2.1.2 , 找到下图文本框中的部分,全选复制。

在pom.xml中的<version>属性下面新建一个<dependencies></dependencies>属性,把复制的代码粘贴进去,也可以直接复制下面代码块粘贴到version后面。

1
2
3
4
5
6
7
8
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.2</version>
</dependency>
</dependencies>

最终的pom文件应该长这样

如果org.apache.spark有问题的话可以参考文章1文章2想办法自己导入一下相关的jar包。简单来说就是:

  1. 下载https://archive.apache.org/dist/spark/spark-2.1.2/spark-2.1.2-bin-hadoop2.7.tgz 并解压

  2. 在IDEA的菜单File-Project Structure-Modules里点击右下角的加号-JARs or directory,导入刚才解压的文件夹/jar/目录下面的所有jar包,应该就不会报错了。

2.5. 新建Scala Object

我们以一个小写字母转大写的函数为例。在目录下的src-main-scala右键,新建名为com.spark.utilities的package。在新建的package右键New-Scala class,类型选择object,命名为MyUpper。

代码如下:

1
2
3
4
5
6
7
package com.spark.utilities

import org.apache.spark.sql.api.java.UDF1

class MyUpper extends UDF1[String, String] {
override def call(input: String):String = input.toUpperCase
}

2.6. 编译.jar文件

在File-Program Structure-Artifacts点击加号-JAR-From modules with dependencies。在弹出的对话框直接OK。

此时右边应该会出现这样的一大堆东西。

我们把左边这一列上面的内容都删了,只留最下面的xxxxx output,最后应该是下面这样

点ok或者apply保存设置,回到主界面,Build-Build Artifacts-xxx:jar-Build,然后应该就开始构建jar文件了

最后在目录下面应该出现了out-artifact-xxx.jar文件,说明操作正确。

3. 在PySpark脚本中调用udf

示例代码(注意修改jar文件的路径):

1
2
3
4
5
6
7
8
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
spark = SparkSession.builder \
.config("spark.driver.extraClassPath", "JAR文件所在的目录/ScalaUtilities.jar") \
.getOrCreate()
sqlcontext = SQLContext(spark)
sqlcontext.registerJavaFunction("my_udf", "com.first.spark.MyUpper", StringType())
spark.sql("""SELECT my_udf('abeD123okoj')""").show()

输出正确即证明环境配置、代码调用都准确无误,之后就可以用scala实现一些udf啦。