博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark运行wordcount程序
阅读量:6577 次
发布时间:2019-06-24

本文共 2663 字,大约阅读时间需要 8 分钟。

首先提一下spark rdd的五大核心特性:

1、rdd由一系列的分片组成,比如说128m一片,类似于hadoop中的split

2、每一个分区都有一个函数去迭代/运行/计算
3、一系列的依赖,比如:rdda转换为rddb,rddb转换为rddc,那么rddc依赖于rddb,rddb依赖于rdda。
lineage:保存了一些列的转换
4、对于每个k-v的rdd可以指定一个partition,告诉它如何分区,常用分区规则有hash和range
5、处理rdd split的数据在哪里,尽量在哪里做计算(移动计算而非移动数据),这里选择最优位置,为什么存在选择,因为hdfs默认存储3个副本,每个副本都是一个选择。

 

RDD的两种创建方式:

parallelist

外部数据源

 

RDD的两种操作方式:

transformation:从一个RDD转化为另一个RDD

action:输出结果集

 

RDD依赖关系:

窄依赖(narrow dependencies):n——>1

子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变,主要是map,flatmap
输入输出一对一,但结果集RDD的分区结构发生了变化,如union、coalesce
从输入中选择部分元素的算子,如filter、distinct、subtract、sample
宽依赖(wide dependencies):1——>n
子RDD的每个分区依赖于所有父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
对两个RDD基于key进行join和重组,如join

 

spark的shuffer过程类似于mapreduce shuffer过程。

 

创建spark应用模板:

1)创建SparkConf

2)创建SparkContact

3)加工逻辑

4)stop()关闭资源

---------------------------------------------------------------------------------------------------------------------

接下来通过wordcont程序熟悉一下:

检查需要分析的文本文件:

# bin/hdfs dfs -ls /user/hadoop/wordcount/input/

Found 1 items
-rw-r--r--   3 root supergroup         63 2017-05-22 14:48 /user/hadoop/wordcount/input/wc.input
[root@db02 hadoop-2.5.0]# bin/hdfs dfs -text /user/hadoop/wordcount/input/wc.input
hadoop hdfs mapreduce
zookeeper
spark hive hbase
spark hadoop

编辑scala程序实现wordcount功能:
1)读取文本文件
scala> val linesRdd = sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input")
2)按空格切分文件
#scala> val wordRdd = linesRdd.map(line => line.split(" "))
scala> val wordRdd = linesRdd.flatMap(line => line.split(" "))
3)map函数统计单词
scala> val keyvalRdd = wordRdd.map(word => (word,1))
4)统计单词
scala> val countRdd = keyvalRdd.reduceByKey((a,b) => (a+b))
5)输出结果集
scala> countRdd.collect

 

 

6)将以上程序整合成一行scala程序,结果如下:

sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

 

7)spark默认输出结果是没有排序的,如果想要wordcount输出结果按照key排序可以使用sortByKey()函数:

升序:sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).sortByKey(true).collect

降序:sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).sortByKey(false).collect

8)如果想要输出结果按照value排序可以使用sortByKey的如下技巧:

sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).collect

9)wordcount结果按值降序排序,可以使用take(n)函数输出前n个结果:

sc.textFile("hdfs://db02:8020/user/hadoop/wordcount/input/wc.input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).take(3)

转载地址:http://whyno.baihongyu.com/

你可能感兴趣的文章
Java多线程进阶(十二)—— J.U.C之atomic框架:Unsafe类
查看>>
MacOS安装MySQL 报错
查看>>
Java知识点总结(反射-反射操作泛型)
查看>>
Vue+webpack+Element 兼容问题总结
查看>>
【跃迁之路】【534天】刻意练习系列293(2018.07.24)
查看>>
初中级PHP面试基础汇总
查看>>
python爬取B站千万级数据,发现了这些热门UP主的秘密!
查看>>
《软技能》读书笔记(下)
查看>>
textarea文域高度自适应
查看>>
go语言renderer包代码分析
查看>>
【Scala谜题】成员声明的位置
查看>>
git最最最最...常用命令
查看>>
复杂recyclerView封装库
查看>>
JDK 10 的新特性和增强功能
查看>>
llvm样例parser解析
查看>>
Android每周一轮子:android-pluginmgr(插件化)
查看>>
【mongo】mongoose
查看>>
程序员快速记忆英文单词的专属诀窍
查看>>
使用Redis构建文章投票网站(Java)
查看>>
【355天】我爱刷题系列114(2018.01.26)
查看>>