
Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.1</version>
    </dependency>
</dependencies>

Let's start with the code.

Word Count

The task is to count how often each word occurs in the file and then print the five most frequent ones, affectionately known as "Word Count". Download link for the wikiOfSpark.txt file: here.

Scala implementation

import org.apache.spark.rdd.RDD

// Root directory of the data files
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_core"
val file: String = s"${rootPath}/wikiOfSpark.txt"

// Read the file (spark is the SparkSession that spark-shell provides by default)
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

// Split each line into words
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// Drop empty strings
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

// Turn each RDD element into a (key, value) pair
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// Count occurrences per word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

// Print the 5 most frequent words
wordCounts.map{ case (k, v) => (v, k) }.sortByKey(false).take(5)
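
take(5) brings the five (count, word) pairs back to the driver as an array. If you prefer the output formatted as "word: count", a minimal sketch along the same lines (same logic, just a foreach over the collected array) is:

// Sketch: print the top 5 as "word: count"
wordCounts
  .map { case (word, count) => (count, word) } // swap so sortByKey orders by count
  .sortByKey(ascending = false)
  .take(5)
  .foreach { case (count, word) => println(s"$word: $count") }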

Java implementation

Note that this Java version differs slightly from the Scala one: instead of merely dropping empty strings, it keeps only the words "Apache" and "Spark" via a broadcast variable, and it writes the sorted counts to HDFS rather than printing the top 5.

package com.mustafa.mynetty;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.jetbrains.annotations.NotNull;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class WordCount {

    public static void main(String[] args) {
        JavaSparkContext context = getJavaSparkContext();

        String file = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
        JavaRDD<String> lineRDD = context.textFile(file, 3);

        // Split each line into words
        JavaRDD<String> wordRDD = lineRDD.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());

        // Broadcast the word list we care about; only these words are kept
        List<String> wordCountWords = Arrays.asList("Apache", "Spark");
        Broadcast<List<String>> bc = context.broadcast(wordCountWords);
        JavaRDD<String> cleanWordRDD = wordRDD.filter((Function<String, Boolean>) word -> bc.value().contains(word));

        // (word, 1) pairs, then count per word
        JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);

        // Swap to (count, word) and sort by count, descending
        JavaPairRDD<Integer, String> wordCountsRevert = wordCounts.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) kv -> new Tuple2<>(kv._2(), kv._1()));
        JavaPairRDD<Integer, String> wordCountsSorted = wordCountsRevert.sortByKey(false);

        String targetPath = "hdfs://node1:8020/bigdata/spark/wordCount/result";
        wordCountsSorted.saveAsTextFile(targetPath);

        // List<Tuple2<Integer, String>> result = wordCountsSorted.collect();
        // result.forEach(row -> System.out.println(row._2() + ":" + row._1()));
    }

    @NotNull
    private static JavaSparkContext getJavaSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setMaster("spark://node0:7077");
        conf.setAppName("WordCount");
        conf.set("spark.executor.instances", "2");
        conf.set("spark.cores.max", "6");
        conf.set("spark.executor.cores", "3");
        conf.set("spark.executor.memory", "2g");
        conf.set("spark.memory.fraction", "0.9");
        conf.set("spark.memory.storageFraction", "0.1");
        JavaSparkContext context = new JavaSparkContext(conf);
        return context;
    }
}
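
For comparison, the broadcast-variable filter used in the Java version can also be written in the Scala shell. This is only a hedged sketch reusing wordRDD from the Scala listing above, not part of the original code:

// Sketch: keep only "Apache" and "Spark" via a broadcast variable, mirroring the Java version
val keepWords = List("Apache", "Spark")
val bcWords = spark.sparkContext.broadcast(keepWords)
val keptWordRDD: RDD[String] = wordRDD.filter(word => bcWords.value.contains(word))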

License-Plate Lottery Analysis

To cap the number of motor vehicles on the road, the Beijing municipal government introduced a license-plate lottery for passenger cars in 2011. As the lottery went on, a "multiplier" policy was added in 2016 to give long-time unsuccessful applicants a better shot.

Under the multiplier policy, each applicant is assigned a multiplier based on how many rounds they have already entered. With the multiplier in place, the chance of winning is no longer the same flat base probability for everyone, but base probability × multiplier. The more rounds you have entered, the larger your multiplier and, in principle, the higher your chance of winning (for example, a 0.1% base probability with a multiplier of 10 becomes roughly 1%).

Even so, plenty of would-be drivers around me insist the multiplier is useless: with a multiplier of 8 or 10 they still never win. So in this section, while learning Spark SQL, we use the data to answer that question quantitatively for these drivers-to-be: is there actually a relationship between the multiplier and the odds of winning?

The Beijing lottery data from 2011 to 2019 can be downloaded from the netdisk at this address, extraction code: ajs6. In the apply dataset, an applicant with multiplier N appears N times within each batch, which is what the code below relies on when it counts rows per (batchNum, carNum) to recover the multiplier.

Scala implementation

// spark-shell --conf spark.executor.memory=4g --conf spark.driver.memory=4g

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql"

// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"

// spark is the default SparkSession instance in spark-shell
// Read the source files through the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)

// Print the data
applyNumbersDF.show

// Winner ("lucky dog") data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)

// Print the data
luckyDogsDF.show

// Keep only winners from 2016 onwards, and only the carNum column
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")

// Inner-join the application data with the winners; the join key is carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

// Group by batchNum and carNum to compute each winner's multiplier per batch
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier"))

// Group by carNum and keep each winner's largest multiplier
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum").agg(max("multiplier").alias("multiplier"))

// Group by multiplier and count how many winners fall into each bucket
val result: DataFrame = uniqueMultipliers.groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier")

result.collect
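
result.collect pulls the aggregated rows back to the driver. If you would rather persist them, a small sketch mirroring what the Java version below does (write as CSV with a header) could look like this; the output path is only an example:

// Sketch: save the multiplier distribution as CSV (path is illustrative)
result.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save(s"${rootPath}/result")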

Java implementation

package com.mustafa.mynetty;

import org.apache.spark.sql.*;

import java.util.List;

public class CarsBet {

    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("CarsBet")
                .master("spark://node0:7077")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "8")
                .config("spark.executor.cores", "3")
                // .config("spark.executor.cores", "8")
                .config("spark.executor.memory", "2g")
                // .config("spark.executor.memory", "4g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.2")
                .getOrCreate();

        String rootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";

        // Applicant data
        String hdfs_path_apply = rootPath + "/apply";
        Dataset<Row> applyNumbersDF = session.read().parquet(hdfs_path_apply);

        // Winner data
        String hdfs_path_lucky = rootPath + "/lucky";
        Dataset<Row> luckyDogsDF = session.read().parquet(hdfs_path_lucky);

        // Winners from 2016 onwards, carNum only; broadcast hint for the join
        Dataset<Row> filteredLuckyDogs = luckyDogsDF.filter("batchNum >= 201601").select("carNum");
        Dataset<Row> filteredLuckyDogsCache = functions.broadcast(filteredLuckyDogs);

        // Inner join on carNum, dropping the duplicate join column
        Dataset<Row> jointDF = applyNumbersDF.join(filteredLuckyDogsCache, applyNumbersDF.col("carNum").equalTo(filteredLuckyDogsCache.col("carNum")), "inner").drop(filteredLuckyDogsCache.col("carNum"));
        Dataset<Row> jointDFCache = functions.broadcast(jointDF);

        // Multiplier per (carNum, batchNum)
        Dataset<Row> multipliers = jointDFCache.groupBy("carNum", "batchNum").agg(functions.count(functions.lit(1)).alias("multiplier"));
        Dataset<Row> multipliersCache = functions.broadcast(multipliers);

        // Keep each winner's largest multiplier
        Dataset<Row> uniqueMultipliers = multipliersCache.groupBy("carNum").agg(functions.max("multiplier").alias("multiplier"));
        Dataset<Row> uniqueMultipliersCache = functions.broadcast(uniqueMultipliers);

        // Number of winners per multiplier value
        Dataset<Row> result = uniqueMultipliersCache.groupBy("multiplier").agg(functions.count(functions.lit(1)).alias("cnt")).orderBy("multiplier");

        result.write().format("csv").option("header", true).mode("overwrite").save("hdfs://node1:8020/bigdata/spark/carsBet/result");

        // result.cache();
        // result.count();
        //
        // result.show();
        //
        // List<Row> list = result.collectAsList();
        // list.forEach(row -> System.out.println(row.getLong(0) + ":" + row.getLong(1)));
    }
}
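
The Java version wraps several intermediate DataFrames in functions.broadcast, a size hint that asks Spark to plan a broadcast join rather than a shuffle join. The same hint exists in the Scala API; a minimal sketch using the DataFrames from the Scala listing above would be:

// Sketch: broadcast-join hint in Scala, mirroring functions.broadcast in the Java version
import org.apache.spark.sql.functions.broadcast
val jointWithHint: DataFrame = applyNumbersDF.join(broadcast(filteredLuckyDogs), Seq("carNum"), "inner")

Keep in mind that this is only a hint: broadcasting a large intermediate result (such as the joined data) can hurt rather than help if it does not comfortably fit in executor memory.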

Streaming Word Count

In the earlier Word Count, the data was handed to Spark all at once as a file (wikiOfSpark.txt), triggering a single job. In the streaming Word Count, data is fed to Spark line by line, in batches, and each line of data triggers another round of computation.

Concretely, we use the netcat tool to send lines of text to a socket on local port 9999. The Spark streaming application listens on port 9999 and, as soon as it receives a data item, runs the computation, which is again Word Count. Once the computation finishes, the application prints the result to the terminal (console).

Open a terminal and feed data to port 9999: on Linux, nc -lk 9999; on macOS, nc -l -p 9999.

Scala implementation

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

// Host and port to listen on
val host: String = "127.0.0.1"
val port: String = "9999"

// Create a streaming DataFrame from the socket source
var df: DataFrame = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

// Split each incoming line on spaces into an array of words
df = df.withColumn("words", split($"value", " "))
  // Explode the array into one word per row
  .withColumn("word", explode($"words"))
  // Group by word
  .groupBy("word")
  // Count per group
  .count()

df.writeStream
  // Sink: the terminal (console)
  .format("console")
  // Output options
  .option("truncate", false)
  // Output mode
  .outputMode("complete")
  //.outputMode("update")
  // Start the streaming query
  .start()
  // Block until the query is terminated
  .awaitTermination()
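
The complete output mode re-prints the entire word-count table on every batch. A possible variation, assuming the same spark-shell session as above, is to switch to update mode (only rows that changed since the last trigger are printed) and set a fixed micro-batch interval:

// Sketch: update output mode with a 5-second micro-batch trigger
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .format("console")
  .option("truncate", false)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()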

Java implementation

package com.mustafa.mynetty;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.concurrent.TimeoutException;

public class StreamingWordCount {

    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("StreamingWordCount")
                .master("spark://node0:7077")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "6")
                .config("spark.executor.cores", "3")
                .config("spark.executor.memory", "2g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.1")
                .getOrCreate();

        String host = "127.0.0.1";
        String port = "9999";

        // Streaming DataFrame from the socket source
        Dataset<Row> df = session.readStream().format("socket").option("host", host).option("port", port).load();

        // Split lines into words, explode into one word per row, then count per word
        df = df.withColumn("words", functions.split(df.col("value"), " "));
        df = df.withColumn("word", functions.explode(df.col("words")));
        df = df.groupBy(df.col("word")).count();

        try {
            df.writeStream().format("console")
                    .option("truncate", "false")
                    .outputMode("complete")
                    .start()
                    .awaitTermination();
        } catch (StreamingQueryException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}

Running on the cluster

spark-submit \
  --master spark://node0:7077 \
  --deploy-mode cluster \
  --class com.mustafa.mynetty.CarsBet \
  hdfs://node1:8020/software/spark-demo-1.0-SNAPSHOT.jar

Appendix

Uploading the data files to HDFS (for the lottery analysis)

package com.mustafa.mynetty;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class CarsBetUploader {

    // Builds a FileSystem from the default Configuration, which picks up
    // core-site.xml (fs.defaultFS) from the classpath
    private static FileSystem getFileSystem() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(conf);
        return fileSystem;
    }

    private static void uploadHdfsFile(File localFile, String targetPath) throws IOException {
        FileSystem fileSystem = getFileSystem();
        // Pre-processing: remove any existing target before writing
        fileSystem.delete(new Path(targetPath), true);
        try (FileInputStream inStream = new FileInputStream(localFile);
             FSDataOutputStream outStream = fileSystem.create(new Path(targetPath))) {
            IOUtils.copy(inStream, outStream, 4096);
        }
    }

    // Prints an HDFS file to stdout (handy for a quick sanity check)
    private static void readHdfsFile(String hdfsFilePath) throws IOException {
        FileSystem fileSystem = getFileSystem();
        Path path = new Path(hdfsFilePath);
        try (FSDataInputStream inStream = fileSystem.open(path)) {
            IOUtils.copy(inStream, System.out, 4096);
        }
    }

    public static void main(String[] args) throws Exception {
        // String hdfsFilePath = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
        // readHdfsFile(hdfsFilePath);

        String sourceRootPath = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql";
        String targetRootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
        String[] subFolders = new String[]{"apply", "lucky"};

        for (String subFolder : subFolders) {
            String sourcePath = sourceRootPath + "/" + subFolder;
            String targetPath = targetRootPath + "/" + subFolder;
            File sourcePathFile = new File(sourcePath);
            File[] sourceList = sourcePathFile.listFiles();
            for (File source : sourceList) {
                // The parquet data is partitioned by batchNum; mirror that layout on HDFS
                if (source.isDirectory() && source.getName().startsWith("batchNum=")) {
                    String target = targetPath + "/" + source.getName();
                    File[] sourcePartitionsList = source.listFiles();
                    Arrays.asList(sourcePartitionsList).parallelStream().forEach(sourcePartition -> {
                        if (sourcePartition.isFile() && sourcePartition.getName().startsWith("part-")) {
                            String targetPartition = target + "/" + sourcePartition.getName();
                            System.out.println("upload " + targetPartition);
                            try {
                                uploadHdfsFile(sourcePartition, targetPartition);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }
    }
}

Hadoop HDFS configuration

core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node1:8020</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/software/hadoop/temp</value>
    </property>
    <property>
        <name>fs.trash.interval</name>
        <value>86400</value>
    </property>
    <property>
        <name>io.file.buffer.size</name>
        <value>131072</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>myserver2:9870</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>myserver2:9868</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
</configuration>

hadoop-env.sh

export LANG=en_US.UTF-8

export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}

export JAVA_HOME=/opt/software/jdk

export HADOOP_HOME=/opt/software/hadoop

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export HADOOP_LOG_DIR=$HADOOP_HOME/logs

export HDFS_NAMENODE_USER=root

export HDFS_DATANODE_USER=root

export HDFS_SECONDARYNAMENODE_USER=root

export YARN_RESOURCEMANAGER_USER=root

export YARN_NODEMANAGER_USER=root

Spark configuration

spark-defaults.conf

spark.master spark://192.168.31.125:7077

spark.local.dir /opt/software/spark/temp

spark.sql.autoBroadcastJoinThreshold 1g

spark.sql.adaptive.enabled true

spark.ui.port 4040

spark.executor.cores 3

spark.executor.memory 2g

spark.memory.fraction 0.9

spark.memory.storageFraction 0.1

spark.eventLog.enabled true

spark.eventLog.dir hdfs://node1:8020/software/spark/history

spark.driver.memory 1g

spark-env.sh

#!/usr/bin/env bash

SPARK_MASTER_HOST=node0

HADOOP_CONF_DIR=/opt/software/hadoop/etc/hadoop

SPARK_EXECUTOR_CORES=3

SPARK_EXECUTOR_MEMORY=2g

JAVA_HOME=/opt/software/jdk

SPARK_HOME=/opt/software/spark
