文章目录
Spark快速入门1. 创建Maven项目2. 增加 Scala 插件3. WordCount第一种写法:第二种写法:
4. 日志处理5. 可能的异常
☆
Spark快速入门
在大数据早期的课程中我们已经学习了 MapReduce 框架的原理及基本使用,并了解了其底层数据处理的实现方式。接下来,就让咱们走进 Spark 的世界,了解一下它是如何带领我们完成数据处理的。
1. 创建Maven项目
创建spark-study的Maven项目。 引入以下依赖:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
我们让spark-study作为我们的父工程,删除spark-study的src文件夹 创建spark-core模块 在src/main下创建scala文件夹,并标记为Resources文件夹 项目中引入Java和Scala的开发工具包,整个项目结构如下图:
下面开始通过Spark写一个WordCount案例
2. 增加 Scala 插件
Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala,咱们当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时。我们依然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件
3. WordCount
为了能直观地感受 Spark 框架的效果,接下来我们实现一个大数据学科中最常见的教学案例WordCount
第一种写法:
package com.atguigu.sparkstudy.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Date 2021/4/8 19:09
* @Version 10.21
* @Author DuanChaojie
*/
object WordCount01 {
/**
* Spark WordCount第一种写法
* @param args
*/
def main(args: Array[String]): Unit = {
// 1、创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
// 2、创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparkConf)
// 3、读取文件数据,RDD[String]是什么?
// 获取一行一行的数据
val fileRDD: RDD[String] = sc.textFile("data/word.txt")
// 4、将文件中的数据进行分词
// 扁平化:将整体拆分成个体的操作(形成一个一个的单词)
val words: RDD[String] = fileRDD.flatMap(_.split(" "))
// 5、将数据根据单词进行分组,便于统计,如:(Hello,(Hello,Hello))
val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
// 6、对分组后的数据进行转换
val wordToCount = wordGroup.map {
case (word, list) => {
(word, list.size)
}
}
// 7、打印结果
wordToCount.foreach(println)
// 8、关闭spark连接
sc.stop()
}
}
具体过程:
第二种写法:
package com.atguigu.sparkstudy.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Date 2021/4/8 19:09
* @Version 10.21
* @Author DuanChaojie
*/
object WordCount02 {
def main(args: Array[String]): Unit = {
// 1、创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// 2、创建Spark上下文环境对象(连接对象)
val sc = new SparkContext(sparkConf)
// 3、读取文件数据,RDD[String]是什么?
val fileRDD: RDD[String] = sc.textFile("data/word.txt")
// 4、将文件中的数据进行分词
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
// 5、将文件中的数据进行分词
val wordTupleRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
// 6、将转换结构后的数据按照相同的单词进行分组聚合
// 相同key的value进行聚合操作
// (word, 1) => (word, sum)
val worldCountRDD: RDD[(String, Int)] = wordTupleRDD.reduceByKey(_ + _)
// 7、将数据聚合的结果采集到内存中
val worldCount: Array[(String, Int)] = worldCountRDD.collect()
// 8、打印结果
worldCount.foreach(println)
// 9、关闭spark连接
sc.stop()
}
}
具体过程:
4. 日志处理
执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项目的 resources 目录中创建log4j.properties 文件,并添加日志配置信息:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
5. 可能的异常
如果本机操作系统是 Windows,在程序中使用了 Hadoop 相关的东西,比如写入文件到HDFS,则会遇到如下异常:
出现这个问题的原因,并不是程序的错误,而是windows 系统用到了 hadoop 相关的服务,解决办法是通过配置关联到 windows 的系统依赖就可以了。
在 IDEA 中配置Run Configuration,添加HADOOP_HOME 变量
☆
推荐阅读
发表评论