Spark基本操作-sc

RDD是Spark的核心;这些无schema数据结构是在Spark中处理的最基本的数据结构。

创建 SparkSession

1
2
3
4
5
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("Word Count") \
.getOrCreate()

构建 sc

1
2
3
4
5
6
7
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("Word Count") \
.getOrCreate()

sc = spark.sparkContext

创建RDD

从内存生成创建

1
data = sc.parallelize([('a',1),('b',2),('c',3])])
  • parallelize 的参数可以为集合、元素list或dict。

读文件创建

1
xs = sc.textFile('zx_xs_chapters.txt',2)

textFile方法的最后一个参数代表数据集被划分的分取的个数,如果创建时没有指定可以通过下面方式重新分区:

1
2
3
xs = sc.textFile('zx_xs_chapters.txt')
xs = xs.repartition(2)
len(xs.glom().collect()) # Out: 2

这样我们便读进来一本小说的所有章节:

1
2
xs.take(1)
# Out: ['1#578c4a41a350f5245b52ef02#正文 第3章 拜师习武#黑漆漆的山岗上,萧...有太极心法与震山掌做底子,胜天的刀法进步极快。#2#2017/12/5 ']

常用转换

1
xs = xs.map(lambda row:row.split('#'))

xs本身是一个类似于列表的RDD对象,包含若干元素。.map方法接受一个函数,这个函数将会作用于xs的每个元素,返回计算好的新的可迭代的RDD对象。

上面代码中我们把小说每一章节内容按照#分割,得到一个字段列表:

1
2
3
4
5
6
7
8
xs.take(1)
# Out:
[['1',
'578c4a41a350f5245b52ef02',
'正文 第3章 拜师习武',
'黑漆漆的山岗上,萧胜天浑身...的刀法进步极快。',
'2',
'2017/12/5 ']]

对比没有map操作之前我们得到了一个分隔好的字段列表。

  • 需要注意的是xs.take(1)仅仅取了我们RDD数据集中的第一个元素,它是第一章的列表,包含若干字段。

我们再获取前十章的标题看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
title = xs.map(lambda list:list[2])
title.take(10)
# Out:
['正文 第3章 拜师习武',
'正文 第1章 霾氏部落',
'正文 第2章 祭灵',
'正文 第6章 战利品',
'正文 第5章 挑衅!',
'正文 第3章 万千世界',
'正文 第7章 阴谋',
'正文 第8章 入侵!',
'正文 第9章 激战!',
'正文 第4章 满载而归']

除了map之外还有一些很重要的转换操作:

  • filter 接受一个函数,对于返回值为false的丢弃,主要功能是过滤。
  • flatMap 与map类似,但它返回一个扁平的结果,它把每一行看做一个列表对待,然后将所有的记录简单地加入到一起。通过传递一个空列表可以丢弃格式不正确的记录。
  • distinct 去重
  • sample 返回数据集的随机样本
  • repartition 对数据集重新分区,如上文所示

在此不一一赘述

常用操作

和转换不同,操作执行数据集上的计划任务。一旦完成数据转换,则可以执行相应转换。

.take方法

如上文所示,这可以说是最有用的方法(也是用得最多的方法,如.map(…)方法)。该方法优于.collect(…),因为它只返回单个数据分区的前n行,对比之下,.collect(…)返回的是整个RDD。

.collect方法

该方法将所有RDD的元素返回给驱动程序。

.reduce方法

reduce方法接受一个函数,它把RDD的前两个元素作为参数传入进行运算,运算结果再作为一个元素和第三个元素传入运算…,以此类推直至所有元素运算完毕,我们可以用这个操作计算小说正文部分的总字数:

1
2
len(xs.map(lambda list:list[3]).reduce(lambda x,y:x+y))
# Out: 844418

注意:

  • reduce传递的函数需要是关联的,即元素顺序改变,结果不变;该函数还需要是交换的,即操作符顺序改变,结果不变。

.count方法

统计RDD元素的数量:

1
2
xs.count()
# Out: 295

看来我们的小说一共有295章。

  • len(xs.collect())不同的是该.count方法不需要把整个数据集移动到驱动程序。

小结

我们了解了如何在Pyspark中创建和操作RDD,但Python RDD与Java、Scala相比速度慢很多。PySpark应用程序与Scala中编写的数据结构相符的数据结构是DataFrame。

坚持原创技术分享,您的支持将鼓励我继续创作!