目录

前言

题目:

一、读题分析

二、处理过程

1.常规思路

2.这里提供第二种比较和筛选数据

三、重难点分析

总结 

前言

本题来源于2022  年全国职业院校技能大赛(高职组)“大数据技术与应用”赛项(电商)-  离线数据处理 - 数据抽取

题目:

提示:以下是本篇文章正文内容,下面案例可供参考(使用Scala语言编写) 

一、读题分析

涉及组件:MYSQL,HIVE,SPARK,SCALA

涉及知识点:

与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)一样与(1)不同的是,1是针对单列的时间进行比较,本题是在表上的两列当中选取时间较大的那一列的值作为判定时间

二、处理过程

  比较每一行两列的值,将他们筛选出来,然后赋给新的一列临时列,最后在删除临时列

1.常规思路

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.functions._

import java.time.LocalDate

object sparkMysqltoHive {

def main(args: Array[String]): Unit = {

import java.text.SimpleDateFormat

import java.util.{Calendar, Properties}

val spark = SparkSession.builder()

.appName("Incremental Data Extraction").master("spark://host:7077")

.enableHiveSupport()

.getOrCreate()

val jdbcurl = "jdbc:mysql://bigdata1:3306/db"

val tablename = "table1"

val properties = new Properties()

properties.setProperty("user", "root")

properties.setProperty("password", "123456")

properties.setProperty("driver", "com.mysql.jdbc.Driver")

// 读取mysql数据创建dataframe

val mysqlDF = spark.read.jdbc(jdbcurl, tablename, properties)

// val maxtime = mysqlDF.agg(max(greatest(col("time1"), col("time2")))).toString() 最大时间

// 创建临时视图

mysqlDF.createOrReplaceTempView("mysql_user_info")

// 拿到之前的数据和两比较中最大的数据列

val mysqlDF2 = spark.sql("select *, greatest(operate_time, create_time) as incremental_field from mysql_user_info")

// 读取hive数据

val hiveDF = spark.sql("select * from ods.user_info")

// 在hive中生成一列,并找到hive那两列最大的列

val hiveDF2 = hiveDF.withColumn("incremental_field", greatest(col("operate_time"), col("create_time")))

// 在两列比较最大的同时在比较最大转换为时间

val maxDate = hiveDF2.agg(max(col("incremental_field"))).first().getTimestamp(0)

// agg被用来处理聚合数据的函数

// 在MySQL全部数据中将不满足mysql最大时间大于hive最大时间的数据过滤

val newRecords = mysqlDF2.filter(col("incremental_field") > maxDate)

val yesterday = new SimpleDateFormat("yyyyMMdd")

.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)

// 筛选后删除临时列,并存上分区列

val result = newRecords

.drop(col("incremental_field"))

.withColumn("etl_date", lit(yesterday))

// Write the new records to Hive table with static partitioning

result.write.mode("append")

.partitionBy("etl_date")

.saveAsTable("ods_userinfo")

}

}

2.这里提供第二种比较和筛选数据

// 从hive读取数据

val hiveDF = spark.sql("select max(greatest(time1,time2)) from ods.table4")

val maxTime = hiveDF.collect()(0)(0).toString

// 筛选出增量数据

val valueDF = mysqlDF.filter(col("time1").gt(maxTime) || col("time2").gt(maxTime))

三、重难点分析

必须要知道sql中greatest函数,这个函数能够筛选出一行中的两列中最大的值,与max不同,max是选出单独的一列中最大的值。

总结 

数据处理需要灵活的使用sql函数或者spark相关函数对数据进行处理,但思路总体上一样。对于数据处理,还需要掌握好一些非常见但是又很使用的函数。

可以与大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)进行比较,找到他们的不同点,对比一下。 链接:大数据之使用Spark增量抽取MySQL的数据到Hive数据库(1)

原创作品如需引用请标明出处

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: