Contents

I. Connecting to and operating on HDFS with Python

1 Uploading files to HDFS

2 Processing data and storing it to HDFS

3 Reading a txt file on HDFS

The hdfs package is installed here with pip, which is convenient:

pip install hdfs
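After installing, a quick way to confirm the client can reach the NameNode is to request the status of the root directory. This is a minimal sketch; the address is a placeholder for your NameNode's WebHDFS endpoint (port 50070 by default on Hadoop 2.x):

from hdfs.client import Client

# Placeholder address: point this at the NameNode's WebHDFS port
client = Client("http://namenode-host:50070")

# Returns the FileStatus dict for '/'; fails fast if the endpoint is unreachable
print(client.status('/'))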

I. Connecting to and operating on HDFS with Python

from hdfs.client import Client

client = Client("http://LocalHost:Port")  # placeholder: the NameNode's WebHDFS address

client.makedirs('/ml/zmingmingmng')  # create a directory
client.delete('/ml/zmming')  # delete a directory (or file)
client.upload("/ml/zmingmingmng/zm.txt", "E:/ttt/testhdfs.txt")  # upload a local file
client.download("/ml/zmingmingmng/zm.txt", "E:/ming.txt")  # download to a local path

The snippet below wraps the common operations in helper functions:

# -*- encoding=utf-8 -*-
from hdfs.client import Client

client = Client("http://XXX.XXX.XX.XX:50070")

def mkdirs(client, hdfs_path):
    """Create a directory on HDFS."""
    client.makedirs(hdfs_path)

def delete_hdfs_file(client, hdfs_path):
    """Delete an HDFS file."""
    client.delete(hdfs_path)

def put_to_hdfs(client, local_path, hdfs_path):
    """Upload a local file to HDFS; cleanup=True removes partial uploads on error."""
    client.upload(hdfs_path, local_path, cleanup=True)

def get_from_hdfs(client, hdfs_path, local_path):
    """Download a file from HDFS to the local filesystem."""
    client.download(hdfs_path, local_path, overwrite=False)

def append_to_hdfs(client, hdfs_path, data):
    """Append data to an existing HDFS file."""
    client.write(hdfs_path, data, overwrite=False, append=True)

def write_to_hdfs(client, hdfs_path, data):
    """Write data to an HDFS file, overwriting any existing content."""
    client.write(hdfs_path, data, overwrite=True, append=False)

def move_or_rename(client, hdfs_src_path, hdfs_dst_path):
    """Move or rename an HDFS file."""
    client.rename(hdfs_src_path, hdfs_dst_path)

def list(client, hdfs_path):
    """Return the file names under a directory (note: this shadows the built-in list)."""
    return client.list(hdfs_path, status=False)

if __name__ == '__main__':
    # Example call
    kk = list(client, "/user/admin/deploy/user_lable_dimension/")
    for each in kk:
        print(each)
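For reference, a hypothetical call sequence for the write/append/rename helpers might look like the following (the /tmp paths are placeholders; note that append=True requires the file to already exist):

# Hypothetical usage of the helpers above
write_to_hdfs(client, "/tmp/demo.txt", "first line\n")    # create or overwrite
append_to_hdfs(client, "/tmp/demo.txt", "second line\n")  # append to the existing file
move_or_rename(client, "/tmp/demo.txt", "/tmp/demo_renamed.txt")
print(list(client, "/tmp"))  # the listing should now include demo_renamed.txt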

 

1 Uploading files to HDFS

from hdfs.client import Client

"""Upload a file to HDFS"""
client = Client("http://XXX.XXX.XX.XX:50070")

# Create the target directory first
hdfs_path = "【directory where the file should be stored, e.g. /a/b/c】"
client.makedirs(hdfs_path)

print("uploading data...")
client.upload(hdfs_path, "intersection.xlsx", overwrite=True)  # file uploaded from the resource center
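One way to double-check that the upload landed is to list the target directory afterwards; a small sketch reusing the client and hdfs_path from above:

# Sketch: verify the upload by listing the target directory
uploaded = client.list(hdfs_path)
print(uploaded)  # 'intersection.xlsx' should appear in this listing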

 

2 Processing data and storing it to HDFS

import pandas as pd
from hdfs.client import Client

# TODO: first compute the result list, e.g. i_list
# TODO: then save the result list as a file on HDFS
print("===============================================")
i_df = pd.DataFrame(i_list)
client = Client("http://XXX.XXX.XX.XX:50070")
fout = "【path where the file should be stored, e.g. /a/b/c.csv】"  # target path on HDFS
with client.write(fout, encoding='utf-8') as writer:
    i_df.to_csv(writer)
print("Saved successfully")

 

3 Reading a txt file on HDFS

import chardet
import pyhdfs

def GetEncodingScheme(_filename):
    """Detect a local file's text encoding with chardet."""
    with open(_filename, 'rb') as file:
        buf = file.read()
    result = chardet.detect(buf)
    return result['encoding']

def read_hdfs_file(client, filename):
    """Read an HDFS file's contents and return its lines as a list."""
    lines = []
    print("Reading txt data...")
    with client.open(filename) as reader:  # pyhdfs open() takes no delimiter; iterate line by line
        for line in reader:
            lines.append(line.decode("GB2312").strip())
    return lines

def deleteHDFSfile(client, hdfs_path):
    """Delete an HDFS file; a directory must be empty before it can be deleted."""
    client.delete(hdfs_path)

if __name__ == "__main__":
    print(GetEncodingScheme('intersection.xlsx'))  # GB2312

    # Connect to HDFS; hosts are host:port pairs (no http:// scheme), placeholders here
    client = pyhdfs.HdfsClient(hosts="xxxxxx:50070,xxxxxx:50070", user_name="xxxxxx")

    # Read the HDFS file's contents, one list entry per line
    hdfs_path = "【file path, e.g. /a/b/c.xlsx】"  # HDFS storage path
    print("===============================================")
    print("Reading the txt file on HDFS")
    lines = read_hdfs_file(client, hdfs_path)
    print(lines)
    print("Done reading")
    print("===============================================")

    # Delete the file under the HDFS storage path
    hdfs_path = "【file path】"
    deleteHDFSfile(client, hdfs_path)
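The reader above hardcodes GB2312 as the decoding. When the encoding is not known in advance, one option is to pull the raw bytes first and detect the encoding with chardet before decoding; a sketch assuming the same pyhdfs client:

def read_hdfs_file_autodetect(client, filename):
    """Sketch: detect the file's encoding with chardet before decoding."""
    with client.open(filename) as reader:
        raw = reader.read()
    encoding = chardet.detect(raw)['encoding'] or 'utf-8'  # fall back to utf-8
    return [line.strip() for line in raw.decode(encoding).splitlines()]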
