一、选题背景   

早在17 世纪80 年代,人类就进行了在气象大数据可视化方面的尝试,这次尝试来源于英国科学家埃德蒙·哈雷,凭借整理和计算大量数据的才能,哈雷绘制了世界上第一张载有海洋盛行风分布的气象图,以地图为依托,对信风的分布状况做了全球性的统计分析,并将分布状态生动的展现在世人面前,这也是有史可依的最早的气象数据可视化案例。

如今,气象数据可视化已经发展到了全新的时代。在美国National Weather Service 网站上,气象数据信息已经实现了以地图为载体的全面可视化展示,文字描述变成了辅助信息,图形可以一目了然的传达不同地理区划内各类气候历史资料和实时的天气实况、预报数据。

随着气象数据的监测和预报的高度发展、以及自动化水平的不断提升,气象数据信息也正呈现爆发式增长的趋势。而同时,伴随着媒体技术的发展,受众对于气象服务信息的需求也逐步从传统媒体的单向传达向全媒体的交互式体验转变,单一的讲述方式已经不再是让受众接收气象信息的有力途径。而在这个变化的过程当中,人们对于气象信息本身的需求也不再是单一的“明天下雨吗”,而是需要对天气原理、气候统计等方面越来越专业化、深入化的系统解答。这种受众需求的引领,也推动了气象服务向数据信息可视化方向全面发展。

二、设计目的和要求

数据采集与处理:天气预测系统需要采集大量的数据,包括气象观测数据、卫星遥感数据、地理信息数据等。同时,需要对这些数据进行清洗、去重、格式转换等处理,以便于后续的分析和预测。机器学习与深度学习算法:基于大数据的天气预测系统需要利用机器学习与深度学习算法对历史数据进行分析,找出其中的规律和特征,并根据这些规律和特征进行预测。常见的算法包括线性回归、决策树、随机森林、神经网络等。数据可视化技术:将预测结果以直观的方式呈现给用户,是天气预测系统的重要功能之一。数据可视化技术可以将数据以图表、图像等形式展示,帮助用户更好地理解预测结果。

三、设计内容和思路

内容

本文首先介绍了天气预测的概念,阐述了天气预测对生产生活的重要影响。接着,从发展历程、特点两个方面对Python进行了分析,并介绍了如何搭建Python开发环境。随后,介绍了网络爬虫的概念,从原理和分类两个方面对网络爬虫进行了研究,在上述基础上,分析了基于Python的网络爬虫技术,设计并实现了基于Python的天气预测系统。所使用的算法可以有效对天气进行预测,为用户的出行天气预警提供有效帮助。

Python爬虫模块:使用requests爬取腾讯天气的气象数据作为分析数据集存入mysql;Spark实时计算模块:集成SparkSQL完成气象数据统计指标的计算提取;数据预测模块:使用Python线性回归预测、机器学习/深度学习模型对气象数据进行分析,并将结果以json的形式推送给前端UI界面;数据可视化模块:使用echarts实现数据可视化大屏; 

思路

调查法:系统的搜集有关天气数据的具体信息。 线性回归预测模型:用:使用Python线性回归预测模型对气象数据进行分析。

文献研究法:通过搜集和查阅大量国内外学者关于天气预测相关的研究,以及各大天气数据网站上的相关信息,梳理出影响天气气候的因素。     实证研究法:依据现有的科学理论和实践的需要提出设计

四、预期成果

(1)编写系统源代码;

(2)毕业设计说明书。

五、设计时间安排

第1周:查阅相关资料,完成文献综述。

第2周:结合课题要求,提交开题报告,并完成开题答辩。

第3~5周:进行系统分析、总体设计和详细设计。

第6~9周:实现系统编码、调试及软件测试;撰写毕业设计。

第10~12周:修改毕业设计至定稿,资格审查。

第13~14周:毕业设计答辩及资料归档。

六、完成设计所需要的条件

(1)软硬件环境:硬件环境有win10笔记本电脑配置有16G内存、256G固态硬盘(用于存储、计算、开发);软件环境有Python、JDK1.8、Hadoop、Spark、Hive、Maven、Vue.js等。

(2)数据库:MySQL数据库

(3)开发环境与工具:Vmvare、IDEA、Pycharm、Navicat

核心算法代码分享如下:

package com.sql

import org.apache.spark.sql.functions.{count, desc, when}

import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.spark.sql.types._

import org.junit.Test

import java.util.Properties

class weather {

val spark = SparkSession.builder()

.master("local")

.appName("sparksql")

.getOrCreate()

val schema = StructType(

List(

StructField("小时", StringType),

StructField("温度", IntegerType),

StructField("风力方向", StringType),

StructField("风级",IntegerType),

StructField("降水量", DoubleType),

StructField("相对湿度", IntegerType),

StructField("空气质量",IntegerType),

StructField("城市",StringType)

)

)

val schema1 = StructType(

List(

StructField("日期", StringType),

StructField("天气", StringType),

StructField("最低气温", IntegerType),

StructField("最高气温",IntegerType),

StructField("风向1", StringType),

StructField("风向2", StringType),

StructField("风级",IntegerType),

StructField("城市",StringType)

)

)

val df: DataFrame = spark.read

.option("delimiter", ",")

.schema(schema)

.option("header","true")

.csv("hdfs://192.168.227.166:9000/Data/weather1.csv")

val df1: DataFrame = spark.read

.option("delimiter", ",")

.schema(schema1)

.option("header","true")

.csv("hdfs://192.168.227.166:9000/Data/weather14.csv")

val df2: DataFrame = spark.read

.option("delimiter", ",")

// .schema(schema1)

.option("header","true")

.csv("hdfs://192.168.227.166:9000/Data/history.csv")

@Test

def a1(): Unit = {

df.show()

}

@Test

def a(): Unit = {

df1.show()

}

@Test

def a2(): Unit = {

df2.show()

}

//小时 :温度

@Test

def diqusl(): Unit = {

df.createOrReplaceTempView("sc")

val df2 = spark.sql(

"""

select `小时`,`城市`,`温度`

from sc

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"time_wd",

new Properties()

)

}

// 小时 :相对湿度

@Test

def quandiqusl(): Unit = {

df.createOrReplaceTempView("cplx")

val df2 = spark.sql(

"""

select `小时`,`城市`,`相对湿度`

from cplx

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"time_sd",

new Properties()

)

}

// 小时 :空气质量

@Test

def quandiqusl1(): Unit = {

df.createOrReplaceTempView("cplx")

val df2 = spark.sql(

"""

select `小时`,`城市`,`空气质量`

from cplx

where `空气质量` IS NOT NULL

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"time_kq",

new Properties()

)

}

@Test

def quandiqusl3(): Unit = {

df.createOrReplaceTempView("cplx")

val df2 = spark.sql(

"""

SELECT `城市`,avg(`相对湿度`) `湿度`

from cplx

GROUP BY `城市`

ORDER BY `湿度` desc

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"time_sd1",

new Properties()

)

}

// 小时 :降雨量

@Test

def quandiqusl2(): Unit = {

df.createOrReplaceTempView("cplx")

val df2 = spark.sql(

"""

select `小时`,`城市`,`降水量`

from cplx

where `降水量` != 0

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"time_js",

new Properties()

)

}

// 近14天 :最高气温 最低气温

@Test

def chanpsl(): Unit = {

df1.createOrReplaceTempView("wzsl")

val df2 = spark.sql(

"""

select `日期`,`最低气温`,`最高气温`,`城市`

from wzsl

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"day_wd",

new Properties()

)

}

// 近14天 :天气

@Test

def chanpsssl(): Unit = {

df1.createOrReplaceTempView("wzsl")

val df2 = spark.sql(

"""

select `天气`,count(`天气`) as `数量`,`城市`

from wzsl

group by `城市`,`天气`

""")

df2

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"day_tq",

new Properties()

)

}

// 窗口函数

@Test

def chanpssl(): Unit = {

df2.createOrReplaceTempView("wzsl")

val df3 = spark.sql(

"""

SELECT

`日期`,`最高气温`,`城市`

FROM

( SELECT `日期`,`最高气温`,`城市`, row_number() over ( PARTITION BY `城市` ORDER BY `最高气温` DESC ) AS rn FROM wzsl ) t

WHERE

t.rn <= 5

""")

df3

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"his_maxwd",

new Properties()

)

}

// 窗口函数

@Test

def chanpssl1(): Unit = {

df2.createOrReplaceTempView("wzsl")

val df3 = spark.sql(

"""

SELECT

`日期`,`最低气温`,`城市`

FROM

( SELECT `日期`,`最低气温`,`城市`, row_number() over ( PARTITION BY `城市` ORDER BY `最低气温` ASC ) AS rn FROM wzsl ) t

WHERE

t.rn <= 5

""")

df3

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"his_minwd",

new Properties()

)

}

@Test

def chanpsssl1(): Unit = {

df2.createOrReplaceTempView("wzsl")

val df3 = spark.sql(

"""

select `风向`,count(`风向`) as `数量`,`城市`

from wzsl

group by `城市`,`风向`

""")

df3

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"his_fx",

new Properties()

)

}

@Test

def chanpsssl2(): Unit = {

df2.createOrReplaceTempView("wzsl")

val df3 = spark.sql(

"""

select `天气`,count(`天气`) as `数量`,`城市`

from wzsl

group by `城市`,`天气`

HAVING `数量` > 10

""")

df3

// .show()

.coalesce(1)

.write

.mode("overwrite")

.option("driver", "com.mysql.cj.jdbc.Driver")

.option("user", "root")

.option("password", "123456")

.jdbc(

"jdbc:mysql://localhost:3309/weather",

"his_tq",

new Properties()

)

}

}

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: