Spark读取Hive数据的两种方式与保存数据到HDFS

Spark读取Hive数据的方式主要有两种

1、 通过访问hive metastore的方式,这种方式通过访问hive的metastore元数据的方式获取表结构信息和该表数据所存放的HDFS路径,这种方式的特点是效率高、数据吞吐量大、使用spark操作起来更加友好。

2、 通过spark jdbc的方式访问,就是通过链接hiveserver2的方式获取数据,这种方式底层上跟spark链接其他rdbms上一样,可以采用sql的方式先在其数据库中查询出来结果再获取其结果数据,这样大部分数据计算的压力就放在了数据库上。

两种方式的具体实现示例

首先创建Spark Session对象:

val spark = SparkSession.builder()

.appName("test")

.enableHiveSupport()

.getOrCreate()

方式一(推荐) 直接采用Spark on Hive的方式读取数据,这样SparkSession在使用sql的时候会去找集群hive中的库表,加载其hdfs数据与其元数据组成DataFrame

val df = spark.sql("select * from test.user_info")

方式二 采用spark jdbc的方式,如果有特别的使用场景的话也可以通过这种方法来实现。

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object test{

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder()

.master("local[2]")

.appName("test")

.getOrCreate()

register() //如果不手动注册,只能获取到数据库中的表结构,而不能获取到数据

val df = spark.read

.format("jdbc")

.option("driver","org.apache.hive.jdbc.HiveDriver")

.option("url","jdbc:hive2://xxx:10000/")

.option("user","hive")

.option("password",xxx)

.option("fetchsize", "2000")

.option("dbtable","test.user_info")

.load()

df.show(10)

}

def register(): Unit = {

JdbcDialects.registerDialect(HiveSqlDialect)

}

case object HiveSqlDialect extends JdbcDialect {

override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")

override def quoteIdentifier(colName: String): String = {

colName.split('.').map(part => s"`$part`").mkString(".")

}

}

}

Spark的DataFrame和DataSet使用

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

在Spark中,一个DataFrame所代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

DataFrame和R的数据结构以及python pandas DataFrame的数据结构和操作基本一致。

创建DataFrame、DataSet

创建RDDRDD转化为ROW通过ROW和元数据信息生成DataFrame然后通过DataFrame和对应的类转化为DataSet也就是说DataFrame是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐。

object MovieLenDataSet {

case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)

def main(args: Array[String]): Unit = {

Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession.builder()

.appName("MovieLenDataSet")

.master("local[*]")

.getOrCreate()

import spark.implicits._

val dataPath = "/home/ffzs/data/ml-1m"

val schema4users = StructType(

"UserID::Gender::Age::Occupation::Zip_code"

.split("::")

.map(it => StructField(it, StringType, nullable = true))

)

val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")

val usersRows = usersRdd.map(_.split("::"))

.map(it => {

it.map(_.trim)

})

.map(it => Row(it(0), it(1), it(2), it(3), it(4)))

val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)

val usersDataSet = usersDF.as[User]

usersDataSet.show(5)

}

}

Spark的DataFrame存储的Mode模式选择

spark的dataframe存储中都会调用write的mode方法:

data.write.mode(“append”).saveAsTable(s"u s e r i d . {userid}.userid.{datasetid}")

data.write.mode(SaveMode.Overwrite).parquet(hdfspath)

但不同时候的参数是不同的。

先看一下源码:

spark-v2.3.0:

def mode(saveMode: SaveMode): DataFrameWriter[T] = {

this.mode = saveMode

this

}

/**

* Specifies the behavior when data or table already exists. Options include:

* - `overwrite`: overwrite the existing data.

* - `append`: append the data.

* - `ignore`: ignore the operation (i.e. no-op).

* - `error` or `errorifexists`: default option, throw an exception at runtime.

*

* @since 1.4.0

*/

def mode(saveMode: String): DataFrameWriter[T] = {

this.mode = saveMode.toLowerCase(Locale.ROOT) match {

case "overwrite" => SaveMode.Overwrite

case "append" => SaveMode.Append

case "ignore" => SaveMode.Ignore

case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists

case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +

"Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")

}

this

}

SaveMode.Overwrite(对应着字符串"overwrite"):表示如果目标文件目录中数据已经存在了,则用需要保存的数据覆盖掉已经存在的数据

SaveMode.Append(对应着字符串"append"):表示如果目标文件目录中数据已经存在了,则将数据追加到目标文件中 数据追加方式是:先将表中的所有索引删除,再追加数据

SaveMode.Ignore(对应着字符串为:“ignore”):表示如果目标文件目录中数据已经存在了,则不做任何操作 SaveMode.ErrorIfExists(对应着字符串"error"):表示如果目标文件目录中数据已经存在了,则抛异常(这个是默认的配置)

spark之Dataframe保存模式

以前spark.write时总要先把原来的删了,但其实是可以设置写入模式的。

val df = spark.read.parquet(input)

df.write.mode("overwrite").parquet(output)

dataframe写入的模式一共有4种:

overwrite 覆盖已经存在的文件append 向存在的文件追加ignore 如果文件已存在,则忽略保存操作error / default 如果文件存在,则报错

def mode(saveMode: String): DataFrameWriter = {

this.mode = saveMode.toLowerCase match {

case "overwrite" => SaveMode.Overwrite

case "append" => SaveMode.Append

case "ignore" => SaveMode.Ignore

case "error" | "default" => SaveMode.ErrorIfExists

case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +

"Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")

}

this

}

spark write写入数据task failed失败,两种模式下的不同表现

1、SaveMode.Append

task失败重试,并不会删除上一次失败前写入的数据(文件根据分区号命名),重新执行时会继续追加数据。所以会出现数据重复。

2、SaveMode.Overwrite

task失败重试,会删除该分区上次失败所写入的数据文件,然后创建一个新的数据文件写入数据。所以不会出现数据重复。

启动spark任务报错:ERROR SparkUI: Failed to bind SparkUI

当启动一个spark任务的时候,就会占用一个端口,默认为4040,从日志可以看到当端口被占用时,它会默认依次增加16次到4056,如果还是失败的话,就会报错退出。

解决方法:

使用spark-submit提交任务时,在脚本中加配置:–conf spark.port.maxRetries=128(亲测有效)

以下代码仅供学习参考

Spark执行外部Hql脚本Scala代码Demo示例

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

SparkReadHql_Test

1.0-SNAPSHOT

UTF-8

UTF-8

UTF-8

UTF-8

3.1.3

3.1.2

2.12.11

3.0.0

junit

junit

4.11

org.apache.hadoop

hadoop-client

${hadoop.version}

io.netty

netty

io.netty

netty-all

4.1.18.Final

org.apache.hadoop

hadoop-common

${hadoop.version}

org.scala-lang

scala-library

${scala.version}

org.apache.spark

spark-sql_2.12

${spark.version}

com.google.guava

guava

org.apache.spark

spark-hive_2.12

${spark.version}

log4j

log4j

1.2.15

javax.jms

jms

com.sun.jdmk

jmxtools

com.sun.jmx

jmxri

src/main/java

net.alchim31.maven

scala-maven-plugin

3.2.2

compile

testCompile

-dependencyfile

${project.build.directory}/.scala_dependencies

代码

package com.xxxx

import org.apache.spark.sql.SparkSession

import java.io.File

import java.io.FileInputStream

import scala.io.{BufferedSource, Source}

object SparkReadHqlTest {

def main(args: Array[String]): Unit = {

val filePath: String = args(0)

val input_date: String = args(1)

val session: SparkSession = SparkSession.builder()//.master("local[2]")

.appName("SparkSeesionApp")

.enableHiveSupport() //支持hive

.getOrCreate()

// session.sparkContext.setLogLevel("WARN")

val sql: String = doFile(filePath)

val strings: Array[String] = sql.split(";")

var i = 0;

strings.foreach(sql=>{

val startTime: Long = System.currentTimeMillis()

println("==============第 "+(i+1)+" 次===sql开始=================")

println(sql)

//替换参数

// session.sql(sql.replace("'${hivevar:input_date}'", input_date)).show()

session.sql(sql).show()

val stopTime: Long = System.currentTimeMillis()

val processTime: Long = (startTime - stopTime) / 1000

println("===============第 "+(i+1)+" 次==sql结束====耗时=="+processTime+" 秒==========")

i = i+1

})

//关闭SparkSession

session.stop()

}

//读取外部sql文件文件

def doFile(fileName: String): String = {

val file: File = new File(fileName)

val stream: FileInputStream = new FileInputStream(file)

val buff: BufferedSource = Source.fromInputStream(stream,"UTF-8")

//读取拼装SQL

val sql: String = buff.getLines().mkString("\n")

sql

}

}

Spark读取外部SQL文件java代码

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

SparkReadHqlFile

1.0-SNAPSHOT

org.apache.hadoop

hadoop-client

3.1.1

org.apache.hadoop

hadoop-hdfs

3.1.1

org.apache.hadoop

hadoop-common

3.1.1

org.apache.spark

spark-core_2.12

3.2.1

org.apache.spark

spark-sql_2.12

3.2.1

org.apache.spark

spark-hive_2.12

3.2.1

log4j

log4j

1.2.15

javax.jms

jms

com.sun.jdmk

jmxtools

com.sun.jmx

jmxri

代码

package org.example;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.apache.spark.sql.SparkSession;

import org.slf4j.LoggerFactory;

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

public class SparkReadFile {

private static final Logger logger= LoggerFactory.getLogger(SparkReadFile.class);

public static void main(String[] args) throws IOException {

// 传入参数非空判断

validateArgs(args);

// 1.创建sparkSession

SparkSession spark = SparkSession.builder().config("hive.metastore.uris", args[1]) //hive的metastore地址

.config("Spark.serializer", "org.apache.spark.serializer.KryoSerializer")

.config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate();

// 2.解析sql文件

BufferedReader bufferedReader =null;

String tmpStr;

String execStatus ="";

try {

bufferedReader =new BufferedReader(new FileReader(args[0])); //sql文件名

StringBuilder tempSqlContent =new StringBuilder();

while((tmpStr =bufferedReader.readLine()) !=null){

tempSqlContent.append(tmpStr+"\n");

}

// 替代sql语句中的变量${batchDate}为对应的分区信息

String[] sqlList = tempSqlContent.toString().replaceAll("\\$\\{batchDate\\}", args[2]).split(";");

for (int i = 0; i

logger.info("sql语句:{}",sqlList[i]);

// 3.执行SQL语句

spark.sql(sqlList[i]).show(false);

}

} catch (Exception e) {

logger.error("\n作业执行失败,{}\n"+e.getMessage(),e);

execStatus="1";

} finally {

// 4.关闭流

if (null !=bufferedReader){

bufferedReader.close();

}

if (null !=spark){

spark.close();

}

if ("1".equals(execStatus)){

System.exit(-1);

}

}

}

//参数非空判断

public static void validateArgs(String[] agrs){

if (null == agrs || args.length !=3 || StringUtils.isAnyEmpty(args)){

System.exit(-1);

}

}

}

租户Spark跨集群读取hive数据进行保存

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

MasterClusterToZH

1.0-SNAPSHOT

org.apache.hadoop

hadoop-client

3.1.1

org.apache.hadoop

hadoop-hdfs

3.1.1

org.apache.hadoop

hadoop-common

3.1.1

org.apache.spark

spark-core_2.12

3.2.1

org.apache.spark

spark-sql_2.12

3.2.1

org.apache.spark

spark-hive_2.12

3.2.1

log4j

log4j

1.2.15

javax.jms

jms

com.sun.jdmk

jmxtools

com.sun.jmx

jmxri

代码

package org.example;

import org.apache.commons.lang3.StringUtils;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SaveMode;

import org.apache.spark.sql.SparkSession;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class ReadHiveToTenant {

private static final Logger logger= LoggerFactory.getLogger(ReadHiveToTenant.class);

public static void main(String[] args) {

// 传入参数非空判断

validateArgs(args);

// 1.初始化SparkSession对象,主集群Metastore

SparkSession spark = createSparkSession(args[0]);

// 2.读取主集群hive表分区数据

Dataset partitonDF = spark.read().table(args[1]).where(args[2]);

long count =partitonDF.count();

logger.info(args[1]+"表的数据量:-----:"+count);

//3.写入租户hive表的HDFS路径

partitonDF.write().mode(SaveMode.Overwrite).save(args[3]);

//4.关闭资源

if (null !=spark){

spark.close();

}

//5.修复hive表

SparkSession tenantSparkSession = createSparkSession(args[4]) ;

tenantSparkSession.sql("MSCK REPAIR TABLE"+agrs[5]);

if (null !=tenantSparkSession){

tenantSparkSession.close();

}

}

// 获取一个SparkSession对象

public static SparkSession createSparkSession(String hiveMetastore){

SparkSession sparkSession = SparkSession.builder().config("hive.metastore.uris", hiveMetastore)

.config("hive.exec.dynamic.partition", true) //开启动态分区

.config("hive.exec.dynamic.partition.mode", "nonstrict").enableHiveSupport().getOrCreate();

return sparkSession;

}

//参数非空判断

public static void validateArgs(String[] agrs){

if (null == agrs || args.length !=6 || StringUtils.isAnyEmpty(args)){

System.exit(-1);

}

}

}

以上代码示例仅供学习参考,方便收藏,对代码进行整理汇总 参考博客:

https://blog.csdn.net/qq_42213403/article/details/117557610https://blog.csdn.net/qq_34009542/article/details/118366474?spm=1001.2014.3001.5502https://blog.csdn.net/totally123/article/details/117224169

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: