PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)
作者:Pinar Ersoy
翻译:孙韬淳
校对:陈振东
本文约2500字,建议阅读10分钟
本文通过介绍Apache Spark在Python中的应用来讲解如何利用PySpark包执行常用函数来进行数据处理工作。
Apache Spark是一个对开发者提供完备的库和API的集群计算系统,并且支持多种语言,包括Java,Python,R和Scala。SparkSQL相当于Apache Spark的一个模块,在DataFrame API的帮助下可用来处理非结构化数据。
通过名为PySpark的Spark Python API,Python实现了处理结构化数据的Spark编程模型。
这篇文章的目标是展示如何通过PySpark运行Spark并执行常用函数。
Python编程语言要求一个安装好的IDE。最简单的方式是通过Anaconda使用Python,因其安装了足够的IDE包,并附带了其他重要的包。
通过这个链接,你可以下载Anaconda。你可以在Windows,macOS和Linux操作系统以及64位/32位图形安装程序类型间选择。我们推荐安装Python的最新版本。
Anaconda的安装页面(https://www.anaconda.com/distribution/)
下载好合适的Anaconda版本后,点击它来进行安装,安装步骤在Anaconda Documentation中有详细的说明。
安装完成时,Anaconda导航主页(Navigator Homepage)会打开。因为只是使用Python,仅需点击“Notebook”模块中的“Launch”按钮。
Anaconda导航主页
为了能在Anaconda中使用Spark,请遵循以下软件包安装步骤。
第一步:从你的电脑打开“Anaconda Prompt”终端。
第二步:在Anaconda Prompt终端中输入“conda install pyspark”并回车来安装PySpark包。
第三步:在Anaconda Prompt终端中输入“conda install pyarrow”并回车来安装PyArrow包。
当PySpark和PyArrow包安装完成后,仅需关闭终端,回到Jupyter Notebook,并在你代码的最顶部导入要求的包。
首先需要初始化一个Spark会话(SparkSession)。通过SparkSession帮助可以创建DataFrame,并以表格的形式注册。其次,可以执行SQL表格,缓存表格,可以阅读parquet/json/csv/avro数据格式的文档。
想了解SparkSession每个参数的详细解释,请访问pyspark.sql.SparkSession。
一个DataFrame可被认为是一个每列有标题的分布式列表集合,与关系数据库的一个表格类似。在这篇文章中,处理数据集时我们将会使用在PySpark API中的DataFrame操作。
你可以从https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers中下载Kaggle数据集。
3.1、从Spark数据源开始
DataFrame可以通过读txt,csv,json和parquet文件格式来创建。在本文的例子中,我们将使用.json格式的文件,你也可以使用如下列举的相关读取函数来寻找并读取text,csv,parquet文件格式。
表格中的重复值可以使用dropDuplicates()函数来消除。
使用dropDuplicates()函数后,我们可观察到重复值已从数据集中被移除。
查询操作可被用于多种目的,比如用“select”选择列中子集,用“when”添加条件,用“like”筛选列内容。接下来将举例一些最常用的操作。完整的查询操作列表请看Apache Spark文档。
5.1、“Select”操作
可以通过属性(“author”)或索引(dataframe[‘author’])来获取列。
第一个结果表格展示了“author”列的查询结果,第二个结果表格展示多列查询。
5.2、“When”操作
在第一个例子中,“title”列被选中并添加了一个“when”条件。
展示特定条件下的10行数据
在第二个例子中,应用“isin”操作而不是“when”,它也可用于定义一些针对行的条件。
5行特定条件下的结果集
5.3、“Like”操作
在“Like”函数括号中,%操作符用来筛选出所有含有单词“THE”的标题。如果我们寻求的这个条件是精确匹配的,则不应使用%算符。
title列中含有单词“THE”的判断结果集
5.4、“startswith”-“endswith”
StartsWith指定从括号中特定的单词/内容的位置开始扫描。类似的,EndsWith指定了到某处单词/内容结束。两个函数都是区分大小写的。
对5行数据进行startsWith操作和endsWith操作的结果。
5.5、“substring”操作
Substring的功能是将具体索引中间的文本提取出来。在接下来的例子中,文本从索引号(1,3),(3,6)和(1,6)间被提取出来。
分别显示子字符串为(1,3),(3,6),(1,6)的结果
在DataFrame API中同样有数据处理函数。接下来,你可以找到增加/修改/删除列操作的例子。
6.1、增加列
在数据集结尾已添加新列
6.2、修改列
对于新版DataFrame API,withColumnRenamed()函数通过两个参数使用。
“Amazon_Product_URL”列名修改为“URL”
6.3、删除列
列的删除可通过两种方式实现:在drop()函数中添加一个组列名,或在drop函数中指出具体的列。两个例子展示如下。
“publisher”和“published_date”列用两种不同的方法移除。
存在几种类型的函数来进行数据审阅。接下来,你可以找到一些常用函数。想了解更多则需访问Apache Spark doc。
通过GroupBy()函数,将数据列根据指定函数进行聚合。
作者被以出版书籍的数量分组
通过使用filter()函数,在函数内添加条件参数应用筛选。这个函数区分大小写。
标题列经筛选后仅存在有“THE HOST”的内容,并显示5个结果。
对每个数据集,经常需要在数据预处理阶段将已存在的值替换,丢弃不必要的列,并填充缺失值。pyspark.sql.DataFrameNaFunction库帮助我们在这一方面处理数据。举例如下。
在RDD(弹性分布数据集)中增加或减少现有分区的级别是可行的。使用repartition(self,numPartitions)可以实现分区增加,这使得新的RDD获得相同/更高的分区数。分区缩减可以用coalesce(self, numPartitions, shuffle=False)函数进行处理,这使得新的RDD有一个减少了的分区数(它是一个确定的值)。请访问Apache Spark doc获得更多信息。
原始SQL查询也可通过在我们SparkSession中的“sql”操作来使用,这种SQL查询的运行是嵌入式的,返回一个DataFrame格式的结果集。请访问Apache Spark doc获得更详细的信息。
13.1、数据结构
DataFrame API以RDD作为基础,把SQL查询语句转换为低层的RDD函数。通过使用.rdd操作,一个数据框架可被转换为RDD,也可以把Spark Dataframe转换为RDD和Pandas格式的字符串同样可行。
不同数据结构的结果
13.2、写并保存在文件中
任何像数据框架一样可以加载进入我们代码的数据源类型都可以被轻易转换和保存在其他类型文件中,包括.parquet和.json。请访问Apache Spark doc寻求更多保存、加载、写函数的细节。
当.write.save()函数被处理时,可看到Parquet文件已创建。
当.write.save()函数被处理时,可看到JSON文件已创建。
13.3、停止SparkSession
Spark会话可以通过运行stop()函数被停止,如下。
代码和Jupyter Notebook可以在我的GitHub上找到。
欢迎提问和评论!
参考文献:
1. http://spark.apache.org/docs/latest/
2. https://docs.anaconda.com/anaconda/
原文标题:
PySpark and SparkSQL Basics
How to implement Spark with Python Programming
原文链接:
https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53
编辑:于腾凯
校对:洪舒越
译者简介
孙韬淳,首都师范大学大四在读,主修遥感科学与技术。目前专注于基本知识的掌握和提升,期望在未来有机会探索数据科学在地学应用的众多可能性。爱好之一为翻译创作,在业余时间加入到THU数据派平台的翻译志愿者小组,希望能和大家一起交流分享,共同进步。
—完—
关注清华-青岛数据科学研究院官方微信公众平台“ THU数据派 ”及姊妹号“ 数据派THU ”获取更多讲座福利及优质内容。
AI 代码编辑器-Cursor
Cursor 包含强大的自动完成功能,可预测您的下一次编辑。启用后,此功能将始终处于打开状态,并会根据您最近的更改建议跨多行编辑您的代码。
光标可以看到您最近的更改,因此它可以预测您下一步要做什么。
光标可以一次建议多个编辑,从而节省您的时间。
随意打字,Cursor 会修复你的错误。
光标预测您的下一个光标位置,以便您可以无缝导航您的代码。
聊天功能可让您与能够看到您的代码库的 AI 进行交谈。聊天功能始终可以看到您当前的文件和光标,因此您可以询问它诸如“这里有错误吗?”之类的问题。您可以使用Ctrl +Shift+L 或“@”将特定代码块添加到上下文中。您可以使用Ctrl +Enter与整个代码库聊天。
使用 @Codebase 或Ctrl Enter 询问有关代码库的问题。光标将搜索您的代码库以查找与您的查询相关的代码。
引用带有 @ 符号的代码作为 AI 的上下文。只需输入 @ 即可查看文件夹中所有文件和代码符号的列表。
点击聊天下的图像按钮或将图像拖入输入框以将视觉内容纳入聊天。
使用@Web 从互联网获取最新信息。Cursor 将为您搜索网络并使用最新信息来回答您的问题。
通过单击任何聊天代码块顶部的播放按钮,将聊天中的代码建议应用回您的代码库中。
使用@LibraryName 引用流行的库,或使用@Docs → 添加新文档添加您自己的库。
Ctrl K 可让您使用 AI 编辑和编写代码。要编辑,请尝试选择一些代码,单击“编辑”,然后描述应如何更改代码。要生成全新的代码,只需键入Ctrl K,而无需选择任何内容。
使用 AI 编辑和编写代码。选择一些代码,单击 Ctrl K,然后描述应如何更改代码。或者,使用Ctrl K 生成新代码而不选择任何内容。
在终端中使用Ctrl K 以简单的英语编写终端命令。Cursor 会将它们转换为您需要的终端命令。
如果您对代码的某些部分有任何快速问题,您可以选择并单击“快速问题”以立即获得答案。
本文作者及来源:Renderbus瑞云渲染农场https://www.renderbus.com
文章为作者独立观点不代本网立场,未经允许不得转载。