采用 Spark 实现的拉链表

拉链表初始化

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions.lit

/**
 * One-off initialisation of a zipper (history) table from one daily snapshot.
 *
 * Every row of the base table's `dt` partition is copied into the zipper
 * table's open partition (`dt = lastDay`) with `t_start = dt` and
 * `t_end = lastDay`. Nothing is written when the zipper table already has rows
 * in that open partition.
 */
object table_zip_initial {

  /** Sentinel date used as the open-ended `t_end` and as the open partition value. */
  val lastDay = "9999-12-31"

  /**
   * Entry point.
   *
   * When `os.name` contains "Windows" the job runs on a `local[*]` master with
   * the built-in defaults; otherwise the three arguments below are required.
   *
   * @param args `args(0)` base table, `args(1)` zipper table, `args(2)` snapshot date
   * @throws IllegalArgumentException when fewer than three arguments are supplied
   *                                  on a non-Windows host
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val builder = SparkUtils.getBuilder
    val onWindows = System.getProperties.getProperty("os.name").contains("Windows")

    // (base table, zipper table, snapshot date the zipper table is seeded from)
    val (table_base, table_zip, dt) =
      if (onWindows) {
        builder.master("local[*]")
        ("t_uac_organization", "ods_uac_org_zip", "2023-01-31")
      } else {
        // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
        require(args.length >= 3, "usage: table_zip_initial <table_base> <table_zip> <dt>")
        (args(0), args(1), args(2))
      }

    val spark = builder
      .appName(this.getClass.getName)
      .getOrCreate()

    // Always release the session, even when a query or the sink throws.
    try {
      val hive_db = "common"
      spark.sql(s"use $hive_db")

      // Initialisation runs at most once: only when the zipper table is
      // missing or its open partition holds no rows.
      val needsInit =
        if (!TableUtils.tableExists(spark, hive_db, table_zip)) {
          println(s"$table_zip not exists,初始化")
          true
        } else {
          val openRows = spark.sql(s"select * from $table_zip where dt='$lastDay'")
          if (openRows.isEmpty) {
            println(s"$table_zip isEmpty 初始化")
            true
          } else {
            println(s"$table_zip exist and not empty,无需初始化!!!")
            false
          }
        }

      if (needsInit) {
        init(dt, spark, hive_db, table_base, table_zip)
      }
    } finally {
      spark.stop()
    }
  }

  /**
   * Seeds the zipper table from the `dt` partition of the base table.
   *
   * @param dt         snapshot partition of `table_base`; becomes `t_start` of every row
   * @param spark      active session
   * @param hive_db    database passed to the sink
   * @param table_base table to read the snapshot from
   * @param table_zip  zipper table to write to
   */
  private def init(dt: String, spark: SparkSession, hive_db: String, table_base: String, table_zip: String): Unit = {
    val t_base = spark.sql(s"select * from $table_base where dt='$dt'")
    println(s"$table_base show")
    t_base.show(false)

    // Replace the snapshot's own partition column with the validity interval
    // [dt, lastDay] and place every row in the open partition.
    val ods_zip = t_base
      .drop("dt")
      .withColumn("t_start", lit(dt))
      .withColumn("t_end", lit(lastDay))
      .withColumn("dt", lit(lastDay))

    if (ods_zip.isEmpty) {
      // It is the base snapshot that has no rows, so report that table and partition.
      println(s"$table_base dt=$dt is empty,初始化失败...")
    } else {
      println(s"$table_zip show")
      ods_zip.show(false)
      println(s"$table_zip 初始化...")
      SinkUtil.sink_to_hive(lastDay, spark, ods_zip, hive_db, hive_table = table_zip, "parquet", MySaveMode.OverWriteByDt)
    }
  }
}

拉链表每日滚动计算

import org.apache.spark.sql.functions.{count, lit}

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import org.apache.spark.storage.StorageLevel

/**
 * Daily roll-forward of the `ods_uac_org_zip` zipper (history) table.
 *
 * The zipper table can only be advanced one day at a time, starting from the
 * day it was first loaded. For each day the open partition (`dt = lastDay`)
 * is merged with that day's new/changed rows, and the superseded versions are
 * closed into the partition of the previous day.
 */
object ods_uac_org_zip {

  /** Sentinel date used as the open-ended `t_end` and as the open partition value. */
  val lastDay = "9999-12-31"

  /** Business columns present in both the zipper table and the daily change table. */
  private val businessColumns: Seq[String] = Seq(
    "id", "org_name", "parent_id", "sort", "org_type", "org_level",
    "is_auth_scope", "parent_auth_scope_id", "status", "icon_class",
    "create_id", "create_time", "update_id", "update_time", "version"
  )

  /**
   * Entry point.
   *
   * When `os.name` contains "Windows" the job runs on a `local[*]` master with
   * built-in default dates; otherwise two date arguments are required. Both
   * dates are handed to `IDate.invoke`, which calls back `onDate` per date
   * (presumably every day of the range; confirm against `IDate`).
   *
   * @param args `args(0)` and `args(1)`: the two dates passed to `IDate.invoke`
   * @throws IllegalArgumentException when fewer than two arguments are supplied
   *                                  on a non-Windows host
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val builder = SparkUtils.getBuilder
    val onWindows = System.getProperties.getProperty("os.name").contains("Windows")

    val (dt, dt1) =
      if (onWindows) {
        builder.master("local[*]")
        ("2023-02-01", "2023-02-02")
      } else {
        // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
        require(args.length >= 2, "usage: ods_uac_org_zip <dt> <dt1>")
        (args(0), args(1))
      }

    val spark = builder
      .appName(this.getClass.getName)
      .getOrCreate()

    // Always release the session, even when one of the days fails.
    try {
      val hive_db = "common"
      spark.sql(s"use $hive_db")

      new IDate {
        override def onDate(dt: String): Unit = {
          processByDt(spark, dt, hive_db)
        }
      }.invoke(dt, dt1)
    } finally {
      spark.stop()
    }
  }

  /**
   * Rolls the zipper table forward by one day.
   *
   * Writes two partitions of `ods_uac_org_zip`:
   *  - `dt = lastDay`: the merged current rows (new/changed rows win over old ones);
   *  - `dt = dt - 1 day`: the previous versions of rows that changed on `dt`.
   *
   * @param spark   active session; its current database must contain
   *                `ods_uac_org_zip` and `new_change_t_uac_organization`
   * @param dt      the day being processed
   * @param hive_db database of the temp table and of the sink
   */
  def processByDt(spark: SparkSession, dt: String, hive_db: String): Unit = {
    val theDayBeforeDt = DateUtil.back1Day(dt + " 00:00:00").split(" ")(0)

    // The open partition is both read and overwritten by this method, so it is
    // first copied to a temp table; reading the source lazily while it is being
    // overwritten fails with FileNotFoundException.
    val tmpTable = s"$hive_db.ods_uac_org_zip_tmp"
    val openPartition = spark
      .sql(s"select * from ods_uac_org_zip where dt='$lastDay'")
      .persist(StorageLevel.MEMORY_ONLY_SER_2)
    try {
      openPartition
        .repartition(3)
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .saveAsTable(tmpTable)
    } finally {
      // The cached copy is only needed for the write above. Releasing it stops
      // one copy per processed day from accumulating.
      // NOTE(review): it presumably also keeps the next day's identical query
      // from being answered out of this day's cache; confirm.
      openPartition.unpersist()
    }

    // From here on the "old" side comes from the temp table, so the source
    // table can be overwritten safely.
    val f_old_9999 = spark.sql(s"select * from $tmpTable").drop("dt")
    println("f_old_9999 show")
    f_old_9999.show(false)

    // New and changed rows of `dt`; every column gets a "2" suffix so both
    // sides can live in one joined row.
    val changedRows = spark
      .sql(s"select * from new_change_t_uac_organization where dt='$dt'")
      .drop("dt")
    val f_new = businessColumns
      .foldLeft(changedRows)((df, c) => df.withColumnRenamed(c, c + "2"))
      .withColumn("t_start2", lit(dt))
      .withColumn("t_end2", lit(lastDay))
    println("f_new show")
    f_new.show(false)

    val f1 = f_old_9999.join(f_new, f_old_9999.col("id") === f_new.col("id2"), "full_outer")
    f1.createOrReplaceTempView("v1")
    println("v1 temp show")
    f1.show(false)

    // All rows of the new open partition. The choice between the new and the
    // old version is made per ROW (is there a new row for this id?), not per
    // column: a per-column nvl would keep the old value whenever the new value
    // of a column is NULL.
    val mergedColumns = (businessColumns ++ Seq("t_start", "t_end"))
      .map(c => s"if(id2 is not null, ${c}2, $c) as $c")
      .mkString(", ")
    val f_9999: DataFrame = spark.sql(
      s"""
         |select $mergedColumns
         |, if(id2 is not null, t_end2, t_end) as dt
         |from v1
         |""".stripMargin)
    println("f_9999 show")
    f_9999.show(false)

    println(s"在${dt}的发生状态变化的,新的有效区间[$dt,$lastDay]...")
    f_9999.filter(s"t_start='$dt'").show()
    f_9999.groupBy("dt")
      .agg(count("id"))
      .show()

    // Expired versions: ids present on both sides. A change seen on `dt`
    // closes the previous version on dt - 1. The `t_start < dt` guard keeps a
    // re-run for the same `dt` from treating the already-rolled version as
    // expired and overwriting the history partition.
    val f_expire = spark.sql(
      s"""
         |select ${businessColumns.mkString(", ")},
         |t_start,
         |cast(date_add('$dt',-1) as string) as t_end,
         |cast(date_add('$dt',-1) as string) as dt
         |from v1
         |where id2 is not null and id is not null and t_start<'$dt'
         |""".stripMargin)
    println("f_expire show")
    f_expire.show(false)

    // No dynamic partitioning here: each partition is written separately.
    if (!f_9999.isEmpty) {
      SinkUtil.sink_to_hive(lastDay, spark, f_9999, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)
    }
    if (!f_expire.isEmpty) {
      SinkUtil.sink_to_hive(theDayBeforeDt, spark, f_expire, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)
    }
  }
}

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: