第9章《Spark SQL》

第9章《Spark SQL》,第1张

第9章《Spark SQL》 1 简介

本章介绍 Spark 用来 *** 作结构化和半结构化数据的接口——Spark SQL。结构化数据是指任何有结构信息的数据。如下是Spark SQL的用途展示图:

Spark SQL 提供了以下三大功能:

Spark SQL 可以从各种==结构化数据源 (例如 JSON、Hive、Parquet 等)==中读取数据Spark SQL 不仅支持在 Spark 程序内使用 SQL 语句进行数据查询,也支持从类似商业智能软件Tableau这样的外部工具中通过标准数据库连接器(JDBC/ODBC)连接 Spark SQL 进行查询。当在 Spark 程序内使用 Spark SQL 时,Spark SQL 支持 SQL 与常规的 Python/Java/Scala 代码高度整合,包括连接 RDD 与 SQL 表、公开的自定义SQL 函数接口等。

为了实现这些功能,Spark SQL提供了一种特殊的 RDD,叫作SchemaRDDSchemaRDD 是存放 Row对象的 RDD,每个 Row 对象代表一行记录。SchemaRDD 还包含记录的结构信息(即数据字段)。SchemaRDD 看起来和普通的 RDD 很像,但是在内部,SchemaRDD 可以利用结构信息更加高效地存储数据。此外,SchemaRDD 还支持 RDD 上所没有的一些新 *** 作,比如运行 SQL 查询。SchemaRDD 可以从外部数据源创建,也可以从查询结果或普通RDD中创建。

2 连接Spark SQL

Apache Hive 是 Hadoop上的 SQL 引擎,Spark SQL 编译时可以包含 Hive 支持,也可以 不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF(用户自定义函数)、 SerDe(序列化格式和反序列化格式),以及 Hive 查询语言(HiveQL/HQL)。如果你下载的是二进制版本的Spark,一般已经在编译时添加了 Hive 支持。
当使用 Spark SQL 进行编程时,根据是否使用 Hive 支持,有两个不同的入口。推荐使用的入口是== HiveContext==,它可以提供 HiveQL 以及其他依赖于 Hive 的功能的支持。更为基础的 SQLContext 则支持 Spark SQL 功能的一个子集,子集中去掉了需要依赖于 Hive 的功 能。这种分离主要是为那些可能会因为引入 Hive 的全部依赖而陷入依赖冲突的用户而设 计的。使用 HiveContext 不需要事先部署好 Hive。

3 在应用中使用Spark SQL

Spark SQL 最强大之处就是可以在 Spark 应用内使用。要以这种方式使用 Spark SQL,需要基于已有的 SparkContext 创建出一个 HiveContext。这个上下文环境提供了对 Spark SQL 的数据进行查询和交互的额外函数。使用 HiveContext 可以创建出表示结构化数据的 SchemaRDD,并且使用 SQL 或是类似 map() 的普通 RDD *** 作来 *** 作这些 SchemaRDD

3.1 初始化Spark SQL

要开始使用 Spark SQL,首先要在程序中添加一些 import 声明,如下所示:

from pyspark.sql import HiveContext, Row

添加好 import 声明之后,需要创建出一个 HiveContext 对象,需要传入一个 SparkContext 对象作为运行的基础:

hivCtx = HiveContext(sc)
3.2 查询

要在一张数据表上进行查询,需要调用 HiveContext中的 sql() 方法。要做的第一件事就是告诉 Spark SQL要查询的数据是什么。比如,需要先从 JSON 文件中读取 一些推特数据,把这些数据注册为一张临时表并赋予该表一个名字,然后就可以用 SQL 来 查询它了。

input = hiveCtx.jsonFile(inputFile)
input.registerTempTable("tweets")
topTweets = hiveCtx.sql(""" select text, retweetCount from tweets order by retweetCount limit 10""")
3.3 SchemaRDD

读取数据和执行查询都会返回 SchemaRDD。SchemaRDD 和传统数据库中的表的概念类似。从内部机理来看,SchemaRDD 是一个由 Row 对象组成的 RDD,附带包含每列数据类型的结构信息。Row 对象只是对基本数据类型(如整型和字符串型等)的数组的封装。需要特别注意的是,在今后的 Spark 版本中(1.3 及以后),SchemaRDD 这个名字可能会被改为 Dataframe。SchemaRDD 仍然是 RDD,所以你可以对其应用已有的 RDD 转化 *** 作,比如 map() 和 filter()。然而,SchemaRDD 也提供了一些额外的功能支持。最重要的是,你可以把任意SchemaRDD注册为临时表,这样就可以使用 HiveContext.sql来对它进行查询了。你可以通过 SchemaRDD 的 registerTempTable() 方法这么做。SchemaRDD 可以存储一些基本数据类型,也可以存储由这些类型组成的结构体和数组。SchemaRDD中可以存储的数据类型如下:


最后一种类型,也就是结构体,在 Spark SQL 中直接被表示为其他的 Row 对象。所有这些 复杂类型都可以互相嵌套。Row对象表示 SchemaRDD 中的记录,其本质就是一个定长的字段数组。在 Python 中,由于没有显式的类型系统,Row 对象变得稍有不同。我们使用 row[i] 来访问 第 i 个元素。除此之外,Python 中的 Row 还支持以 row.column_name 的形式使用名字来访问 其中的字段。

3.4 缓存

Spark SQL 的缓存机制与 Spark 中的稍有不同。由于我们知道每个列的类型信息,所以 Spark 可以更加高效地存储数据。为了确保使用更节约内存的表示方式进行缓存而不是储存整个对象,应当使用专门的 hiveCtx.cacheTable(“tableName”) 方法。当缓存数据表时, Spark SQL 使用一种列式存储格式在内存中表示数据。这些缓存下来的表只会在驱动器程 序的生命周期里保留在内存中,所以如果驱动器进程退出,就需要重新缓存数据。和缓存 RDD 时的动机一样,如果想在同样的数据上多次运行任务或查询时,就应把这些数据表缓 存起来。

4 读取和存储数据

Spark SQL 支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据 源中读取到 Row 对象。这些数据源包括== Hive 表、JSON 和 Parquet 文件==。此外,当你使用 SQL 查询这些数据源中的数据并且只用到了一部分字段时,Spark SQL 可以智能地只扫描这些用到的字段,而不是像 SparkContext.hadoopFile 中那样简单粗暴地扫描全部数据。
除这些数据源之外,你也可以在程序中通过指定结构信息,将常规的 RDD 转化为 SchemaRDD。这使得在 Python 或者 Java 对象上运行 SQL 查询更加简单。当需要计算许多数值时,SQL 查询往往更加简洁(比如要同时求出平均年龄、最大年龄、不重复的用 户 ID 数目等)。不仅如此,你还可以自如地将这些 RDD 和来自其他 Spark SQL 数据源的 SchemaRDD 进行连接 *** 作。

4.1 Apache Hive

当从 Hive 中读取数据时,Spark SQL 支持任何 Hive 支持的存储格式(SerDe),包括文本文件、RCFiles、ORC、Parquet、Avro,以及 Protocol Buffer。如下展示了如何查询一张Hive表:

from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("select key, value from mytable")
keys = rows.map(lambda row: row[0])
4.2 Parquet

Parquet(http://parquet.apache.org/)是一种流行的列式存储格式,可以高效地存储具有嵌套 字段的记录。Parquet 格式经常在 Hadoop 生态圈中被使用,它也支持 Spark SQL 的全部数 据类型。Spark SQL 提供了直接读取和存储 Parquet 格式文件的方法。首先,你可以通过 HiveContext.parquetFile 或者 SQLContext.parquetFile 来读取数据,如下所示:

tbl = rows.registerTempTable("people")
pandaFriends = hiveCtx.sql("SELECt name FROM people WHERe favouriteAnimal = "panda"")
print "Panda friends"
print pandaFriends.map(lambda row: row.name).collect()

最后,你可以使用 saveAsParquetFile() 把 SchemaRDD 的内容以 Parquet 格式保存,如下所示:

pandaFiends.saveAsParquetFile("hdfs://...")
4.3 JSON

如果你有一个 JSON 文件,其中的记录遵循同样的结构信息,那么 Spark SQL 就可以通过扫描文件推测出结构信息,并且让你可以使用名字访问对应字段。要读取 JSON 数据,只要调用 hiveCtx 中的 jsonFile() 方法即可。如下所示:

input = hiveCtx.jsonFile(inputFile)
4.4 基于RDD

除了读取数据,也可以基于 RDD 创建 SchemaRDD。在 Python 中,可以创建一个由 Row对象组成的 RDD,然后调用 inferSchema(),如下所示:

happyPeopleRDD = sc.parallelizer([Row(name="holden", favouriteBeverage="coffee")])
happyPeopleSchemaRDD = hiveCtx.inferSchema(happyPeopleRDD)
happyPeopleSchemaRDD.registerTempTable("happy_people")
5 用户自定义函数

用户自定义函数,也叫UDF,可以让我们使用 Python/Java/Scala 注册自定义函数,并在 SQL 中调用。在 Spark SQL 中,编写 UDF 尤为简单。Spark SQL 不仅有自己的 UDF 接口,也支持已有的 Apache Hive UDF。

5.1 Spark SQL UDF

我们可以使用 Spark 支持的编程语言编写好函数,然后通过 Spark SQL 内建的方法传递进 来,非常便捷地注册我们自己的 UDF。在 Scala 和 Python 中,可以利用语言原生的函数和 lambda 语法的支持。如下是一个用来计算字符串长度的非常简易的 UDF,可以 用它来计算推文的长度:

hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
lengthSchemaRDD = hiveCtx.sql("select strLenPython("text") from tweets limit 10")
5.2 Hive UDF

Spark SQL 也支持已有的 Hive UDF。标准的 Hive UDF 已经自动包含在了 Spark SQL 中。如果需要支持自定义的 Hive UDF,我们要确保该 UDF 所在的 JAR 包已经包含在了应用中。需 要注意的是,如果使用的是 JDBC 服务器,也可以使用 --jars 命令行标记来添加 JAR。要使用 Hive UDF,应该使用 HiveContext,要注册一个 Hive UDF,只需调用 hiveCtx.sql(“CREATE TEMPORARY FUNCTION name AS class.function”)。

6 Spark SQL性能

Spark SQL 提供的高级查询语言及附加的类型信息可以使 Spark SQL 数据查询更加高效。Spark SQL 不仅是给熟悉 SQL 的用户使用的。Spark SQL 使有条件的聚合 *** 作变得非常容 易,比如对多个列进行求值。Spark SQL 可以利用其对类型的了解来高效地表示数据。当缓存数据时,Spark SQL 使用内存式的列式存储。这不仅仅节约了缓存的空间,而且尽可能地减少了后续查询中针对某几个字段查询时的数据读取。
谓词下推可以让 Spark SQL 将查询中的一些部分工作“下移”到查询引擎上。如果我们 只需在 Spark 中读取某些特定的记录,标准的方法是读入整个数据集,然后在上面执行筛选条件。然而,在 Spark SQL 中,如果底层的数据存储支持只读取键值在一个范围内的记录,或是其他某些限制条件,Spark SQL 就可以把查询语句中的筛选限制条件推到数据存储层,从而大大减少需要读取的数据
Spark SQL的性能调优选项有如下所示:

一些选项的配置需要给予特别的考量。第一个是 spark.sql.codegen,这个选项可以让 Spark SQL 把每条查询语句在运行前编译为 Java 二进制代码。由于生成了专门运行指定查询的代码,codegen 可以让大型查询或者频繁重复的查询明显变快。然而,在运行特别快(1 ~ 2 秒)的即时查询语句时,codegen有可能会增加额外开销,因为codegen 需要让每 条查询走一遍编译的过程。codegen 还是一个试验性的功能,但是我们推荐在所有大型的 或者是重复运行的查询中使用 codegen。调优时可能需要考虑的第二个选项是spark.sql.inMemoryColumnarStorage.batchSize。在缓存SchemaRDD时,Spark SQL 会按照这个选项制定的大小(默认值是 1000)把记录分 组,然后分批压缩。太小的批处理大小会导致压缩比过低,而批处理大小过大的话,比如 当每个批次处理的数据超过内存所能容纳的大小时,也有可能会引发问题。如果你表中的 记录比较大(包含数百个字段或者包含像网页这样非常大的字符串字段),你就可能需要 调低批处理大小来避免内存不够(OOM)的错误。如果不是在这样的场景下,默认的批处理大小是比较合适的,因为压缩超过 1000 条记录时也基本无法获得更高的压缩比了。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5704956.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存