Spark基础入门-从WordCount开始学Spark
从WordCount开始学Spark
梳理Word Count 的计算步骤
1.读取内容:调用 Spark 文件读取 API,加载 wikiOfSpark.txt 文件内容;
2.分词:以行为单位,把句子打散为单词;
3.分组计数:按照单词做分组计数;
明确了计算步骤后,接下来我们就可以调用 Spark 开发 API,对这些步骤进行代码实现,从而完成 Word Count 的应用开发。
众所周知,Spark 支持种类丰富的开发语言,如 Scala、Java、Python,等等。你可以结合个人偏好和开发习惯,任意选择其中的一种进行开发。尽管不同语言的开发 API 在语法上有着细微的差异,但不论是功能方面、还是性能方面,Spark 对于每一种语言的支持都是一致的。换句话说,同样是 Word Count,你用 Scala 实现也行,用 Python 实现也可以,两份代码的执行结果是一致的。不仅如此,在同样的计算资源下,两份代码的执行效率也是一样的。
因此,就 Word Count 这个示例来说,开发语言不是重点,我们不妨选择 Scala。你可能会说:“我本来对 Spark 就不熟,更没有接触过 Scala,一上来就用 Scala 演示 Spark 应用代码,理解起来会不会很困难?”
其实大可不必担心,Scala 语法比较简洁,Word Count 的 Scala 实现不超过 10 行代码。再者,对于 Word Count 中的每一行 Scala 代码,我会带着你手把手、逐行地进行讲解和分析。我相信,跟着我过完一遍代码之后,你能很快地把它“翻译”成你熟悉的语言,比如 Java 或 Python。另外,绝大多数的 Spark 源码都是由 Scala 实现的,接触并了解一些 Scala 的基本语法,有利于你后续阅读、学习 Spark 源代码。
WordCount代码实现
选定了语言,接下来,我们就按照读取内容、分词、分组计数这三步来看看 Word Count 具体怎么实现。
第一步:读取内容
启动Spark,启动Spark-shell。
首先,我们调用 SparkContext 的 textFile 方法,读取源文件,也就是 The_Man_of_Property.txt,代码如下表所示:
1 | scala> import org.apache.spark.rdd.RDD |
在这段代码中,你可能会发现 3 个新概念,分别是 spark、sparkContext 和 RDD。
其中,spark 和 sparkContext 分别是两种不同的开发入口实例:
- spark 是开发入口 SparkSession 实例(Instance),SparkSession 在 spark-shell 中会由系统自动创建;
- sparkContext 是开发入口 SparkContext 实例。
在 Spark 版本演进的过程中,从 2.0 版本开始,SparkSession 取代了 SparkContext,成为统一的开发入口。换句话说,要开发 Spark 应用,你必须先创建 SparkSession。关于 SparkSession 和 SparkContext,这里你只要记住它们是必需的开发入口就可以了。
我们再来看看 RDD,RDD 的全称是 Resilient Distributed Dataset,意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。关于 RDD 的定义、内涵与作用,我们留到后面的文章再去展开。
在这里,你不妨先简单地把 RDD 理解成“数组”,比如代码中的 lineRDD 变量,它的类型是 RDD[String],你可以暂时把它当成元素类型是 String 的数组,数组的每个元素都是文件中的一行字符串。
获取到文件内容之后,下一步我们就要做分词了。
第二步:分词
“分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的 flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:映射和展平。
这两个步骤是什么意思呢?我们还是结合 Word Count 的例子来看:
1 | // line代指lineRDD里的元素本身 |
要把 lineRDD 的行元素转换为单词,我们得先用分隔符对每个行元素进行分割(Split),咱们这里的分隔符是空格。
分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。
映射过后,RDD 类型由原来的 RDD[String]变为 RDD[Array[String]]。如果把 RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。
为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套结构,把“二维数组”还原成“一维数组”,如下图所示。
就这样,在 flatMap 算子的作用下,原来以行为元素的 lineRDD,转换成了以单词为元素的 wordRDD。
不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用 RDD 的 filter 方法来过滤:
1 | // 过滤掉空字符串 |
这样一来,我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”,类型是 RDD[String]。接下来,我们就可以准备做分组计数了。
第三步:分组计数
在 RDD 的开发框架下,聚合类操作,如计数、求和、求均值,需要依赖键值对(Key Value Pair)类型的数据元素,也就是(Key,Value)形式的“数组”元素。
因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为(Key,Value)的形式,也就是把 RDD[String]映射成 RDD[(String, Int)]。
其中,我们统一把所有的 Value 置为 1。这样一来,对于同一个的单词,在后续的计数运算中,我们只要对 Value 做累加即可,就像这样:
下面是对应的代码:
1 | scala> val kvRDD1: RDD[(String, Int)] = cleanWordRdd.map(word => (word, 1)) |
这样一来,RDD 就由原来存储 String 元素的 cleanWordRDD,转换为了存储(String,Int)的 kvRDD。
完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。
对于 kvRDD 这个键值对“数组”,reduceByKey 先是按照 Key(也就是单词)来做分组,分组之后,每个单词都有一个与之对应的 Value 列表。然后根据用户提供的聚合函数,对同一个 Key 的所有 Value 做 reduce 运算。
这里的 reduce,你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后,它会用折叠的方式,把包含多个元素的列表转换为单个元素值,从而统计出不同元素的数量。
在 Word Count 的示例中,我们调用 reduceByKey 实现分组计算的代码如下:
1 | scala> val wordCounts: RDD[(String, Int)] = kvRDD1.reduceByKey((x, y) => x + y) |
可以看到,我们传递给 reduceByKey 算子的聚合函数是 (x, y) => x + y,也就是累加函数。因此,在每个单词分组之后,reduce 会使用累加函数,依次折叠计算 Value 列表中的所有元素,最终把元素列表转换为单词的频次。对于任意一个单词来说,reduce 的计算过程都是一样的,如下图所示。
reduceByKey 完成计算之后,我们得到的依然是类型为 RDD[(String, Int)]的 RDD。不过,与 kvRDD 不同,wordCounts 元素的 Value 值,记录的是每个单词的统计词频。到此为止,我们就完成了 Word Count 主逻辑的开发与实现。
在程序的最后,我们还要把 wordCounts 按照词频做排序,并把词频最高的 5 个单词打印到屏幕上,代码如下所示。
1 | scala> wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5) |
代码执行
应用开发完成之后,我们就可以把代码丢进已经准备好的本地 Spark 部署环境里啦。首先,我们打开命令行终端(Terminal),敲入“spark-shell”,打开交互式运行环境,如下图所示。
然后,把我们开发好的代码,依次敲入 spark-shell。为了方便你操作,我把完整的代码实现整理到下面了:
1 | import org.apache.spark.rdd.RDD |
scala api的写法:
1 | import scala.io.Source |