Spark-DataFrame

DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。

Spark早期的API中(即RDD),由于Java JVM和Py4J之间的通信,每当使用RDD执行PySpark程序时,潜在地需要巨大的开销来执行作业。

DataFrame和Catalyst优化器(以及Tungsten项目)的意义是在和非优化的RDD查询比较时增加PySpark查询的性能。

使用DataFrame,过去不仅有明显的Python性能改进,现在还有Python、Scale、SQL和R之间的性能校验。

通过RDD转换创建DataFrame

首先创建我们的jsonRDD:

1
jsonRDD = sc.textFile(r'C:\Users\Guitar\Desktop\json.txt',2)

json.txt文件中我们按行存放了一些格式化的json字符串:

1
2
3
4
{"title": "第1章 黄山真君和九洲一号群","link": "http://m.baidu.com/tc?appui=alaxs&gid=3965456126&tn=utouch&page=ct&url=http%3A%2F%2Fwww.uukanshu.com%2Fb%2F33933%2F55150.html#2","totalpage": 0,"partsize": 0,"order": 0,"currency": 0,"unreadble": false,"isVip": false}
{"title": "第2章 且待本尊算上一卦","link": "http://m.baidu.com/tc?appui=alaxs&gid=3965456126&tn=utouch&page=ct&url=http%3A%2F%2Fwww.92txt.net%2Fbook%2F25%2F25882%2F10074048.html#2","totalpage": 0,"partsize": 0,"order": 0,"currency": 0,"unreadble": false,"isVip": false}
{"title": "第3章 一张丹方","link": "http://m.baidu.com/tc?appui=alaxs&gid=3965456126&tn=utouch&page=ct&url=http%3A%2F%2Fwww.92txt.net%2Fbook%2F25%2F25882%2F10074053.html#2","totalpage": 0,"partsize": 0,"order": 0,"currency": 0,"unreadble": false,"isVip": false}
...

创建 DataFrame

现在,我们已经创建了RDD,利用SparkSession的read.json方法,RDD将会被转换成一个DataFrame,以下是创建DataFrame的代码:

1
xs_chapter = spark.read.json(jsonRDD)

创建一个临时表

利用DataFrame的.createOrReplaceTempView方法创建一个临时视图表:

1
xs_chapter.createOrReplaceTempView("xs_chapter")
  • 创建临时表是一次DataFrame转换,我们可以通过临时表用SQL查询DataFrame中的数据。

简单的DataFrame查询

show()

运行.show方法默认显示前10行,当然也可以传入参数n来控制显示的行数。

1
2
3
4
5
6
7
8
9
10
11
12
13
+--------+-----+--------------------+-----+--------+--------------+---------+---------+
|currency|isVip| link|order|partsize| title|totalpage|unreadble|
+--------+-----+--------------------+-----+--------+--------------+---------+---------+
| 0|false|http://m.baidu.co...| 0| 0|第1章 黄山真君和九洲一号群| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第2章 且待本尊算上一卦| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第3章 一张丹方| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第4章 h市三品后天雷劫| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第5章 要相信科学!| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第6章 铜卦仙师| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第7章 被群灭的不良们| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第8章 羽柔子和罗信街区| 0| false|
| 0|false|http://m.baidu.co...| 0| 0| 第9章 另一个罗信街区| 0| false|
+--------+-----+--------------------+-----+--------+--------------+---------+---------+

SQL查询

1
2
3
4
5
spark.sql("select isVip,partsize,title from xs_chapter").take(3)
# Out:
[Row(isVip=False, partsize=0, title='第1章 黄山真君和九洲一号群'),
Row(isVip=False, partsize=0, title='第2章 且待本尊算上一卦'),
Row(isVip=False, partsize=0, title='第3章 一张丹方')]

RDD的交互操作

事实上有两种从RDD变换到DataFrame(或者Dataset[T])的不同方法:使用反射推断模式或以编程方式指定模式。

  • 模式可以理解为DataFrame字段的数据类型

使用反射来推断模式

在建立DataFrame和运行查询的过程中,DataFrame的模式是自动定义的。

最初,行对象通过传递一列键/值对作为行类的**kwargs来构造。然后,SparkSQL将行对象的RDD转变为一个DataFrame,在DataFrame中键就是列,数据类型通过采样数据来推断。

利用DataFrame的printSchema方法查看模式的定义:

1
2
3
4
5
6
7
8
9
10
xs_chapter.printSchema()
root
|-- currency: long (nullable = true)
|-- isVip: boolean (nullable = true)
|-- link: string (nullable = true)
|-- order: long (nullable = true)
|-- partsize: long (nullable = true)
|-- title: string (nullable = true)
|-- totalpage: long (nullable = true)
|-- unreadble: boolean (nullable = true)

假设某个字段的类型不是我们预期的,该如何调整?

编程指定模式

我们通过在SparkSQL中引入数据类型(pyspark.sql.types),以编程方式来指定模式,并生成一些.csv数据,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 导入数据类型
from pyspark.sql.types import *

# 生成逗号分隔的数据
csvRDD = sc.parallelize([(123,'张三',18,'游泳'),(456,'李四',19,'游泳'),(789,'小王',23,'游泳')])

csvRDD.take(3)
Out[33]: [(123, '张三', 18, '游泳'), (456, '李四', 19, '游泳'), (789, '小王', 23, '游泳')]

# 指定对应字段的模式(数据类型)
schema = StructType([StructField('id',LongType(),True),StructField('name',StringType(),True),StructField('age',LongType(),True),StructField('hobby',StringType(),True)])

# 创建DataFrame
swimmers = spark.createDataFrame(csvRDD,schema)

# 创建临时表
swimmers.createOrReplaceTempView('swimmers')

swimmers.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- hobby: string (nullable = true)

swimmers.show()
+---+----+---+-----+
| id|name|age|hobby|
+---+----+---+-----+
|123| 张三| 18| 游泳|
|456| 李四| 19| 游泳|
|789| 小王| 23| 游泳|
+---+----+---+-----+

使用DataFrame API查询

行数

1
2
swimmers.count()
# Out: 3

筛选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
swimmers.select('id','name','age').filter('age=18').show()
+---+----+---+
| id|name|age|
+---+----+---+
|123| 张三| 18|
+---+----+---+

swimmers.select(swimmers.id,swimmers.name,swimmers.age).filter(swimmers.age==18).show()
+---+----+---+
| id|name|age|
+---+----+---+
|123| 张三| 18|
+---+----+---+

swimmers.select('id','name','age').filter('age like "1%"').show()
+---+----+---+
| id|name|age|
+---+----+---+
|123| 张三| 18|
|456| 李四| 19|
+---+----+---+

使用SQL查询

如上文,我们可以使用SparkSession的sql方法查询swimmers,前提是针对swimmers建立了临时视图表

行数

1
2
3
4
5
6
spark.sql('select count(*) from swimmers').show()
+--------+
|count(1)|
+--------+
| 3|
+--------+

筛选

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spark.sql('select * from swimmers where age>18').show()
+---+----+---+-----+
| id|name|age|hobby|
+---+----+---+-----+
|456| 李四| 19| 游泳|
|789| 小王| 23| 游泳|
+---+----+---+-----+

spark.sql('select * from swimmers where age like "1%"').show()
+---+----+---+-----+
| id|name|age|hobby|
+---+----+---+-----+
|123| 张三| 18| 游泳|
|456| 李四| 19| 游泳|
+---+----+---+-----+

结语

使用Spark DataFrame,Python开发人员可以利用一个简单的并且潜在地加快速度的抽象层。最初Spark中的Python速度较慢的一个主要原因源自于Python子进程和JVM之间的通信层。对于Python DataFrame的用户,我们有一个在Scala DataFrame周围的Python包装器,Scala DataFrame避免了Python子进程/JVM的通信开销。

坚持原创技术分享,您的支持将鼓励我继续创作!