文章目录

1 连接HDFS1.1 Configuration类1.2 FileSystem类

2 创建文件夹3 上传文件4 下载文件5 数据写入(流式)源码分析

6 数据写出(流式)源码分析

1 连接HDFS

public class instance {

private static Configuration conf =null;

private static FileSystem fs =null;

@Before

public void connect2HDFS() throws IOException {

//设置客户端身份 以具备权限在hdfs上进行操作

System.setProperty("HADOOP_USER_NAME","root");

//创建配置对象实例

conf = new Configuration();

//设置操作的文件系统是HDFS 并且指定HDFS操作地址

conf.set("fs.defaultFS","hdfs://node1.itcast.cn:8020");

//创建FileSystem对象实例

fs = FileSystem.get(conf);

}

@After

public void close() {

//首先判断文件系统实例是否为null 如果不为null 进行关闭

if(fs !=null){

try {

fs.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

1.1 Configuration类

Configuration提供对配置参数的访问,通常称之为配置文件类。主要用于加载或者设定程序运行时相关的参数属性

通过debug发现:

首先加载了静态方法和静态代码块。其中在静态代码块中显示默认加载了两个配置文件:core-default.xml、core-site.xml当到达FileSystem.get(conf)这一行代码的时候,可以发现用户通过conf.set设置的属性也会被加载。

1.2 FileSystem类

FileSystem类是一个通用的文件系统的抽象基类。具体来说它可以实现为一个分布式的文件系统,也可以实现为一个本地文件系统。所有的可能会使用到HDFS的用户代码在进行编写时都应该使用FileSystem对象。代表本地文件系统的实现是LocalFileSystem,代表分布式文件系统的实现是DistributedFileSystem。当然针对其他hadoop支持的文件系统也有不同的具体实现。因此HDFS客户端在进行读写操作之前,需要创建FileSystem对象的实例。

通过debug发现:

FileSystem对象是通过调用getInternal方法得到的。在getInternal方法中调用了createFileSystem方法。FileSystem实例是通过反射的方式获得的。具体实现是通过调用反射工具类ReflectionUtils的newInstance方法并将class对象以及Configuration对象作为参数传入最终得到了FileSystem实例。

2 创建文件夹

@Test

public void mkdir() throws IOException {

//首先判断文件夹是否存在,如果不存在再创建

if(!fs.exists(new Path("/user1"))){

//创建文件夹

fs.mkdirs(new Path("/user1"));

}

}

3 上传文件

@Test

public void putFile2HDFS() throws IOException {

//创建本地文件路径

Path src = new Path("E:\\1.txt");

//hdfs上传路径

Path dst = new Path("/user1/1.txt");

//文件上传动作(local--->hdfs)

fs.copyFromLocalFile(src,dst);

}

注意:因为实在windows本地文件系统下写的运行的,因此本地路径为windows文件系统的。

4 下载文件

@Test

public void getFile2Local() throws IOException {

//源路径:hdfs的路径

Path src = new Path("/ithiema/1.txt");

//目标路径:local本地路径

Path dst = new Path("D:\\1.txt");

//文件下载动作(hdfs--->local)

fs.copyToLocalFile(src,dst);

}

注意:该代码运行时会报错,原因是Hadoop访问windows本地文件系统,要求Windows上的本地库能正常工作。 其中Hadoop使用某些Windows API来实现类似posix的文件访问权限。这些功能需要在hadoop.dll和winutils.exe来实现。

解决:下载Hadoop源码在windows平台编译,编译出windows本地库。然后配置Hadoop环境变量

5 数据写入(流式)

@Test

public void write() throws IOException {

//设置文件输出路径

Path path = new Path("/helloworld");

//调用create方法创建文件

FSDataOutputStream out = fs.create(path);

//创建本地文件输入流

FileInputStream in = new FileInputStream("E:\\input\\1.txt");

//io工具类实现流的拷贝

IOUtils.copy(in,out);

}

源码分析

(1)客户端请求NameNode

HDFS客户端通过对DistributedFileSystem对象调用create()请求创建文件。DistributedFileSystem为客户端返回FSDataOutputStream输出流对象,用于后续写数据。FSDataOutputStream是一个包装类,所包装的是DFSOutputStream。可以通过create方法调用不断跟下去,可以发现最终的调用也验证了上述结论,返回的是DFSOutputStream 。 点击进入代码DFSOutputStream dfsos = dfs.create可以发现,DFSOutputStream这个类是从DFSClient类的create方法中返回过来的。 而DFSClient类中的DFSOutputStream实例对象是通过调用DFSOutputStream类的newStreamForCreate方法产生的。 点击进入DFSOutputStream.newStreamForCreate方法,找到了客户端请求NameNode新建元数据的关键代码。代码dfsClient.namenode.create中namenode是ClientProtocol类实例。DFSClient使用ClientProtocol与NameNode守护程序进行通信。

(2)NameNode执行请求操作检查

dfsClient.namenode.create方法会请求namenode在文件系统名称空间中创建一个新的文件条目。点击进入dfsClient.namenode.create方法,可以发现该方法位于ClientProtocol接口中,这是客户端和NameNode之间RPC接口。点击create方法左边绿色I按钮,打开接口的实现类NameNodeRpcServer。 NameNodeRpcServer中status = this.namesystem.startFile就是实际请求创建文件的方法。 继续追踪status = startFileInt方法。 可以发现,请求创建文件的时候,namenode会执行各种检查判断:目标文件是否存在、父目录是否存在、客户端是否具有创建该文件的权限。 检查通过,namenode就会为创建新文件记录一条记录。否则,文件创建失败并向客户端抛出一个IOException。

(3)DataStreamer类

在DFSOutputStream.newStreamForCreate方法的最后发现了DFSOutputStream的创建;并且在返回之前,调用了out对象的start方法。 点进start方法,发现调用的是DataStreamer对象的start方法。 DataStreamer类是DFSOutputSteam的一个内部类,本质是一个线程。

(4)DataStreamer写数据

在DataStreamer的run方法中实现了数据写入的关键代码,概况起来:

DataStreamer类负责将数据包发送到pipeline(管道)中的datanodes节点上。它从namenode检索新的blockid和块位置,然后开始将数据包流传输到Datanodes的管道中。每个数据包都有与之关联的序列号。当发送完一个数据块的所有数据包并接收到每个数据包的确认后,DataStreamer将关闭当前数据块。DataStreamer线程从dataQueue中拾取数据包,将其发送到管道中的第一个datanode,然后将其从dataQueue中移动到ackQueue中。ResponseProcessor从数据节点接收ack信息。当从所有数据节点接收到成功的数据包确认后,ResponseProcessor将从ackQueue中删除相应的数据包。

在客户端写入数据时,DFSOutputStream将它分成一个个数据包(packet 默认64kb),并写入一个称之为数据队列(data queue)的内部队列。DataStreamer请求NameNode挑选出适合存储数据副本的一组DataNode。这一组DataNode采用pipeline机制做数据的发送。默认是3副本存储。 DataStreamer将数据包流式传输到pipeline的第一个datanode,该DataNode存储数据包并将它发送到pipeline的第二个DataNode。同样,第二个DataNode存储数据包并且发送给第三个(也是最后一个)DataNode。 DFSOutputStream也维护着一个内部数据包队列来等待DataNode的收到确认回执,称之为确认队列(ack queue)。发送packet的时候,会把packet从data queue移动到ack queue。收到pipeline中所有DataNode确认信息后,该数据包才会从确认队列删除。 客户端完成数据写入后,将在流上调用close()方法关闭。并在联系到NameNode告知其文件写入完成之前,等待确认。因为namenode已经知道文件由哪些块组成(DataStream请求分配数据块),因此它仅需等待最小复制块即可成功返回。数据块最小复制是由参数dfs.namenode.replication.min指定,默认是1.

6 数据写出(流式)

@Test

public void Read() throws IOException {

//调用open方法读取文件

FSDataInputStream in = fs.open(new Path("/ithiema/1.txt"));

//创建本地文件输出流

FileOutputStream out = new FileOutputStream("E:\\output\\1.txt");

//IO工具类实现流的拷贝

IOUtils.copy(in,out);

}

源码分析

(1)客户端请求NameNode

客户端通过调用DistributedFileSystem对象上的open()来打开希望读取的文件。DistributedFileSystem为客户端返回FSDataInputStream输入流对象。通过源码注释可以发现FSDataInputStream是一个包装类,所包装的是DFSInputStream。通过open方法调用不断跟下去,可以发现最终的调用也验证了上述结论,返回的是DFSInputStream。 点击进入代码DFSInputStream dfsis =dfs.open可以发现,DFSInputStream这个类是从DFSClient类的open方法中返回过来的。该输入流从namenode获取block的位置信息。

(2)getLocatedBlocks

在dfs.open方法中,有一个核心方法调用叫做getLocatedBlocks,见名知意,该方法是用于获取块位置信息的。getLocatedBlocks方法调用返回的结果是LocatedBlocks。 lLocatedBlocks封装了文件block的位置信息。 点击getLocatedBlocks进去之后发现,最终调用的是callGetBlockLocations: 在callGetBlockLocations内部,最终是通过namenode.getBlockLocations(src, start, length)请求namenode获取文件数据块位置信息的。 关于namenode.getBlockLocations中的namenode不用过多解释了,是ClientProtocol对象的实例。用户代码通过DistributedFileSystem类使用ClientProtocol与NameNode进行通信。

(3)NameNode返回block信息

对于每个block,namenode返回具有该块副本的datanode的地址,并且datanode根据块与客户端的距离进行排序。注意此距离指的是网络拓扑中的距离。比如客户端的本身就是一个DataNode,那么从本地读取数据明显比跨网络读取数据效率要高。 getBlockLocations方法在源码注释上也描述了这段逻辑,大意是:

获取指定范围内指定文件的块位置。 每个块的DataNode位置按与客户端的接近程度进行排序。返回LocatedBlocks,其中包含文件长度,块及其位置。每个块的DataNode位置按到客户端地址的距离排序。然后,客户端将必须联系指示的DataNode之一以获得实际数据。

(4)客户端读取数据

DFSClient在获取到block的位置信息之后,继续调用openInternal方法。 点击进入该方法可以发现,分了两种不同的输入流。这取决于文件的存储策略是否采用EC纠删码。如果未使用EC编码策略存储,那么直接创建DFSInputStream。在DFSInputStream输入流中,封装了之前获取到的block位置信息。 客户端调用DFSInputStream的read()方法,连接到文件中第一个块的最近的DataNode节点(最优的)读取数据块。数据会以数据包packet为单位从数据节点通过流式接口传送到客户端。当达到一个数据块的末尾时,DFSInputStream再次调用getBlockLocations获取文件的下一个数据块的位置信息,并建立和这个新的数据块的最优节点之间的连接,然后客户端就可以继续读数据。这些操作对用户来说是透明的。所以用户感觉起来它一直在读取一个连续的流。 客户端完成文件读取后,就对FSDataInputStream调用close()方法关闭输入流。如果DFSInputStream与DataNode通信时遇到错误,它将尝试该块的下一个最接近的DataNode读取数据。并将记住发生故障的DataNode,保证以后不会反复读取该DataNode后续的块。此外,DFSInputStream也会通过校验和(checksum)确认从DataNode发来的数据是否完整。如果发现有损坏的块,DFSInputStream会尝试从其他DataNode读取该块的副本,也会将被损坏的块报告给namenode 。

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: