一、环境配置

本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。

检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务

hive –service metastore

先将%HIVE_HOME%\conf\hive-site.xml拷贝到%SPARK_HOME%\conf。此步骤是为了Spark能读取Hive相应的配置; 再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%SPARK_HOME%\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了Spark能够访问Hive的元数据库。

此时,正常启动PySpark交互程序,可在交互模式下正常访问Hive了。进入交互环境,在提示符后直接输入以下代码:

spark.sql(‘show tables’).show()

正常执行后,应该能够看到default库中的表。这时,可以配置Python IDE的开发环境了

 

检查pycharm或其他IDE中的PySpark的开发环境正常;再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%PYTHONDIR%\Lib\site-packages\pyspark\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了在IDE环境中能够访问Hive的元数据库;添加环境变量SPARK_CONF_DIR,变量值为%SPARK_HOME%\conf。此步骤是为了在IDE中运行Spark程序时,能够读取Spark配置目录下的相应配置信息;

此时,正常启动pycharm。可在IDE环境下正常访问Hive了。在pycharm的工程中新建一个Python文件,输入以下代码:

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('sparkhive').master('local[*]').enableHiveSupport().getOrCreate()

spark.sql('show tables').show()

正常执行后,应该能够看到default库中的表。

二、读写Hive数据源

从Spark2.0开始,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext也被保存下来。在实际写程序时,只需要定义一个SparkSession对象就可以了。不用使用SQLContext 和 HiveContext。

SQLContext 和 HiveContext方式读写Hive数据

(1)读取数据

from pyspark.sql import HiveContext

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

# 创建HiveContext实例

hive_context = HiveContext(sc)

# 读取default.stocks表

stocks_df = hive_context.sql("SELECT * FROM stocks")

# 显示数据

stocks_df.show(10)

(2)写入数据

from pyspark.sql import HiveContext

from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

# 定义DataFrame的结构(与stocks表的结构一致)

schema = StructType([

    StructField("exchange_e", StringType(), True),

    StructField("symbol", StringType(), True),

    StructField("ymd", StringType(), True),

    StructField("price_open", FloatType(), True),

    StructField("price_high", FloatType(), True),

    StructField("price_low", FloatType(), True),

    StructField("price_close", FloatType(), True),

    StructField("volume", IntegerType(), True),

    StructField("price_adj_close", FloatType(), True)

])

hive_context = HiveContext(sc)

# 创建DataFrame

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

df_to_write = hive_context.createDataFrame(new_data, schema=schema)

# 注册为临时表以便进行后续操作

df_to_write.registerTempTable("temp_stocks")

# 将临时表中的数据插入到stocks表

hive_context.sql('''

    INSERT INTO TABLE stocks

    SELECT * FROM temp_stocks

''')

hive_context.sql("select * from stocks where exchange_e='SSE'").show()

此方法的读写操作也可以参看Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客 Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客

SparkSession方式读取Hive数据

在Spark中,使用SparkSession(从Spark 2.0开始)可以方便地读取和写入Hive表。以下是如何在Python中使用PySpark进行操作的例子:

(1)读取数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持

spark = SparkSession.builder\

    .appName("StocksDataWriteExample")\

    .enableHiveSupport()\

    .getOrCreate()

# 读取并显示stocks表的数据

spark.sql("SELECT * FROM stocks").show(10)

(2)写入数据

from pyspark.sql import SparkSession

# 初始化SparkSession并启用Hive支持

spark = SparkSession.builder \

        .appName("StocksDataWriteExample") \

        .enableHiveSupport() \

        .getOrCreate()

# 定义数据和列结构(与stocks表结构一致)

columns = ["exchange_e", "symbol", "ymd", "price_open", "price_high", "price_low", "price_close", "volume", "price_adj_close"]

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

            ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

# 创建DataFrame

df_to_write = spark.createDataFrame(new_data, schema=columns)

# 写入数据到stocks表,这里假设mode为'append'(追加模式)

df_to_write.write \

.mode('append') \

.format('Hive') \

.saveAsTable('default.stocks')

(3)要注意的问题

Hive 3.0以后,默认建立的表是ORC格式的(不用在hive-site.xml中开启行级事务支持)。即可以支持INSERT,DELETE和UPDATE行级事务操作。但如果是在Hive交互命令行创建的表,在spark程序看来都是HiveFileFormat格式的表。因此,上面的代码中采用.format('Hive')。Spark会匹配相应的schema。要回避这个问题,也可以采用以下代码,即从一个临时表向目标表追加数据的方法。

# 创建一个与stocks表结构相同的临时表

df_to_write.createOrReplaceTempView("temp_stocks")

# 使用Hive SQL语句将临时表数据插入到stocks表

spark.sql("""

    INSERT INTO TABLE default.stocks

    SELECT * FROM temp_stocks

""")

spark.sql('select * from stocks limit 10').show()

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: