Spark SQL、DataFrames 和 Datasets 指南


  Spark SQL 是一个结构化数据处理的 Spark 模块 。 与基础的 Spark RDD API 不同的是, Spark SQL 所提供的接口为 Spark 提供了 更多关于数据结构和正在执行的计算结构的信息。 Spark 在其内部利用这些额外的信息去做更多的优化。有几种用于和 Sparrk SQL

交互的方法,包括 SQL 和 Dataset API。 当你计算一个结果, 会使用同一个执行引擎, 这独立于你所用来描述这个算法的API和语言。这种一致性意味着开发者可以轻易地在不同的 API 中来回切换, 因为它为表达给定的转换提供了最自然的方式。

  本页所有示例使用了 Spark 提供的样例数据并且可以在 spark-shell 、pyspark shell 或者 sparkR shell 中运行。


  Spark SQL 的一个用处是执行 SQL 查询。 Spark SQL 同样可以用来从 现有的 HIVE 中读取数据。 更多有关配置这个特性的信息,请查阅 HIVE Tables 部分。当你使用其他语言执行SQL时,将会返回一个 Dataset 或者 DataFrame 作为结果。你同样可以使用命令行或者 JDBC/ODBC 与 SQL 接口进行交互。

Dataset 和 Dataframe

  Dataset 是一种分布式数据集,是 Spark1.6 新增的接口。它提供了RDD(强类型,可以使用强大的 lambda 表达式)的优点,并受益于Spark SQL 的优化执行引擎。Dataset 可以通过 JVM 构建,然后使用转换方法(map, flatMap, filter等等)进行操作。 Dataset API 

在 Java 和 Scala 中可用。 Python 并不支持Dataset API。但是由于Python的动态特性, Dataset API 的很多优势都是可用的(比如你可以自然地使用名称 row.columnName 来访问 row 的域 )。 R 语言的情况类似。

  DataFrame 是一种按列命名组织的 Dataset, 它在概念上等价于关系型数据库的一个表或者 R/Python 的一个数据帧, 但是它(DataFrame)的底层做了更多的优化。DataFrame 可以通过大量的数据源构建,例如:结构化的数据文件, HIVE 的表, 数据库,或现有的RDD。Java、Python、Scala、R语言都支持 DataFrame API。 在 Scala 和 Java, DataFrame 由Dataset的 rowS 表示。 在 Scala API 中,DataFrame 可以简单地认为是 Dataset[Row] 的别名。 然而,在 Java API 中, 用户需要使用 Dataset<Row> 来表示 DataFrame。

  在整个文档中, 我们通常把 Scala/Java Dataset 的 RowS 称为 DataFrames。


起点: SparkSession

Spark 所有功能的入口是 SparkSession 类。创建最基本的 SaprkSession, 只需要调用 SparkSession.builder():


import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits. 

在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。



import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")

在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。



from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \

在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。



sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))

在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码

请注意,sparkR.session() 第一次被调用时,它会初始化一个全局的 SparkSession 单例对象,并且之后继续调用这个方法都将返回这个实例。 通过这种方式,用户只需要对 SparkSession 做一次初始化,然后 SparkR 的其他方法比如 read.df 将会隐式地访问这个全局地单例对象, 并且用户不需要传递 SparkSession 的实例。


Spark2.0 的 SparkSession 提供了对 HIVE 特性的内置支持, 包括使用 HiveQL 编写查询语句的能力,访问 Hive UDFs 和 从 Hive Table 中读取数据的能力。为了使用这些特性,您需要安装一个 HIVE。


创建 DataFrame

有了SparkSession, 应用程序可以通过本地的 R data.frame、Hive Table、 或者 Spark 数据源 来创建DataFrame。

作为示例,以下代码使用一个 JSON 文件的内容 创建一个 DataFrame


val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。



import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。



# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。



df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+

在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码


弱类型的 Dataset 操作(aka DataFrame 操作)

DataFrame 为 Scala、Java、Python、R语言提供了一种特定的结构化数据操作。

上面提到过,在 Spark2.0 中,DataFrame 对于 Scala 和 Java API 仅仅是 Dataset 的 RowS。这些操作也被称为 “弱类型转换”,这与 强类型的Scala/Java 中的 “强类型转换” 形成了鲜明的对比。

这里我们囊括了使用 Datasets 做结构化数据处理的基本示例:


// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在  Spark 仓库 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中可以找到完整的示例代码。



// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

在  Spark 仓库 “examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java” 中可以找到完整的示例代码。



 对于Python来说,我们可以通过属性(df.age)或者通过索引(df[‘age’]) 来访问 DataFrame 的列。 虽然前者用于交互式数据探索非常方便, 但使用者强烈建议使用后者,因为它具有前瞻性,并且不会因为 DataFrame 的列命和属性名重复产生冲突。

# spark, df are from the previous example
# Print the schema in a tree format
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+ 

在  Spark 仓库 “examples/src/main/python/sql/basic.py” 中可以找到完整的示例代码。



# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Print the schema in a tree format
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
##      name
## 1 Michael
## 2    Andy
## 3  Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
##      name (age + 1.0)
## 1 Michael          NA
## 2    Andy          31
## 3  Justin          20

# Select people older than 21
head(where(df, df$age > 21))
##   age name
## 1  30 Andy

# Count people by age
head(count(groupBy(df, "age")))
##   age count
## 1  19     1
## 2  NA     1
## 3  30     1

在  Spark 仓库 “examples/src/main/r/RSparkSQLExample.R” 中可以找到完整的示例代码


关于 DataFrame 可执行的操作的完整列表,请移步 API Documentation

除了简单的列引用和表示之外,DataFrame 同样有一个丰富的函数库,包括字符串操作、日期算法、常用数学操作 等等。 完整的列表可以在 DataFrame Function Reference.中找到。


