Prepare the Maven dependencies
Let's start with the code.
WordCount (word count)
First count the words in the file, then print the five most frequent words: the classic "Word Count". The wikiOfSpark.txt file can be downloaded here.
Scala implementation
import org.apache.spark.rdd.RDD

// Root directory of the data files
val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_core"
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read the file line by line
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// Split each line into words
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// Drop empty tokens produced by consecutive delimiters
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
// Convert each RDD element to (Key, Value) form
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Group by word and count
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// Print the 5 most frequent words
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
Java implementation
package com.mustafa.mynetty;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.jetbrains.annotations.NotNull;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class WordCount {
    public static void main(String[] args) {
        JavaSparkContext context = getJavaSparkContext();
        String file = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
        // Read the file line by line
        JavaRDD<String> lineRDD = context.textFile(file);
        // Split each line into words
        JavaRDD<String> wordRDD = lineRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // Word list for the broadcast-variable filter (the list contents were lost and are an assumption; adjust as needed)
        List<String> wordCountWords = Arrays.asList("Apache", "Spark");
        Broadcast<List<String>> bc = context.broadcast(wordCountWords);
        // Keep only the words in the broadcast list
        JavaRDD<String> cleanWordRDD = wordRDD.filter(word -> bc.value().contains(word));
        // Convert each element to (Key, Value) form
        JavaPairRDD<String, Integer> kvRDD = cleanWordRDD.mapToPair(word -> new Tuple2<>(word, 1));
        // Group by word and count
        JavaPairRDD<String, Integer> wordCounts = kvRDD.reduceByKey(Integer::sum);
        // Swap (word, count) to (count, word), then sort by count descending
        JavaPairRDD<Integer, String> swappedRDD = wordCounts.mapToPair(t -> new Tuple2<>(t._2(), t._1()));
        JavaPairRDD<Integer, String> wordCountsSorted = swappedRDD.sortByKey(false);
        String targetPath = "hdfs://node1:8020/bigdata/spark/wordCount/result";
        wordCountsSorted.saveAsTextFile(targetPath);
        // List<Tuple2<Integer, String>> result = wordCountsSorted.take(5);
        // result.forEach(row -> System.out.println(row._2() + ":" + row._1()));
    }

    @NotNull
    private static JavaSparkContext getJavaSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setMaster("spark://node0:7077");
        conf.setAppName("WordCount");
        conf.set("spark.executor.instances", "2");
        conf.set("spark.cores.max", "6");
        conf.set("spark.executor.cores", "3");
        conf.set("spark.executor.memory", "2g");
        conf.set("spark.memory.fraction", "0.9");
        conf.set("spark.memory.storageFraction", "0.1");
        return new JavaSparkContext(conf);
    }
}
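Because every RDD operator used above has a direct counterpart on plain Java streams, the Word Count pipeline can be sanity-checked locally without a cluster. This is only a sketch of the logic; the sample lines are made up for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Mirrors flatMap -> filter -> map -> reduceByKey on plain collections
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))   // flatMap: lines -> words
                .filter(word -> !word.isEmpty())                   // filter: drop empty tokens
                .collect(Collectors.groupingBy(w -> w, Collectors.counting())); // reduceByKey
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Apache Spark is fast", "Spark is simple");
        // sortByKey(false) + take(5) equivalent: top 5 entries by descending count
        countWords(lines).entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
                .limit(5)
                .forEach(e -> System.out.println(e.getKey() + ":" + e.getValue()));
    }
}
```

Checking the logic this way is useful before submitting to the cluster, since a local run fails in seconds rather than minutes.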
Car Lottery Analysis
To cap the number of motor vehicles, the Beijing municipal government introduced a license-plate lottery in 2011. As the lottery went on, a "multiplier" system was added in 2016 to favor those "would-be drivers" who had gone a long time without winning a plate.
Under the multiplier system, each participant is assigned a multiplier coefficient based on how many times they have entered the lottery. With the multiplier in play, the winning rate changes from a uniform base probability to "base probability * multiplier". The more times you enter, the larger your multiplier, and the higher your winning rate.
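As a quick numeric illustration of the rule "winning rate = base probability * multiplier": the base probability below is invented for the example, not taken from real lottery data.

```java
public class MultiplierExample {
    // winning rate = base probability * multiplier
    static double winRate(double baseProbability, int multiplier) {
        return baseProbability * multiplier;
    }

    public static void main(String[] args) {
        double base = 0.001; // invented base probability, illustration only
        for (int m : new int[]{1, 8, 10}) {
            System.out.printf("multiplier %d -> win rate %.3f%%%n", m, winRate(base, m) * 100);
        }
    }
}
```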
Yet countless "would-be drivers" around me keep telling me the multiplier is useless: even with a multiplier of 8 or 10, they still never win! So in this lesson, while learning Spark SQL, let's use data to answer the question for these "old drivers" who have never touched a car, and quantitatively analyze whether there is any relationship between multiplier and winning rate.
The Beijing car-lottery data from 2011 to 2019 can be downloaded from the network drive at this address; the extraction code is ajs6.
Scala implementation
// spark-shell --conf spark.executor.memory=4g --conf spark.driver.memory=4g
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val rootPath: String = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql"
// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"
// "spark" is the default SparkSession instance in spark-shell
// Read the source files via the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// Print the data
applyNumbersDF.show
// Winner data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// Read the source files via the read API
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// Print the data
luckyDogsDF.show
// Keep winners from 2016 onward, selecting only the winning number carNum
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")
// Inner-join application data with winner data; the join key is carNum
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")
// Group by batchNum and carNum to compute the multiplier
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier"))
// Group by carNum and keep the largest multiplier
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum").agg(max("multiplier").alias("multiplier"))
// Group by multiplier and count people
val result: DataFrame = uniqueMultipliers.groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier")
result.collect
Java implementation
package com.mustafa.mynetty;

import org.apache.spark.sql.*;

import java.util.List;

public class CarsBet {
    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("CarsBet")
                .master("spark://node0:7077")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "8")
                .config("spark.executor.cores", "3")
                // .config("spark.executor.cores", "8")
                .config("spark.executor.memory", "2g")
                // .config("spark.executor.memory", "4g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.2")
                .getOrCreate();
        String rootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
        // Applicant data
        String hdfs_path_apply = rootPath + "/apply";
        Dataset<Row> applyNumbersDF = session.read().parquet(hdfs_path_apply);
        // Winner data
        String hdfs_path_lucky = rootPath + "/lucky";
        Dataset<Row> luckyDogsDF = session.read().parquet(hdfs_path_lucky);
        // Keep winners from 2016 onward, selecting only carNum
        Dataset<Row> filteredLuckyDogs = luckyDogsDF
                .filter(functions.col("batchNum").geq("201601"))
                .select("carNum");
        // Inner-join applications with winners on carNum
        Dataset<Row> jointDF = applyNumbersDF.join(filteredLuckyDogs, "carNum");
        // Group by batchNum and carNum to compute the multiplier
        Dataset<Row> multipliers = jointDF
                .groupBy(functions.col("batchNum"), functions.col("carNum"))
                .agg(functions.count(functions.lit(1)).alias("multiplier"));
        // Group by carNum and keep the largest multiplier
        Dataset<Row> uniqueMultipliers = multipliers
                .groupBy("carNum")
                .agg(functions.max("multiplier").alias("multiplier"));
        // Group by multiplier and count people
        Dataset<Row> result = uniqueMultipliers
                .groupBy("multiplier")
                .agg(functions.count(functions.lit(1)).alias("cnt"))
                .orderBy("multiplier");
        result.write().format("csv").option("header", true).mode("overwrite").save("hdfs://node1:8020/bigdata/spark/carsBet/result");
        // result.cache();
        // result.count();
        // result.show();
        // List<Row> list = result.collectAsList();
        // list.forEach(row -> System.out.println(row.getLong(0) + ":" + row.getLong(1)));
    }
}
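The three aggregation steps (multiplier per batch, max multiplier per car, histogram of people per multiplier) can be checked locally on plain collections. This is a sketch of the logic only; the (batchNum, carNum) join rows below are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MultiplierPipeline {
    // Steps 1 and 2: per carNum, the max count over (batchNum, carNum) groups
    static Map<String, Long> uniqueMultipliers(List<String[]> jointRows) {
        // Step 1: multiplier per (batchNum, carNum), i.e. count of rows per pair
        Map<String, Long> perBatch = jointRows.stream()
                .collect(Collectors.groupingBy(r -> r[0] + "|" + r[1], Collectors.counting()));
        // Step 2: keep the max multiplier per carNum (merge duplicates with max)
        return perBatch.entrySet().stream()
                .collect(Collectors.toMap(
                        e -> e.getKey().split("\\|")[1],
                        Map.Entry::getValue,
                        Math::max));
    }

    // Step 3: number of people per multiplier value
    static Map<Long, Long> histogram(Map<String, Long> uniqueMultipliers) {
        return uniqueMultipliers.values().stream()
                .collect(Collectors.groupingBy(m -> m, Collectors.counting()));
    }

    public static void main(String[] args) {
        // carNum "A" appears twice in each of two batches, "B" once in one batch
        List<String[]> jointRows = Arrays.asList(
                new String[]{"201601", "A"}, new String[]{"201601", "A"},
                new String[]{"201602", "A"}, new String[]{"201602", "A"},
                new String[]{"201601", "B"});
        System.out.println(histogram(uniqueMultipliers(jointRows)));
    }
}
```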
Streaming Word Count
In the earlier Word Count, the data was "fed" to Spark all at once as a file (wikiOfSpark.txt), triggering a single Job. In the "streaming Word Count", data is fed to Spark in batches at line granularity, and each line of data triggers a Job.
Concretely, we use the netcat tool to send lines of data to a socket on local port 9999. The Spark streaming application keeps listening on port 9999 of the local machine; as soon as a data record arrives, it immediately runs the computation, which here is Word Count. When the computation finishes, the streaming application prints the result to the console.
Open a terminal and feed data on port 9999: on Linux, nc -lk 9999; on macOS, nc -l -p 9999.
Scala implementation
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
// in spark-shell; provides the $"..." column syntax
import spark.implicits._

// Host and port to listen on
val host: String = "127.0.0.1"
val port: String = "9999"

// Create a streaming DataFrame from the socket source
var df: DataFrame = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .load()

// First split each received line on spaces into a words array
df = df.withColumn("words", split($"value", " "))
  // Explode the words array into individual words
  .withColumn("word", explode($"words"))
  // Group by word
  .groupBy("word")
  // Count per group
  .count()

df.writeStream
  // Use the console as the sink
  .format("console")
  // Output options
  .option("truncate", false)
  // Output mode
  .outputMode("complete")
  //.outputMode("update")
  // Start the streaming query
  .start()
  // Block until the query terminates
  .awaitTermination()
Java implementation
package com.mustafa.mynetty;

import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.util.concurrent.TimeoutException;

public class StreamingWordCount {
    public static void main(String[] args) {
        SparkSession session = SparkSession
                .builder()
                .appName("StreamingWordCount")
                .master("spark://node0:7077")
                .config("spark.executor.instances", "2")
                .config("spark.cores.max", "6")
                .config("spark.executor.cores", "3")
                .config("spark.executor.memory", "2g")
                .config("spark.memory.fraction", "0.9")
                .config("spark.memory.storageFraction", "0.1")
                .getOrCreate();
        String host = "127.0.0.1";
        String port = "9999";
        // Create a streaming DataFrame from the socket source
        Dataset<Row> df = session.readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();
        // Split each received line on spaces into a words array
        df = df.withColumn("words", functions.split(df.col("value"), " "));
        // Explode the words array into individual words
        df = df.withColumn("word", functions.explode(df.col("words")));
        // Group by word and count
        df = df.groupBy(df.col("word")).count();
        try {
            df.writeStream().format("console")
                    .option("truncate", "false")
                    .outputMode("complete")
                    .start()
                    .awaitTermination();
        } catch (StreamingQueryException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
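"Complete" output mode re-emits the counts over all data seen so far on every micro-batch, rather than only the changed rows. Conceptually, the engine folds each new batch into a running table; the input batches below are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompleteModeSketch {
    // Folds one micro-batch of lines into the running word counts
    static Map<String, Long> updateCounts(Map<String, Long> counts, List<String> batch) {
        Map<String, Long> next = new HashMap<>(counts);
        for (String line : batch) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    next.merge(word, 1L, Long::sum);
                }
            }
        }
        return next;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("spark streams"),
                Arrays.asList("spark"));
        Map<String, Long> counts = new HashMap<>();
        for (List<String> batch : batches) {
            counts = updateCounts(counts, batch);
            // In complete mode, the full counts table is printed on every trigger
            System.out.println(counts);
        }
    }
}
```

With "update" mode, by contrast, only the rows whose counts changed in the current batch would be emitted.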
Running on the cluster
spark-submit \
--master spark://node0:7077 \
--deploy-mode cluster \
--class com.mustafa.mynetty.CarsBet \
hdfs://node1:8020/software/spark-demo-1.0-SNAPSHOT.jar
Appendix
Uploading the files to HDFS (car lottery analysis)
package com.mustafa.mynetty;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class CarsBetUploader {
    private static FileSystem getFileSystem() throws IOException {
        Configuration conf = new Configuration();
        return FileSystem.get(conf);
    }

    private static void uploadHdfsFile(File localFile, String targetPath) throws IOException {
        FileSystem fileSystem = getFileSystem();
        // Remove any stale copy before writing
        fileSystem.delete(new Path(targetPath), true);
        try (FileInputStream inStream = new FileInputStream(localFile);
             FSDataOutputStream outStream = fileSystem.create(new Path(targetPath))) {
            IOUtils.copy(inStream, outStream, 4096);
        }
    }

    // Read an HDFS file and print its contents to stdout
    private static void readHdfsFile(String hdfsFilePath) throws IOException {
        FileSystem fileSystem = getFileSystem();
        Path path = new Path(hdfsFilePath);
        try (FSDataInputStream inStream = fileSystem.open(path)) {
            IOUtils.copy(inStream, System.out, 4096);
        }
    }

    public static void main(String[] args) throws Exception {
        // String hdfsFilePath = "hdfs://node1:8020/bigdata/spark/wordCount/datas/wikiOfSpark.txt";
        // readHdfsFile(hdfsFilePath);
        String sourceRootPath = "/Users/mustafa/Documents/databases/bigdata/spark/spark_sql";
        String targetRootPath = "hdfs://node1:8020/bigdata/spark/carsBet/datas";
        String[] subFolders = new String[]{"apply", "lucky"};
        for (String subFolder : subFolders) {
            String sourcePath = sourceRootPath + "/" + subFolder;
            String targetPath = targetRootPath + "/" + subFolder;
            File sourcePathFile = new File(sourcePath);
            File[] sourceList = sourcePathFile.listFiles();
            for (File source : sourceList) {
                // Upload every part- file under each batchNum= partition directory
                if (source.isDirectory() && source.getName().startsWith("batchNum=")) {
                    String target = targetPath + "/" + source.getName();
                    File[] sourcePartitionsList = source.listFiles();
                    Arrays.asList(sourcePartitionsList).parallelStream().forEach(sourcePartition -> {
                        if (sourcePartition.isFile() && sourcePartition.getName().startsWith("part-")) {
                            String targetPartition = target + "/" + sourcePartition.getName();
                            System.out.println("upload " + targetPartition);
                            try {
                                uploadHdfsFile(sourcePartition, targetPartition);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }
    }
}
Hadoop HDFS configuration
core-site.xml
hdfs-site.xml
hadoop-env.sh
export LANG=en_US.UTF-8
export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
export JAVA_HOME=/opt/software/jdk
export HADOOP_HOME=/opt/software/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_LOG_DIR=$HADOOP_HOME/logs
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
Spark configuration
spark-defaults.conf
spark.master spark://192.168.31.125:7077
spark.local.dir /opt/software/spark/temp
spark.sql.autoBroadcastJoinThreshold 1g
spark.sql.adaptive.enabled true
spark.ui.port 4040
spark.executor.cores 3
spark.executor.memory 2g
spark.memory.fraction 0.9
spark.memory.storageFraction 0.1
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node1:8020/software/spark/history
spark.driver.memory 1g
spark-env.sh
#!/usr/bin/env bash
SPARK_MASTER_HOST=node0
HADOOP_CONF_DIR=/opt/software/hadoop/etc/hadoop
SPARK_EXECUTOR_CORES=3
SPARK_EXECUTOR_MEMORY=2g
JAVA_HOME=/opt/software/jdk
SPARK_HOME=/opt/software/spark