从WordCount开始学Spark

梳理Word Count 的计算步骤

1.读取内容:调用 Spark 文件读取 API,加载 wikiOfSpark.txt 文件内容;

2.分词:以行为单位,把句子打散为单词;

3.分组计数:按照单词做分组计数;

明确了计算步骤后,接下来我们就可以调用 Spark 开发 API,对这些步骤进行代码实现,从而完成 Word Count 的应用开发。

众所周知,Spark 支持种类丰富的开发语言,如 Scala、Java、Python,等等。你可以结合个人偏好和开发习惯,任意选择其中的一种进行开发。尽管不同语言的开发 API 在语法上有着细微的差异,但不论是功能方面、还是性能方面,Spark 对于每一种语言的支持都是一致的。换句话说,同样是 Word Count,你用 Scala 实现也行,用 Python 实现也可以,两份代码的执行结果是一致的。不仅如此,在同样的计算资源下,两份代码的执行效率也是一样的。

因此,就 Word Count 这个示例来说,开发语言不是重点,我们不妨选择 Scala。你可能会说:“我本来对 Spark 就不熟,更没有接触过 Scala,一上来就用 Scala 演示 Spark 应用代码,理解起来会不会很困难?”

其实大可不必担心,Scala 语法比较简洁,Word Count 的 Scala 实现不超过 10 行代码。再者,对于 Word Count 中的每一行 Scala 代码,我会带着你手把手、逐行地进行讲解和分析。我相信,跟着我过完一遍代码之后,你能很快地把它“翻译”成你熟悉的语言,比如 Java 或 Python。另外,绝大多数的 Spark 源码都是由 Scala 实现的,接触并了解一些 Scala 的基本语法,有利于你后续阅读、学习 Spark 源代码。

WordCount代码实现

选定了语言,接下来,我们就按照读取内容、分词、分组计数这三步来看看 Word Count 具体怎么实现。

第一步:读取内容

启动Spark,启动Spark-shell。

首先,我们调用 SparkContext 的 textFile 方法,读取源文件,也就是 The_Man_of_Property.txt,代码如下表所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

// 这里的代表数据文件的根目录
scala> val rootPath: String = "file:///usr/local/src/practice_code/mr_practice/wordcount"
rootPath: String = /usr/local/src/practice_code/mr_practice/wordcount

scala> val file: String = s"${rootPath}/The_Man_of_Property.txt"
file: String = file:///usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt

// 读取文件内容
scala> val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
lineRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt MapPartitionsRDD[3] at textFile at <console>:26
// 查看第一个元素数据
scala> lineRDD.take(1)
res8: Array[String] = Array(Preface)

在这段代码中,你可能会发现 3 个新概念,分别是 spark、sparkContext 和 RDD。

其中,spark 和 sparkContext 分别是两种不同的开发入口实例:

  • spark 是开发入口 SparkSession 实例(Instance),SparkSession 在 spark-shell 中会由系统自动创建;
  • sparkContext 是开发入口 SparkContext 实例。

在 Spark 版本演进的过程中,从 2.0 版本开始,SparkSession 取代了 SparkContext,成为统一的开发入口。换句话说,要开发 Spark 应用,你必须先创建 SparkSession。关于 SparkSession 和 SparkContext,这里你只要记住它们是必需的开发入口就可以了。

我们再来看看 RDD,RDD 的全称是 Resilient Distributed Dataset,意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。关于 RDD 的定义、内涵与作用,我们留到后面的文章再去展开。

在这里,你不妨先简单地把 RDD 理解成“数组”,比如代码中的 lineRDD 变量,它的类型是 RDD[String],你可以暂时把它当成元素类型是 String 的数组,数组的每个元素都是文件中的一行字符串。

获取到文件内容之后,下一步我们就要做分词了。

第二步:分词

“分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的 flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:映射和展平。

这两个步骤是什么意思呢?我们还是结合 Word Count 的例子来看:

1
2
3
// line代指lineRDD里的元素本身
scala> val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
wordRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26

要把 lineRDD 的行元素转换为单词,我们得先用分隔符对每个行元素进行分割(Split),咱们这里的分隔符是空格。

分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。

映射过后,RDD 类型由原来的 RDD[String]变为 RDD[Array[String]]。如果把 RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。

为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套结构,把“二维数组”还原成“一维数组”,如下图所示。

就这样,在 flatMap 算子的作用下,原来以行为元素的 lineRDD,转换成了以单词为元素的 wordRDD。

不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用 RDD 的 filter 方法来过滤:

1
2
3
// 过滤掉空字符串
scala> val cleanWordRdd: RDD[String] = wordRDD.filter(word => !word.equals(" "))
cleanWordRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:26

这样一来,我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”,类型是 RDD[String]。接下来,我们就可以准备做分组计数了。

第三步:分组计数

在 RDD 的开发框架下,聚合类操作,如计数、求和、求均值,需要依赖键值对(Key Value Pair)类型的数据元素,也就是(Key,Value)形式的“数组”元素。

因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为(Key,Value)的形式,也就是把 RDD[String]映射成 RDD[(String, Int)]。

其中,我们统一把所有的 Value 置为 1。这样一来,对于同一个的单词,在后续的计数运算中,我们只要对 Value 做累加即可,就像这样:

下面是对应的代码:

1
2
3
4
5
scala> val kvRDD1: RDD[(String, Int)] = cleanWordRdd.map(word => (word, 1))
kvRDD1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26

scala> kvRDD1.take(1)
res13: Array[(String, Int)] = Array((Preface,1))

这样一来,RDD 就由原来存储 String 元素的 cleanWordRDD,转换为了存储(String,Int)的 kvRDD。

完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。

对于 kvRDD 这个键值对“数组”,reduceByKey 先是按照 Key(也就是单词)来做分组,分组之后,每个单词都有一个与之对应的 Value 列表。然后根据用户提供的聚合函数,对同一个 Key 的所有 Value 做 reduce 运算。

这里的 reduce,你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后,它会用折叠的方式,把包含多个元素的列表转换为单个元素值,从而统计出不同元素的数量。

在 Word Count 的示例中,我们调用 reduceByKey 实现分组计算的代码如下:

1
2
scala> val wordCounts: RDD[(String, Int)] = kvRDD1.reduceByKey((x, y) => x + y)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:26

可以看到,我们传递给 reduceByKey 算子的聚合函数是 (x, y) => x + y,也就是累加函数。因此,在每个单词分组之后,reduce 会使用累加函数,依次折叠计算 Value 列表中的所有元素,最终把元素列表转换为单词的频次。对于任意一个单词来说,reduce 的计算过程都是一样的,如下图所示。

reduceByKey 完成计算之后,我们得到的依然是类型为 RDD[(String, Int)]的 RDD。不过,与 kvRDD 不同,wordCounts 元素的 Value 值,记录的是每个单词的统计词频。到此为止,我们就完成了 Word Count 主逻辑的开发与实现。

在程序的最后,我们还要把 wordCounts 按照词频做排序,并把词频最高的 5 个单词打印到屏幕上,代码如下所示。

1
2
scala> wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
res15: Array[(Int, String)] = Array((5144,the), (3407,of), (2782,to), (2573,and), (2543,a))

代码执行

应用开发完成之后,我们就可以把代码丢进已经准备好的本地 Spark 部署环境里啦。首先,我们打开命令行终端(Terminal),敲入“spark-shell”,打开交互式运行环境,如下图所示。

然后,把我们开发好的代码,依次敲入 spark-shell。为了方便你操作,我把完整的代码实现整理到下面了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.apache.spark.rdd.RDD
// 定义文件根路径,这里是定义本地文件路径
val rootPath: String = "file:///usr/local/src/practice_code/mr_practice/wordcount"
val file: String = s"${rootPath}/The_Man_of_Property.txt"
// 读取本地文件内容,不加"file://"默认读取hdfs上内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// _.split(" ") == (x => x.split(" "))
// 先map,将单词split开,后flattern展平,展开成一维数组
val wordRDD: RDD[String] = lineRDD.flatMap(_.split(" "))
// 编程kv键值对
val kvRDD: RDD[(String, Int)] = wordRDD.map(x =>(x, 1))
// 按照单词分组计算
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => (x+y))
// 打印词频最高的五个词
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

>>res0: Array[(Int, String)] = Array((5144,the), (3407,of), (2782,to), (2573,and), (2543,a))

scala api的写法:

1
2
3
4
5
6
7
8
9
10
11
12
import scala.io.Source

val lines = Source.fromFile("/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt")
lines: scala.io.BufferedSource = non-empty iterator // iterator it(迭代器):不是一个集合,可以用于访问集合的方法

val lines = Source.fromFile("/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt").getLines().toList // 将迭代器中的元素放入列表进行返回

scala> val lines = Source.fromFile("/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt").getLines().toList
lines: List[String] = List(Preface, “The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive...

scala> lines.map(_.split(" ")).flatten.map((_, 1)).groupBy(_._1).map(x => (x._1, x._2.size)).toList.sortBy(_._2).reverse.slice(0, 10)
res20: List[(String, Int)] = List((the,5144), (of,3407), (to,2782), (and,2573), (a,2543), (he,2139), (his,1912), (was,1702), (in,1694), (had,1526))