文章目录

Spark快速入门1. 创建Maven项目2. 增加 Scala 插件3. WordCount第一种写法:第二种写法:

4. 日志处理5. 可能的异常

Spark快速入门

在大数据早期的课程中我们已经学习了 MapReduce 框架的原理及基本使用,并了解了其底层数据处理的实现方式。接下来,就让咱们走进 Spark 的世界,了解一下它是如何带领我们完成数据处理的。

1. 创建Maven项目

创建spark-study的Maven项目。 引入以下依赖:

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.atguigu

spark-study

pom

1.0-SNAPSHOT

spark-core

org.apache.spark

spark-core_2.12

3.0.0

net.alchim31.maven

scala-maven-plugin

3.2.2

testCompile

org.apache.maven.plugins

maven-assembly-plugin

3.1.0

jar-with-dependencies

make-assembly

package

single

我们让spark-study作为我们的父工程,删除spark-study的src文件夹 创建spark-core模块 在src/main下创建scala文件夹,并标记为Resources文件夹 项目中引入Java和Scala的开发工具包,整个项目结构如下图:

下面开始通过Spark写一个WordCount案例

2. 增加 Scala 插件

Spark 由 Scala 语言开发的,所以本课件接下来的开发所使用的语言也为 Scala,咱们当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时。我们依然采用这个版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件

3. WordCount

为了能直观地感受 Spark 框架的效果,接下来我们实现一个大数据学科中最常见的教学案例WordCount

第一种写法:

package com.atguigu.sparkstudy.wc

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

* @Date 2021/4/8 19:09

* @Version 10.21

* @Author DuanChaojie

*/

object WordCount01 {

/**

* Spark WordCount第一种写法

* @param args

*/

def main(args: Array[String]): Unit = {

// 1、创建Spark运行配置对象

val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")

// 2、创建Spark上下文环境对象(连接对象)

val sc = new SparkContext(sparkConf)

// 3、读取文件数据,RDD[String]是什么?

// 获取一行一行的数据

val fileRDD: RDD[String] = sc.textFile("data/word.txt")

// 4、将文件中的数据进行分词

// 扁平化:将整体拆分成个体的操作(形成一个一个的单词)

val words: RDD[String] = fileRDD.flatMap(_.split(" "))

// 5、将数据根据单词进行分组,便于统计,如:(Hello,(Hello,Hello))

val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)

// 6、对分组后的数据进行转换

val wordToCount = wordGroup.map {

case (word, list) => {

(word, list.size)

}

}

// 7、打印结果

wordToCount.foreach(println)

// 8、关闭spark连接

sc.stop()

}

}

具体过程:

第二种写法:

package com.atguigu.sparkstudy.wc

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

/**

* @Date 2021/4/8 19:09

* @Version 10.21

* @Author DuanChaojie

*/

object WordCount02 {

def main(args: Array[String]): Unit = {

// 1、创建Spark运行配置对象

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 2、创建Spark上下文环境对象(连接对象)

val sc = new SparkContext(sparkConf)

// 3、读取文件数据,RDD[String]是什么?

val fileRDD: RDD[String] = sc.textFile("data/word.txt")

// 4、将文件中的数据进行分词

val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))

// 5、将文件中的数据进行分词

val wordTupleRDD: RDD[(String, Int)] = wordRDD.map((_, 1))

// 6、将转换结构后的数据按照相同的单词进行分组聚合

// 相同key的value进行聚合操作

// (word, 1) => (word, sum)

val worldCountRDD: RDD[(String, Int)] = wordTupleRDD.reduceByKey(_ + _)

// 7、将数据聚合的结果采集到内存中

val worldCount: Array[(String, Int)] = worldCountRDD.collect()

// 8、打印结果

worldCount.foreach(println)

// 9、关闭spark连接

sc.stop()

}

}

具体过程:

4. 日志处理

执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项目的 resources 目录中创建log4j.properties 文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console

log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.target=System.err

log4j.appender.console.layout=org.apache.log4j.PatternLayout

log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the

# log level for this class is used to overwrite the root logger's log level, so that

# the user can have different defaults for the shell and regular Spark apps.

log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose

log4j.logger.org.spark_project.jetty=ERROR

log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR

log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR

log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR

log4j.logger.org.apache.parquet=ERROR

log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support

log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL

log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

5. 可能的异常

如果本机操作系统是 Windows,在程序中使用了 Hadoop 相关的东西,比如写入文件到HDFS,则会遇到如下异常:

出现这个问题的原因,并不是程序的错误,而是windows 系统用到了 hadoop 相关的服务,解决办法是通过配置关联到 windows 的系统依赖就可以了。

在 IDEA 中配置Run Configuration,添加HADOOP_HOME 变量

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: