Reference: https://daniel.blog.csdn.net/article/details/107415130

1. Add an SFTP connection

Select Tools => Deployment => Configuration.

2. In the Deployment dialog, configure the Connection and Mapping settings.

Connection settings:

Mapping settings:

3. Add an SSH Interpreter

4. Project Structure

5. Run configuration

Click the triangle in the upper-right corner, then clear the path in the Working directory field and update the Environment variables.
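For reference, a remote PySpark run usually needs environment variables along these lines. The paths below are assumptions matching the server layout used later in this article; adjust them to your own installation:

```shell
# Hypothetical values matching the /export/servers/spark layout used in this article
SPARK_HOME=/export/servers/spark
PYTHONPATH=/export/servers/spark/python:/export/servers/spark/python/lib/py4j-0.10.4-src.zip
```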

6. WordCount.py code

# coding=UTF-8
import sys

# Add the path of the py4j library on the server
sys.path.append('/export/servers/spark/python/lib/py4j-0.10.4-src.zip')

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # If Spark is configured with a YARN cluster, master can be changed to 'yarn'
    spark = SparkSession.builder \
        .master('local') \
        .appName('Pycharm Connection') \
        .getOrCreate()

    # Word count; the file path here is an HDFS path
    words = spark.sparkContext \
        .textFile("hdfs:/data/words") \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .collect()

    for word in words:
        print(word)

    spark.stop()

    # spark = SparkSession.builder \
    #     .master('local[6]') \
    #     .appName('Course_Test') \
    #     .config("hive.metastore.uris", "thrift://node03:9083") \
    #     .enableHiveSupport() \
    #     .getOrCreate()

    # # Option 1:
    # sql = "select * from course.SCORE"
    # spark.sql("use course")
    # queryResult = spark.sql(sql)
    # spark.sql("drop table if exists course.score_test")
    # queryResult.write.format("hive").mode("overwrite").saveAsTable('course.score_test')
    # spark.stop()

    # # Option 2:
    # sql = "select * from course.SCORE"
    # queryResult = spark.sql(sql)
    # queryResult.registerTempTable('temp_table')
    # spark.sql("truncate table course.score_test")
    # spark.sql("insert into course.score_test select * from temp_table")
    # spark.stop()
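As a quick sanity check of the word-count logic itself, the flatMap / map / reduceByKey pipeline can be reproduced in plain Python on a local list of lines, with no Spark cluster required. The sample text here is made up for illustration:

```python
# Plain-Python equivalent of the Spark word-count pipeline above.
text_lines = ["hello spark", "hello world"]  # stand-in for the HDFS file

# flatMap: split each line into individual words
words = [w for line in text_lines for w in line.split(" ")]

# map + reduceByKey: pair each word with 1, then sum the counts per word
counts = {}
for w in words:
    counts[w] = counts.get(w, 0) + 1

print(sorted(counts.items()))  # [('hello', 2), ('spark', 1), ('world', 1)]
```

This is exactly what the cluster version computes, just without partitioning the data across executors.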

7. Right-click => Run "WordCount"
