当前位置: 首页 > news >正文

深入HDFS——数据上传源码

引入

就如RPC篇章里提到的观点一样,任何一种能广为传播的技术,都是通过抽象和封装的思想,屏蔽底层底层复杂实现,提供简单且强大的工具,来降低使用门槛的。

HDFS的风靡自然也是如此。

通过前面深入了NameNode和DataNode的启动源码,我们已经是略有体会,但重启毕竟属于工作时几乎遇不到的场景,所以今天我们从HDFS最常用的上传功能入手,去看看HDFS是如何实现的。

数据上传过程

既然是我们要使用的功能,自然需要我们自己动手编写向HDFS中写入数据的代码啦。

下面我写了一个简单的写入数据代码:

public class WriteDataToHDFS {public static void main(String[] args) throws IOException, InterruptedException{Configuration conf = new Configuration();//创建FileSystem对象FileSystem fs= FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");//创建HDFS文件路径Path path = new Path("/chaos.txt");FSDataOutputStream out = fs.create(path);//向HDFS中写出数据out.write("hello chaos".getBytes());}
}

可以看到,实现的代码是很简单的。

使用确实是很容易的,那么底层实现是怎样的呢?

我们先通过前面的了解,先来梳理一下,客户端向 HDFS写入数据的实现流程:

  1. 客户端与NameNode进行通信,获取数据写入HDFS中对应哪些DataNode节点;
  2. 在客户端将数据划分成packet传输到HDFS各个DataNode节点上。

看起来好像也不难的样子?

但实际上底层实现可没那么简单。下面我们会从以下几个模块去深入源码,一起看看HDFS是如何实现数据上传的,里面又有哪些有意思的细节。

  1. 创建文件系统并初始化DFSClient
  2. 连接NameNode创建目录
  3. 启动DataStreamer线程
  4. 向dataQueue队列中写入packet
  5. 设置副本写入策略源码
  6. 客户端与DataNode建立socket通信
  7. 向Datanode中上传数据

1.创建文件系统并初始化DFSClient

操作HDFS前需要创建文件系统,并初始化DFSClient对象,该对象中持有与NameNode通信的NameNode Rpc Proxy。

而DFSClient对象的创建代码如下:

FileSystem fileSystem = FileSystem.get(URI.create("hdfs://hadoop1:8020/"),conf,"root");

这里也可以通过FileSystem.newInstance(conf)创建,不过殊途同归,底层实现是类似的。

FileSystem.get()具体源码如下:

public static FileSystem get(final URI uri, final Configuration conf,final String user) throws IOException, InterruptedException {String ticketCachePath =conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);UserGroupInformation ugi =UserGroupInformation.getBestUGI(ticketCachePath, user);return ugi.doAs(new PrivilegedExceptionAction() {@Overridepublic FileSystem run() throws IOException {//创建分布式文件系统及初始化DFSClientreturn get(uri, conf);}});
}

继续往里走,来看get(uri, conf)方法源码如下:

public static FileSystem get(URI uri, Configuration conf) throws IOException {... ...//创建分布式文件系统及初始化DFSClientreturn CACHE.get(uri, conf);
}

CACHE.get(uri,conf)又调用到如下源码:

FileSystem get(URI uri, Configuration conf) throws IOException{Key key = new Key(uri, conf);//创建分布式文件系统及初始化DFSClientreturn getInternal(uri, conf, key);
}

getInternal方法源码如下:

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{FileSystem fs;synchronized (this) {fs = map.get(key);}if (fs != null) {return fs;}// 创建分布式文件系统及初始化DFSClientfs = createFileSystem(uri, conf);... ...return fs;
}

从上面源码可以看到,getInternal方法中,核心就在createFileSystem方法,这个方法会创建分布式文件系统及初始化DFSClient。

关于这个文件系统对应的类是什么,其实直接跟着代码找还是很难找到的,但是熟悉面向对象知识的小伙伴,肯定一看就知道,这个类一定是FileSystem的实现类。

那我们就先来看看FileSystem的注释:

An abstract base class for a fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one that reflects the locally-connected disk. The local version exists for small Hadoop instances and for testing.

All user code that may potentially use the Hadoop Distributed File System should be written to use a FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's useful because of its fault tolerance and potentially very large capacity.

The local implementation is {@link LocalFileSystem} and distributed implementation is DistributedFileSystem.

翻译:

这是一个相当通用的文件系统的抽象基类。它可能被实现为分布式文件系统,或者是反映本地连接磁盘的 “本地” 文件系统。本地版本适用于小型 Hadoop 实例和测试。

 

所有可能潜在使用 Hadoop 分布式文件系统的用户代码,都应编写为使用文件系统对象。Hadoop DFS 是一个多机系统,表现得像单个磁盘。它很有用,因为其具有容错性和潜在的非常大的容量。

 

本地实现是 {@link LocalFileSystem} ,分布式实现是 DistributedFileSystem 。

这下子就清晰了,我们这里创建的分布式文件系统类自然就是DistributedFileSystem类。(org.apache.hadoop.hdfs.DistributedFileSystem

我们回来接着看createFileSystem的源码如下:

private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {//创建的 class 为 org.apache.hadoop.hdfs.DistributedFileSystemClass<?> clazz = getFileSystemClass(uri.getScheme(), conf);//初始化分布式文件系统FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);//调用到 DistributedFileSystem 中的 initialize方法,初始化创建DFSClientfs.initialize(uri, conf);return fs;
}

可以看到,是通过ReflectionUtils.newInstance(clazz, conf) 获取的这个文件系统类。

但在执行ReflectionUtils.newInstance(clazz, conf) 前,通过getFileSystemClass(uri.getScheme(), conf) 获取clazz才是重点,其对应源码如下:

public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {if (!FILE_SYSTEMS_LOADED) {//将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中loadFileSystems();}Class<? extends FileSystem> clazz = null;if (conf != null) {// 从配置中获取 fs.hdfs.impl// 如果配置文件中没有配置 fs.hdfs.impl 那么获取的clazz 为 null// 3.x这里会有些小区别clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);}if (clazz == null) {//获取的clazz 为 org.apache.hadoop.hdfs.DistributedFileSystemclazz = SERVICE_FILE_SYSTEMS.get(scheme);}if (clazz == null) {throw new IOException("No FileSystem for scheme: " + scheme);}return clazz;
}

以上代码中loadFileSystems(),会将 FileSystem 抽象类的所有实现类中的 schema,和对应实现类,放入 SERVICE_FILE_SYSTEMS 对象中。

loadFileSystems()实现源码如下:

private static void loadFileSystems() {synchronized (FileSystem.class) {if (!FILE_SYSTEMS_LOADED) {//ServiceLoader.load(FileSystem.class)会加载所有 FileSystem 实现类中的schema信息ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);for (FileSystem fs : serviceLoader) {//将所有文件系统的 schema信息存入 SERVICE_FILE_SYSTEMS Map中SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());}FILE_SYSTEMS_LOADED = true;}}
}

getFileSystemClass 方法中首先从配置文件中获取 fs.hdfs.impl 配置的HDFS类,默认在HDFS中没有配置该属性,该属性也没有默认值,所以得到clazz为null,进而执行 SERVICE_FILE_SYSTEMS.get(scheme) 得到的clazz为org.apache.hadoop.hdfs.DistributedFileSystem。

FileSystem.createFileSystem() 中执行 fs.initialize(uri, conf) 时,这里的fs就是org.apache.hadoop.hdfs.DistributedFileSystem类,所以相当于执行的是DistributedFileSystem.initialize

对应实现与源码如下:

public void initialize(URI uri, Configuration conf) throws IOException {super.initialize(uri, conf);setConf(conf);String host = uri.getHost();if (host == null) {throw new IOException("Incomplete HDFS URI, no host: "+ uri);}homeDirPrefix = conf.get(DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);//创建DFSClient,传入的URI 为NameNode URIthis.dfs = new DFSClient(uri, conf, statistics);this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());this.workingDir = getHomeDirectory();
}

通过以上代码,会创建DFSClient对象,并在创建DFSClient对象时创建NameNode Rpc Proxy对象,并赋值给其属性namenode,方便后续客户端和NameNode进行通信。

具体的 new DFSClient构造如下:

public DFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats) throws IOException {//创建DFSClient ,传入了 NameNode的URIthis(nameNodeUri, null, conf, stats);
}

this调用到DFSClient实现,其中创建了NameNode Rpc Proxy 并赋值给了namenode属性。

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats) throws IOException {... ...//获取NameNode Rpc ProxyproxyInfo = NameNodeProxiesClient.createProxyWithLossyRetryHandler(conf,nameNodeUri, ClientProtocol.class, numResponseToDrop,nnFallbackToSimpleAuth);... ...//给DFSClient中的namenode 赋值 NameNode的Rpc Proxy对象this.namenode = proxyInfo.getProxy();... ...
}

后续客户端可以通过DFSClient.namenode获取到NameNode的RPC Proxy对象与NameNode进行通信。

2.连接NameNode创建目录

在我们编写的代码执行到 fs.create(path) 时,会在HDFS中创建目录并准备dataQueue,dataQueue用于客户端数据传输队列,并最后返回 FSDataOutputStream 对象,该对象用于向HDFS中写数据。

跟进fs.create() 源码一层层对象包装,会发现该create方法最终实际调到 DistributedFileSystem.create()方法,其源码如下:

@Override
public FSDataOutputStream create(Path f, FsPermission permission,boolean overwrite, int bufferSize, short replication, long blockSize,Progressable progress) throws IOException {//返回 FSDataOutputStream 对象return this.create(f, permission,overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE): EnumSet.of(CreateFlag.CREATE), bufferSize, replication,blockSize, progress, null);
}

以上create方法会继续调用到DistributedFileSystem.create()方法,只是参数不同,源码如下:

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,final EnumSet<CreateFlag> cflags, final int bufferSize,final short replication, final long blockSize, final Progressable progress,final ChecksumOpt checksumOpt) throws IOException {... ...return new FileSystemLinkResolver<FSDataOutputStream>() {@Overridepublic FSDataOutputStream doCall(final Path p)throws IOException, UnresolvedLinkException {//创建了一个DFSOutputStream,做了很多初始化操作/***  1.往文件目录树里面添加了INodeFile*  2.添加了契约管理*  3.启动了DataStreamer(写数据流程的关键服务)*///执行dfs.create方法,最终调用到 DFSClient.create方法final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,cflags, replication, blockSize, progress, bufferSize,checksumOpt);//FSDataOutputStream 是DFSOutputStream 进行了再一次的封装。【装饰模式】return dfs.createWrappedOutputStream(dfsos, statistics);}... ...
}

3.x版本,在上面代码最后会 return safelyCreateWrappedOutputStream(dfsos),感兴趣的小伙伴可以深入看看它们的区别。

以上代码执行dfs.create方法,最终调用到 DFSClient.create方法:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication,long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException {return create(src, permission, flag, true, replication, blockSize, progress, buffersize, checksumOpt, null);
}

DFSClient.create方法又经过一些列参数包装,最终调用到如下源码:

public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent,short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt,InetSocketAddress[] favoredNodes) throws IOException {checkOpen();if (permission == null) {permission = FsPermission.getFileDefault();}FsPermission masked = permission.applyUMask(dfsClientConf.uMask);if (LOG.isDebugEnabled()) {LOG.debug(src + ": masked=" + masked);}// newStreamForCreate中获取到NameNoae Rpc Proxy 代理对象并连接创建目录,然后启动DataStreamer 线程用于接收客户端上传的packet/*** 总结:* 1.往文件目录树里面添加了文件* 2.添加了契约* 3.启动了DataStreamer*/final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent,replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt),getFavoredNodesStr(favoredNodes));// 开启续约(契约)beginFileLease(result.getFileId(), result);return result;
}

以上代码我补充了很多注释,这里就不赘述了,下面接着看newStreamForCreate实现源码如下:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked,EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress,int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException {... ...//重试的代码结构while (shouldRetry) {shouldRetry = false;try {/*** HDFS原理总结:* 创建目录:就是在 目录树(元数据)上面添加一个子Node (INodeDirectory)* 上传文件:*     1.在目录树里面添加一个字Node(InodeFile)*     2.再往文件里面写数据*     更新了元数据*     添加了契约*  往目录树里添加InodeFile,记录元数据日志和添加契约*  这儿都是需要跟Namenode的服务端进行交互的*/// dfsClient.namenode 就是 NameNode Rpc Proxy 对象,调用的create方法,调用到NameNodeRpcServer.create方法stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize,SUPPORTED_CRYPTO_VERSIONS);break;} catch (RemoteException re) {IOException e = re.unwrapRemoteException(AccessControlException.class,DSQuotaExceededException.class, FileAlreadyExistsException.class,FileNotFoundException.class, ParentNotDirectoryException.class,NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class,UnresolvedPathException.class, SnapshotAccessControlException.class,UnknownCryptoProtocolVersionException.class);if (e instanceof RetryStartFileException) {//重试if (retryCount > 0) {shouldRetry = true;retryCount--;} else {throw new IOException("Too many retries because of encryption" + " zone operations", e);}} else {throw e;}}}Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");//普通写文件策略,out对象是DFSOutputStream//该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,favoredNodes);//启动DataStreamer 线程 ,运行run方法out.start();return out;... ...
}

以上代码dfsClient.namenode.create()方法会通过NameNode Rpc Proxy 对象调用到NameNodeRpcServer.create方法,然后在HDFS中经过一些目录和权限判断来创建对应目录。NameNodeRpcServer.create源码如下:

@Override // ClientProtocol 客户端创建文件
public HdfsFileStatus create(String src, FsPermission masked,String clientName, EnumSetWritable<CreateFlag> flag,boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)throws IOException {//检查namenoe启动状态checkNNStartup();... ...//创建文件核心代码status = namesystem.startFile(src, perm, clientName, clientMachine,flag.get(), createParent, replication, blockSize, supportedVersions,cacheEntry != null);... ...return status;
}

以上startFile源码如下:

HdfsFileStatus startFile(String src, PermissionStatus permissions,String holder, String clientMachine, EnumSet<CreateFlag> flag,boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)throws AccessControlException, SafeModeException,FileAlreadyExistsException, UnresolvedLinkException,FileNotFoundException, ParentNotDirectoryException, IOException {... ...//创建文件目录status = startFileInt(src, permissions, holder, clientMachine, flag,createParent, replication, blockSize, supportedVersions,logRetryCache);... ...return status;
}

startFileInt实现源码如下:

private HdfsFileStatus startFileInt(String src,PermissionStatus permissions, String holder, String clientMachine,EnumSet flag, boolean createParent, short replication,long blockSize, CryptoProtocolVersion[] supportedVersions,String ecPolicyName, String storagePolicy, boolean logRetryCache)throws IOException {... ...//创建文件目录stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,clientMachine, flag, createParent, replication, blockSize, feInfo,toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy,logRetryCache);... ...
}

以上代码中startFile实现如下:

static HdfsFileStatus startFile(FSNamesystem fsn, INodesInPath iip,PermissionStatus permissions, String holder, String clientMachine,EnumSet flag, boolean createParent,short replication, long blockSize,FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,boolean shouldReplicate, String ecPolicyName, String storagePolicy,boolean logRetryEntry)throws IOException {... ...//创建文件iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,replication, blockSize, holder, clientMachine, shouldReplicate,ecPolicyName, storagePolicy);... ...
}

addFile中最终会执行 fsd.addINode(existing, newNode, permissions.getPermission()) 向HDFS中添加目录信息。

3.启动DataStreamer线程

顺着流程就走到 DFSOutputStream.newStreamForCreate() 源码的下半部分:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,FsPermission masked, EnumSet flag, boolean createParent,short replication, long blockSize, Progressable progress,DataChecksum checksum, String[] favoredNodes, String ecPolicyName,String storagePolicy)throws IOException {... ...// 普通写文件策略,out对象是DFSOutputStream// 该DFSOutputStream构造中会创建DataStreamer 线程,负责向HDFS中写数据out = new DFSOutputStream(dfsClient, src, stat,flag, progress, checksum, favoredNodes, true);... ...//启动DataStreamer 线程 ,运行run方法out.start();return out;
}

当向NameNode连接创建目录后,会执行new DFSOutputStream()创建DFSOutputStream对象并最终返回,在创建该对象的构造中同时创建了DataStreamer对象并赋值给streamer属性,DataStreamer对象负责后续接收客户端上传数据并将数据发送pipeline方式发送到DataNode上,该对象为一个线程,创建DFSOutputStream对象完成后会执行out.start()方法进行启动。

new DFSOutputStream实现源码如下:

private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag,Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {this(dfsClient, src, progress, stat, checksum);this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);//计算写入数据包的大小,默认每个packetSize大小为64kbcomputePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);//创建 DataStreamer 对象负责 向HDFS中写入数据streamer = new DataStreamer(stat, null);if (favoredNodes != null && favoredNodes.length != 0) {streamer.setFavoredNodes(favoredNodes);}
}

3.x对应源码有一些小区别

protected DFSOutputStream(DFSClient dfsClient, String src,HdfsFileStatus stat, EnumSet flag, Progressable progress,DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {... ...computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),bytesPerChecksum);... ...streamer = new DataStreamer(stat, null, dfsClient, src, progress,checksum, cachingStrategy, byteArrayManager, favoredNodes,addBlockFlags);... ...
}

可以看到,在DFSOutputStream创建同时,获取了后续写入数据时的packet大小(默认为64K),并给其streamer属性初始化了DataStreamer值(DataStreamer是一个线程)。

当创建好 DFSOutputStream对象后赋值给out对象,当执行out.start()方法时,实际上执行的就是streamer.start,由于DataStreamer是一个线程,所以最终调用到其中的run方法。

我们可以先看一下源码注释:

DFSOutputStream creates files from a stream of bytes.

The client application writes data that is cached internally by this stream.Data is broken up into packets, each packet is typically 64K in size. A packet comprises of chunks. Each chunk is typically 512 bytes and has an associated checksum with it.

 
When a client application fills up the currentPacket, it is enqueued into dataQueue. The DataStreamer thread picks up packets from the dataQueue, sends it to the first datanode in the pipeline and moves it from the dataQueue to the ackQueue. The ResponseProcessor receives acks from the datanodes. When an successful ack for a packet is received from all datanodes, the ResponseProcessor removes the corresponding packet from the ackQueue.

In case of error, all outstanding packets and moved from ackQueue. A new pipeline is setup by eliminating the bad datanode from the original pipeline.The DataStreamer now starts sending packets from the dataQueue.

翻译:

DFSOutputStream 从字节流创建文件。

 

客户端应用程序写入的数据由该流在内部进行缓存。数据被分解为数据包,每个数据包通常大小为 64K。一个数据包由数据块组成。每个数据块通常为 512 字节,并带有相关的校验和。

 

当客户端应用程序填满当前数据包时,它会被排入数据队列。数据流式处理线程从数据队列中取出数据包,将其发送到管道中的第一个数据节点,并将其从数据队列移动到确认队列。响应处理器从数据节点接收确认。当从所有数据节点收到一个数据包的成功确认时,响应处理器从确认队列中删除相应的数据包。

 

在出现错误的情况下,所有未完成的数据包都会从确认队列中移出。通过从原始管道中排除有问题的数据节点来设置新的管道。数据流式处理线程现在开始从数据队列发送数据包。

DataStreamer.run方法的源码如下:

/** streamer thread is the only thread that opens streams to datanode, and closes* them. Any error recovery is also done by this thread.* 数据流式处理线程是唯一打开与数据节点的流并关闭它们的线程。任何错误恢复也由这个线程完成。*/
@Override
public void run() {... ...synchronized (dataQueue) {// wait for a packet to be sent.// 等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueuelong now = Time.monotonicNow();//第一次进来的时候,因为没有数据所以代码走的是这儿// dataQueue.size() == 0while ((!streamerClosed && !hasError && dfsClient.clientRunning && dataQueue.size() == 0&& (stage != BlockConstructionStage.DATA_STREAMING|| stage == BlockConstructionStage.DATA_STREAMING&& now - lastPacket < dfsClient.getConf().socketTimeout / 2))|| doSleep) {long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);timeout = timeout <= 0 ? 1000 : timeout;timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;try {//如果dataQueue里面没有数据,代码就会阻塞在这儿。dataQueue.wait(timeout);//notify} catch (InterruptedException e) {DFSClient.LOG.warn("Caught exception ", e);}doSleep = false;now = Time.monotonicNow();}... ...... ...//获取待发送的数据包one = dataQueue.getFirst(); // regular data packet... ...// get new block from namenode./*** 建立数据管道* 向NameNode申请Block*/if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {if (DFSClient.LOG.isDebugEnabled()) {DFSClient.LOG.debug("Allocating new block");}//步骤一:建立数据管道/*** nextBlockOutputStream 这个方法里面完成了两个事:* 1.向Namenode申请block* 2.建立数据管道*/setPipeline(nextBlockOutputStream());//步骤二:启动了ResponseProcessor 用来监听我们一个packet发送是否成功initDataStreaming();} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {if (DFSClient.LOG.isDebugEnabled()) {DFSClient.LOG.debug("Append to block " + block);}setupPipelineForAppendOrRecovery();initDataStreaming();}long lastByteOffsetInBlock = one.getLastByteOffsetBlock();if (lastByteOffsetInBlock > blockSize) {throw new IOException("BlockSize " + blockSize + " is smaller than data size. "+ " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + src);}if (one.isLastPacketInBlock()) {// wait for all data packets have been successfully ackedsynchronized (dataQueue) {while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {try {// wait for acks to arrive from datanodes// dataQueue 中目前没有数据,进入等待状态dataQueue.wait(1000);} catch (InterruptedException e) {DFSClient.LOG.warn("Caught exception ", e);}}}if (streamerClosed || hasError || !dfsClient.clientRunning) {continue;}stage = BlockConstructionStage.PIPELINE_CLOSE;}// send the packetSpan span = null;synchronized (dataQueue) {// move packet from dataQueue to ackQueueif (!one.isHeartbeatPacket()) {span = scope.detach();one.setTraceSpan(span);//步骤三:从dataQueue把要发送的这个packet移除出去dataQueue.removeFirst();//步骤四:然后往ackQueue里面添加这个packetackQueue.addLast(one);dataQueue.notifyAll();}}... ...//这个就是我们写数据代码one.writeTo(blockStream);blockStream.flush();... ...
}

以上代码中 dataQueue是一个Linkedlist对象,该对象会一直处于while循环中等待客户端上传文件的packet,当有数据放入该LinkedList后,会从该对象中获取一个个的packet写出到DN中。

4.向dataQueue队列中写入packet

向HDFS写入数据是通过执行自己编写代码out.write("hello chaos".getBytes())实现的。out对象为DFSOutputStream对象,所以write方法优先找该对象中的write方法,但是发现DFSOutputStream对象中没有write方法,所以找到DFSOutputStream对象的父类FSOutputSummer.write方法,因此最终执行到FSOutputSummer.write方法实现,其源码如下:

@Override
public synchronized void write(int b) throws IOException {buf[count++] = (byte)b;if(count == buf.length) {//刷新缓冲区,写出数据flushBuffer();}
}

以上代码中flushBuffer()实现源码如下:

protected synchronized void flushBuffer() throws IOException {//向packet中写入数据flushBuffer(false, true);
}

flushBuffer方法实现源码如下:

protected synchronized int flushBuffer(boolean keep,boolean flushPartial) throws IOException {... ...// 调用writeChecksumChunks方法将缓冲区的数据写入到输出流,并进行校验和writeChecksumChunks(buf, 0, lenToFlush);... ...
}

以上writeChecksumChunks()方法主要就是对写入buffer数据进行校验和生成并与数据一并写入packet。

writeChecksumChunks实现源码如下:

// 为给定的数据块生成校验和,并将输出块和校验和写入底层输出流
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {... ...//根据数据块的大小,计算数据块的校验和sum.calculateChunkedSums(b, off, len, checksum, 0);.. ...//将当前数据块和对应的校验块写入到底层输出流中writeChunk(b, off + i, chunkLen, checksum, ckOffset,getChecksumSize());... ...
}

以上代码中writeChunk()方法最终会调用到DFSOutputStream.writeChunk()实现,其源码如下:

protected synchronized void writeChunk(byte[] b, int offset, int len,byte[] checksum, int ckoff, int cklen) throws IOException {... ...// 将校验和写入当前数据包currentPacket.writeChecksum(checksum, ckoff, cklen);// 将数据块写入当前数据包currentPacket.writeData(b, offset, len);... ...// 如果数据包已满,则将其排队等待传输enqueueCurrentPacketFull();... ...
}

以上代码中,随着数据写入到packet中数据量达到默认64K时,会将packet写入到对应的dataQueue中。

enqueueCurrentPacketFull()方法实现源码如下:

synchronized void enqueueCurrentPacketFull() throws IOException {LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"+ " appendChunk={}, {}", currentPacket, src, getStreamer().getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),getStreamer());//当前数据包排队等待传输enqueueCurrentPacket();adjustChunkBoundary();endBlock();
}

以上enqueueCurrentPacket()方法实现原理如下:

void enqueueCurrentPacket() throws IOException {//当前数据包排队等待传输getStreamer().waitAndQueuePacket(currentPacket);currentPacket = null;
}

waitAndQueuePacket()方法实现如下:

void waitAndQueuePacket(DFSPacket packet) throws IOException {synchronized (dataQueue) {... ...//将当前packet 放入 dataQueue 中queuePacket(packet);... ...
}

queuePacket()实现代码如下:

void queuePacket(DFSPacket packet) {synchronized (dataQueue) {if (packet == null) return;packet.addTraceParent(Tracer.getCurrentSpan());//将packet 加入到dataQueue LinkedList 中dataQueue.addLast(packet);lastQueuedSeqno = packet.getSeqno();LOG.debug("Queued {}, {}", packet, this);//notifyAll()方法通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行dataQueue.notifyAll();}
}

以上代码中dataQueue.addLast(packet)就是将packet 加入到dataQueue LinkedList 中,当执行到dataQueue.notifyAll()时,会通知所有正在等待dataQueue对象锁的线程,告诉它们数据队列已经有数据包放入,可以继续执行。

5.副本放置策略源码

下面我们回到DataStreamer.run方法源码,该部分代码3.x和2.x有很多区别,上面已经贴过2.x实现,下面我们看看3.x版本是如何实现的:

public void run() {... ...synchronized (dataQueue) {// wait for a packet to be sent.//等待packet 放入到  dataQueue,packet当客户端写入数据时才会放入到dataQueuewhile ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {... ...//dataQueue 中目前没有数据,进入等待状态dataQueue.wait(timeout);... ...}... ...//获取待发送的数据包one = dataQueue.getFirst(); // regular data packet... ...//构建写数据管道,通过管道连接到第一个DataNode,该DN将数据发送到管道的第二个DN,以此类推//nextBlockOutputStream 方法中连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接setPipeline(nextBlockOutputStream());... ...//将packet 以流的方式写入到DataNode节点sendPacket(one);... ...//等待所有ackwaitForAllAcks();... ...
}

以上代码大体逻辑为:当dataQueue中有packet后,会执行one = dataQueue.getFirst()获取packet包并通过sendPacket(one)将packet数据写出到DataNode节点。

客户端向HDFS DataNode写入数据时,默认有3个副本,并且各个DataNode节点之间写出数据都是以pipeline方式依次传递到各个DataNode节点,所以在执行sendPacket(one)写出数据前,会执行setPipeline(nextBlockOutputStream())方法构建写数据管道,通过管道连接到第一个DataNode,将packet数据写入该节点,然后由第二个DataNode依次再将packet传递到第三个DataNode节点,副本多的依次类推。其中nextBlockOutputStream()方法中会连接NameNode申请写入数据的DataNode节点及副本分布策略,并设置客户端与第一个Block块所在的节点的socket连接,方便后续将数据写入到对应的DataNode节点。

nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {... ...//locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略lb = locateFollowingBlock(excluded.length ]]> 0 ? excluded : null, oldBlock);block.setCurrentBlock(lb.getBlock());... ...//获取Block块所在的所有节点信息nodes = lb.getLocations();... ...//连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNodesuccess = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,0L, false);... ...
}

以上代码中locateFollowingBlock( excluded.length > 0 ? excluded : null, oldBlock)代码中会向NameNode申请副本写入DN节点的信息并设置副本分布策略。

locateFollowingBlock()源码如下:

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,ExtendedBlock oldBlock) throws IOException {//向NameNode 添加block 块信息return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,stat.getFileId(), favoredNodes, addBlockFlags);
}

以上代码中addBlock方法中会向NameNode申请block分布策略及写入DN节点信息。DFSOutputStream.addBlock()实现源码如下:

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,String[] favoredNodes, EnumSet allocFlags)throws IOException {... ...//向NameNode申请block分布策略及写入DN节点信息return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,excludedNodes, fileId, favoredNodes, allocFlags);... ...
}

以上代码中dfsClient.namenode获取到NameNode Rpc Proxy,所以addBlock方法最终会调用到NameNodeRpcServer.addBlock()方法。NameNodeRpcServer.addBlock()源码如下:

//客户端写入数据向NameNode 申请block位置
@Override
public LocatedBlock addBlock(String src, String clientName,ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,String[] favoredNodes, EnumSet addBlockFlags)throws IOException {//检查NameNode是否启动checkNNStartup();//getAdditionalBlock方法设置副本存储节点策略,返回的 LocatedBlock 对象中包含 block写入数据的DN节点LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,clientName, previous, excludedNodes, favoredNodes, addBlockFlags);if (locatedBlock != null) {metrics.incrAddBlockOps();}return locatedBlock;
}

以上代码namesystem.getAdditionalBlock()源码如下:

LocatedBlock getAdditionalBlock(String src, long fileId, String clientName, ExtendedBlock previous,DatanodeInfo[] excludedNodes, String[] favoredNodes,EnumSet flags) throws IOException {... ...//为新数据块选择DataNode 节点,有几个副本选择几个节点DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(blockManager, src, excludedNodes, favoredNodes, flags, r);... ...
}

以上代码中chooseTargetForNewBlock()会为block找到存储DN节点,源码如下:

static DatanodeStorageInfo[] chooseTargetForNewBlock(BlockManager bm, String src, DatanodeInfo[] excludedNodes,String[] favoredNodes, EnumSet flags,ValidateAddBlockResult r) throws IOException {... ...// 为新数据块选择目标数据节点return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,excludedNodesSet, r.blockSize,favoredNodesList, r.storagePolicyID,r.blockType, r.ecPolicy, flags);
}

chooseTarget4NewBlock()中会为block选择目标数据节点:

public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,final int numOfReplicas, final Node client,final Set excludedNodes,final long blocksize,final List favoredNodes,final byte storagePolicyID,final BlockType blockType,final ErasureCodingPolicy ecPolicy,final EnumSet flags) throws IOException {... ...//存放数据副本的节点数组final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,numOfReplicas, client, excludedNodes, blocksize, favoredDatanodeDescriptors, storagePolicy, flags);... ...//返回数据存放节点数组return targets;
}

以上代码blockplacement.chooseTarget()方法经过一层层对象封装,最终调用到BlockPlacementPolicyDefault.chooseTarget方法,该方法实现源码如下:

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,Node writer,List chosenStorage,boolean returnChosenNodes,Set excludedNodes,long blocksize,final BlockStoragePolicy storagePolicy,EnumSet addBlockFlags,EnumMap sTypes) {... ...// 获取每个机架上的最大节点数int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);... ...List results = null;... ...//这里的results 与  chosenStorage 完全相同,但是目前没有数据results = new ArrayList<]]>(chosenStorage);//设置副本分布并返回第一个副本要写入的DN节点localNode = chooseTarget(numOfReplicas, writer, excludedNodes,blocksize, maxNodesPerRack, results, avoidStaleNodes,storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),sTypes);... ...return getPipeline((writer != null && writer instanceof DatanodeDescriptor) ? writer: localNode,results.toArray(new DatanodeStorageInfo[results.size()]));
}

进入以上代码中chooseTarget方法,源码如下:

private Node chooseTarget(final int numOfReplicas,Node writer,final Set excludedNodes,final long blocksize,final int maxNodesPerRack,final List results,final boolean avoidStaleNodes,final BlockStoragePolicy storagePolicy,final EnumSet unavailableStorages,final boolean newBlock,EnumMap storageTypes) {... ...//准备多副本写入的DN节点分布,返回的writer为第一个副本要写入的DN节点writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);... ...return writer;
}

以上代码中chooseTargetInOrder中实现副本分布并返回第一个副本要写入的DN节点。chooseTargetInOrder源码如下:

protected Node chooseTargetInOrder(int numOfReplicas, Node writer,final Set excludedNodes,final long blocksize,final int maxNodesPerRack,final List results,final boolean avoidStaleNodes,final boolean newBlock,EnumMap storageTypes)throws NotEnoughReplicasException {// 计算结果列表的大小,默认初始 results 为0,result集合表示副本所在的节点final int numOfResults = results.size();// 如果结果列表为空if (numOfResults == 0) {// 选择本地节点作为第一个副本存储位置,并向result中加入节点DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,storageTypes, true);//writer第一个副本要写出的DataNode节点writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor(): null;//减去一个副本后,如果为0则返回,writer,否则不返回,继续if (--numOfReplicas == 0) {return writer;}}//第一个副本所在DN节点final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();if (numOfResults <= 1) {//选择远程机架存放第二个副本chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);if (--numOfReplicas == 0) {//writer第一个副本要写出的DataNode节点return writer;}}if (numOfResults <= 2) {//第二个副本所在DN节点final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();if (clusterMap.isOnSameRack(dn0, dn1)) {//如果dn0与dn1是同一机架,第三个副本选择不同机架chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else if (newBlock){//如果是新块,选择与dn1 第二个副本所在节点相同的机架上放第三个副本chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);} else {//随机选择一台节点存储第3个副本chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,results, avoidStaleNodes, storageTypes);}if (--numOfReplicas == 0) {//writer第一个副本要写出的DataNode节点return writer;}}//大于3个副本,随机选择节点存放副本chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes, storageTypes);//writer第一个副本要写出的DataNode节点return writer;
}

chooseTargetInOrder方法代码逻辑为block 副本找到存储节点的策略,然后返回block所在的第一个节点,首先第一个block存储在本机,第二个block存储在远程机架,第三个副本存储时先判断是否第一个副本和第二个副本是否在同一机架,如果在同一机架,那么第三个副本选择不同机架进行存储,否则选择与第二个副本相同机架的随机节点进行存储。最终该方法返回存储第一个副本的DataNode节点。

6.客户端与DataNode建立socket通信

在DataNode启动源码部分,DataNode.initDataXceiver()方法进行初始化DataXceiver服务,该服务是 DataNode 接收客户端请求的核心组件,其核心实现源码如下:

private void initDataXceiver() throws IOException {... ...//TcpPeerServer 对象用于接收来自客户端的传输流量TcpPeerServer tcpPeerServer;... ...//DataXceiverServer 是一个线程xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);//创建DataXceiverServer的后台线程,创建好DataNode后会启动this.dataXceiverServer = new Daemon(threadGroup, xserver);... ...
}

DataNode.crateDataNode()方法中,当DataNode对象创建完成后,当执行dn.runDatanodeDaemon()时会运行DataXceiverServer对象的run方法。

DataXceiverServer.run方法实现源码如下:

public void run() {... ...// 接受客户端的连接请求peer = peerServer.accept();... ...//创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start();... ...
}

以上代码中我们可以看到peerServer.accept()一直接受来自客户端传输数据socket通信,并且new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中创建了DataXceiver线程并启动,该线程主要从DataXceiverServer中读取socket传入数据并将数据写入到DataNode节点磁盘。

下面继续回到DataStreamer.nextBlockOutputStream()源码中,查看客户端与DataNode节点建立的连接。

DataStreamer.nextBlockOutputStream()方法源码如下:

protected LocatedBlock nextBlockOutputStream() throws IOException {... ...//locateFollowingBlock方法中向NameNode申请副本写入的DN节点信息并设置副本策略lb = locateFollowingBlock(excluded.length ]]> 0 ? excluded : null, oldBlock);block.setCurrentBlock(lb.getBlock());... ...//获取Block块所在的所有节点信息nodes = lb.getLocations();... ...//连接到节点列表中的第一个 DataNode 节点并建立客户端与DataNode节点的socket连接,方便后续将数据写入到DataNodesuccess = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,0L, false);... ...
}

前面执行完locateFollowingBlock()方法,获取到了数据应该写往的DataNode节点后,后续会执行createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,0L, false)方法与第一个写出的DataNode节点建立连接。

createBlockOutputStream()实现部分源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,StorageType[] nodeStorageTypes, String[] nodeStorageIDs,long newGS, boolean recoveryFlag) {... ...// 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);... ...//当输出流有数据时,通过socket将数据写出到DataNode中... ... 
}

以上createSocketForPipeline(nodes[0], nodes.length, dfsClient)代码就是获取第一个写出数据的block所在的DataNode节点,并建立socket连接。

createSocketForPipeline源码如下:

static Socket createSocketForPipeline(final DatanodeInfo first,final int length, final DFSClient client) throws IOException {... ...//获取第一个 DataNode节点 socket地址final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);... ...//客户端连接上 DataNode,DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),conf.getSocketTimeout());... ...return sock;
}

7.向Datanode中上传数据

回到 DataStreamr.createBlockOutputStream()方法中,核心源码如下:

boolean createBlockOutputStream(DatanodeInfo[] nodes,StorageType[] nodeStorageTypes, String[] nodeStorageIDs,long newGS, boolean recoveryFlag) {... ...// 创建客户端用于数据传输管道的Socket,这里传入的nodes[0]就是第一个DataNode节点s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);... ...// 获取未缓冲的输出流和输入流OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);... ...//包装 输出流 unbufOut 到 out 对象中out = new DataOutputStream(new BufferedOutputStream(unbufOut,DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));... ...//DataNode 启动着DataXceiverServer 服务,该服务启动后一直会接收客户端scoket 通信new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,nodes.length, block.getNumBytes(), bytesSent, newGS,checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,(targetPinnings != null && targetPinnings[0]), targetPinnings,nodeStorageIDs[0], nodeStorageIDs);... ...
}

当写出数据的输出流out中有数据时,会通过new Sender(out).writeBlock()方法将数据发送到DataNode节点,writeBlock()实现具体源码如下:

public void writeBlock(final ExtendedBlock blk,final StorageType storageType,final Token blockToken,final String clientName,final DatanodeInfo[] targets,final StorageType[] targetStorageTypes,final DatanodeInfo source,final BlockConstructionStage stage,final int pipelineSize,final long minBytesRcvd,final long maxBytesRcvd,final long latestGenerationStamp,DataChecksum requestedChecksum,final CachingStrategy cachingStrategy,final boolean allowLazyPersist,final boolean pinning,final boolean[] targetPinnings,final String storageId,final String[] targetStorageIds) throws IOException {... ...//包装socket 流和 操作类型 “WRITE_BLOCK” ,通过socket 发送到DataNode 节点send(out, Op.WRITE_BLOCK, proto.build());
}

以上send方法会将数据发送到DataNode 中,DataNode启动的DataXceiverServer 服务会接收客户端socket通信。

再次回到DataXceiverServer.run()方法源码中:

public void run() {... ...// 接受客户端的连接请求peer = peerServer.accept();... ...//创建线程并传入peer参数,然后并启动,会调用到DataXceiver.run 方法new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start();... ...
}

new Daemon(datanode.threadGroup,DataXceiver.create(peer, datanode, this)).start()代码中会将接受到客户端的连接包装到DataXceiver线程对象中并启动,在DataXceiver.run方法中会对从客户端接收到的数据进行写出到DataNode磁盘处理。

DataXceiver.run方法源码如下:

public void run() {... ...// 初始化操作对象Op op = null;... ...// 初始化输入流InputStream input = socketIn;... ...//读取客户端传入的数据给输入流赋值input = new BufferedInputStream(saslStreams.in,smallBufferSize);... ...// 初始化DataXceiver的输入流 ,就是将 input 流赋值给了Receiver 中的 in 属性,后续使用super.initialize(new DataInputStream(input));... ...//读取输入数据op = readOp();... ...//处理读取过来的数据流processOp(op);... ...
}

以上代码会将从客户端中接收过来的数据包装成数据输入流,最终执行processOp(op)写出到DataNode节点磁盘上。

processOp(op)实现源码如下,op默认从客户端传入类型值为 WRITE_BLOCK:

protected final void processOp(Op op) throws IOException {... ...//从客户端获取过来的操作属性为 “WRITE_BLOCK”case WRITE_BLOCK://向DataNode中写入Block块操作opWriteBlock(in);break;... ...
}

opWriteBlock(in)实现代码如下:

private void opWriteBlock(DataInputStream in) throws IOException {... ...// 调用writeBlock方法处理写入块操作writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),PBHelperClient.convertStorageType(proto.getStorageType()),PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),proto.getHeader().getClientName(),targets,PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),PBHelperClient.convert(proto.getSource()),fromProto(proto.getStage()),proto.getPipelineSize(),proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),proto.getLatestGenerationStamp(),fromProto(proto.getRequestedChecksum()),(proto.hasCachingStrategy() ?getCachingStrategy(proto.getCachingStrategy()) :CachingStrategy.newDefaultStrategy()),(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),(proto.hasPinning() ? proto.getPinning(): false),(PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),proto.getStorageId(),proto.getTargetStorageIdsList().toArray(new String[0]));... ...
}

以上 writeBlock最终调用到DataXceiver.writeBlock()方法,其源码实现如下:

public void writeBlock(...){... ...// 创建blockReceiver 并赋值给 DataXceiver.blockReceiver,后续使用到该对象写出数据到磁盘setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,peer.getRemoteAddressString(),peer.getLocalAddressString(),stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,clientname, srcDataNode, datanode, requestedChecksum,cachingStrategy, allowLazyPersist, pinning, storageId));... ...//发送数据到下游DN节点,对于下游DataNode节点,仍然要走一遍当前节点的流程,形成DataNode 依次向后写出数据new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],blockToken, clientname, targets, targetStorageTypes,srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,latestGenerationStamp, requestedChecksum, cachingStrategy,allowLazyPersist, targetPinnings[0], targetPinnings,targetStorageId, targetStorageIds);... ...//receiveBlock 会接收packets 将数据写出到磁盘blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,dataXceiverServer.getWriteThrottler(), targets, false);... ...
}

以上代码中new Sender(mirrorOut).writeBlock()这部分代码是将写入到该DataNode节点的packet数据继续写往下个DataNode节点,如果block有多个副本,都是在下一个DataNode节点向后续DN节点发送写出数据。

最终执行blockReceiver.receiveBlock()代码将数据写出到磁盘中,receiverBlock()实现关键源码如下:

void receiveBlock(DataOutputStream mirrOut, // output to next datanodeDataInputStream mirrIn,   // input from next datanodeDataOutputStream replyOut,  // output to previous datanodeString mirrAddr, DataTransferThrottler throttlerArg,DatanodeInfo[] downstreams,boolean isReplaceBlock) throws IOException {... ...//receivePacket负责接收上游的packetwhile (receivePacket() ]]>= 0) { /* Receive until the last packet */ }... ...
}

以上代码中receivePacket()会一直接受从客户端发送过来的packet并写入到DataNode节点磁盘,直到客户端数据传输完毕。

reveiverPacket()关键源码实现如下:

private int receivePacket() throws IOException {... ...//将数据写出到DataNode节点磁盘streams.writeDataToDisk(dataBuf.array(),startByteToDisk, numBytesToDisk);... ...
}

总结

我们一步步把数据上传源码梳理了一边,可以看到,HDFS确实帮我们屏蔽了很多底层的复杂实现逻辑。

今天的内容很多,有很多细节我没有去进一步拓展了,感兴趣的小伙伴可以跟着我的思路再深入拓展看看。

相关文章:

深入HDFS——数据上传源码

引入 就如RPC篇章里提到的观点一样&#xff0c;任何一种能广为传播的技术&#xff0c;都是通过抽象和封装的思想&#xff0c;屏蔽底层底层复杂实现&#xff0c;提供简单且强大的工具&#xff0c;来降低使用门槛的。 HDFS的风靡自然也是如此。 通过前面深入了NameNode和DataN…...

归并排序算法

归并排序 1算法介绍 和选择排序一样&#xff0c;归并排序的性能不受输入数据的影响&#xff0c;但表现比选择排序好的多&#xff0c;因为始终都是O(n log n&#xff09;的时间复杂度。代价是需要额外的内存空间。归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用…...

优雅解决webview_flutter不支持安卓选择图片视频文件问题

这个问题&#xff0c;网上一搜索&#xff0c;就是要去修改别人写好的插件。 搞Flutter开发&#xff0c;尽量不要去修改别人的插件 &#xff0c;特别是像 webview_flutter 这种比较大的官方插件。 相信我&#xff0c;你拿捏不了它。 主要问题就是&#xff1a; webview_flutter…...

Linux UDP 编程详解

一、引言 在网络编程领域&#xff0c;UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;作为一种轻量级的传输层协议&#xff0c;具有独特的优势和适用场景。与 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff0…...

UllnnovationHub,一个开源的WPF控件库

目录 UllnnovationHub1.项目概述2.开发环境3.使用方法4.项目简介1.WPF原生控件1.Button2.GroupBox3.TabControl4.RadioButton5.SwitchButton6.TextBox7.PasswordBox8.CheckBox9.DateTimePicker10.Expander11.Card12.ListBox13.Treeview14.Combox15.Separator16.ListView17.Data…...

[Python学习日记-78] 基于 TCP 的 socket 开发项目 —— 模拟 SSH 远程执行命令

[Python学习日记-78] 基于 TCP 的 socket 开发项目 —— 模拟 SSH 远程执行命令 简介 项目分析 如何执行系统命令并拿到结果 代码实现 简介 在Python学习日记-77中我们介绍了 socket 基于 TCP 和基于 UDP 的套接字&#xff0c;还实现了服务器端和客户端的通信&#xff0c;本…...

css3过渡总结

一、过渡的定义与作用 CSS3 过渡&#xff08;Transitions&#xff09;允许 CSS 属性在一定的时间区间内平滑地过渡&#xff0c;从一个值转变为另一个值。它能够让网页元素的状态变化更加自然、流畅&#xff0c;给用户带来更好的视觉体验。例如&#xff0c;当一个元素从隐藏状态…...

生产环境中常用的设计模式

生产环境中常用的设计模式 设计模式目的使用场景示例单例模式保证一个类仅有一个实例&#xff0c;并提供一个访问它的全局访问点- 日志记录器- 配置管理器工厂方法模式定义一个创建对象的接口&#xff0c;让子类决定实例化哪个类- 各种工厂类&#xff08;如视频游戏工厂模式创…...

【STM32-学习笔记-4-】PWM、输入捕获(PWMI)

文章目录 1、PWMPWM配置 2、输入捕获配置3、编码器 1、PWM PWM配置 配置时基单元配置输出比较单元配置输出PWM波的端口 #include "stm32f10x.h" // Device headervoid PWM_Init(void) { //**配置输出PWM波的端口**********************************…...

游戏行业销售数据分析可视化

完整源码项目包获取→点击文章末尾名片&#xff01; &#x1f31f;分析&#xff1a; 可看出最近五年用户最喜爱的游戏类型依然还是Action-动作类&#xff08;当然市场发行的也很多&#xff09; Sports-运动类和Shooter-射击类顺序互换,但我估计现在大环境局势紧张可以会推动射击…...

微服务网关,如何选择?

什么是API网关 API网关&#xff08;API Gateway&#xff09;是微服务架构中的一个关键组件&#xff0c;它充当了客户端与后端服务之间的中间层。其主要功能包括请求路由、协议转换、负载均衡、安全认证、限流熔断等。通过API网关&#xff0c;客户端无需直接与多个微服务交互&a…...

Java开发提速秘籍:巧用Apache Commons Lang工具库

一、Java 开发效率之困 在当今数字化时代&#xff0c;Java 作为一门广泛应用的编程语言&#xff0c;在各类软件开发项目中占据着举足轻重的地位。无论是大型企业级应用、互联网平台&#xff0c;还是移动应用后端&#xff0c;都能看到 Java 的身影。然而&#xff0c;Java 开发者…...

多用户商城系统源码哪家好用?怎么选?

想拥有属于自己的多用户商城系统&#xff0c;但回头一看自己又是个技术小白&#xff0c;看着网上眼花缭乱的b2b2c商城系统&#xff0c;更是不知道如何选起&#xff1f;下面给大家分享一条较为成熟的选择思路&#xff0c;以作参考。 1、从需求上决定系统方向 企业在选型b2b2c商…...

聊聊如何实现Android 放大镜效果

一、前言 很久没有更新Android 原生技术内容了&#xff0c;前些年一直在做跨端方向开发&#xff0c;最近换工作用重新回到原生技术&#xff0c;又回到了熟悉但有些生疏的环境&#xff0c;真是感慨万分。 近期也是因为准备做地图交互相关的需求&#xff0c;功能非常复杂&#x…...

NVIDIA视频编解码

开源了两个项目&#xff1a;英伟达显卡视频编解码、jetson视频编解码。都是基于官方SDK进行的封装&#xff0c;由于官方自带的demo晦涩难懂并且每块都是独立的&#xff0c;我对SDK进行二次封装并形成了一套较为完整的视频编解码流程&#xff0c;调用简单&#xff0c;有完整的测…...

Mysql事务隔离级

什么是事务的隔离级别 数据库事务的隔离级别是指事务在并发执行时&#xff0c;如何控制事务之间相互影响的程度。它决定了多个事务并发执行时&#xff0c;事务中的操作对其他事务的可见性&#xff0c;进而影响数据的一致性和并发性。 为什么会有隔离级别的概念&#xff1f; …...

K210视觉识别模块

K210视觉识别模块是一款功能强大的AI视觉模块&#xff0c;以下是对其的详细介绍&#xff1a; 一、核心特性 强大的视觉识别功能&#xff1a;K210视觉识别模块支持多种视觉功能&#xff0c;包括但不限于人脸识别、口罩识别、条形码和二维码识别、特征检测、数字识别、颜色识别…...

springboot使用websocket

文章目录 一、概述1、简介 二、 使用1、引包2、配置处理器3、前端测试 一、概述 1、简介 简介略&#xff0c;附上官方文档&#xff0c;spring5和spring6的官方文档内容大致是一样的&#xff1a; https://docs.spring.io/spring-framework/docs/5.2.25.RELEASE/spring-framewo…...

线程池底部工作原理

线程池内部是通过线程和队列实现的&#xff0c;当我们通过线程池处理任务时&#xff1a; 如果线程池中的线程数量小于corePoolSize&#xff0c;无论是否有处于空闲的线程&#xff0c;都创建新的线程来处理被添加的任务。 如果线程池中的线程数量等于corePoolSize&#xff0c;…...

DevUI 2024 年度运营报告:开源生态的成长足迹与未来蓝图

在当今数字化飞速发展的时代&#xff0c;开源已成为推动技术创新与协作的重要力量。DevUI 作为开源领域的重要一员&#xff0c;其发展历程与成果备受关注。值此之际&#xff0c;GitCode 精心整理了 DevUI 年度运营报告&#xff0c;为您全面呈现 DevUI 社区在过去一年里的开源之…...

Mybatis面试题

Mybatis面试题 什么是 MyBatis&#xff1f;讲下 MyBatis 的缓存Mybatis 是如何进行分页的&#xff1f;分页插件的原理是什么&#xff1f;简述 Mybatis 的插件运行原理&#xff0c;以及如何编写一个插件&#xff1f;Mybatis 动态 sql 是做什么的&#xff1f;都有哪些动态 sql&am…...

Python获取系统运行时间

有时候想获取系统启动后到现在的运行时间&#xff0c;在Linux C可以使用clock_gettime()来获得&#xff0c;如下&#xff0c; #include <time.h> #include <stdio.h>int main() {struct timespec ts;clock_gettime(CLOCK_MONOTONIC, &ts);printf("syste…...

软考高级5个资格、中级常考4个资格简介及难易程度排序

一、软考高级5个资格 01、网络规划设计师 资格简介&#xff1a;网络规划设计师要求考生具备全面的网络规划、设计、部署和管理能力&#xff1b;该资格考试适合那些在网络规划和设计方面具有较好理论基础和较丰富从业经验的人员参加。 02、系统分析师 资格简介&#xff1a;系统分…...

【18】Word:明华中学-儿童医保❗

目录 题目​ NO2 NO3 NO4 NO5 NO6 NO7 NO8 NO9 题目 NO2 布局→页面设置对话框→纸张方向&#xff1a;横向→纸张大小&#xff1a;A3 &#xff1b;页面设置对话框&#xff1a;直接输入纸张大小的宽度和高度即可→页面设置对话框&#xff1a;上下左右边距→版式&…...

Vue 中实现修改数组,并保持页面渲染数据是响应式更改

如果你在 Vue 中使用数组并希望确保对数组项的修改是响应式的&#xff0c;直接替换数组项可能不会触发 Vue 的响应式更新。为了确保响应式更新&#xff0c;你可以使用 Vue 提供的 Vue.set() 方法&#xff08;在 Vue 2 中&#xff09;或使用 this.$set() 方法&#xff08;在 Vue…...

MATLAB算法实战应用案例精讲-【数模应用】图形变换和复杂图形组合(附python和MATLAB代码实现)

目录 前言 算法原理 变换 1二维变换 1.1缩放 1.2 翻转 1.3剪切 1.4 旋转 2齐次坐标 2.1引入齐次坐标的原因 2.2 二维齐次坐标 2.3二维仿射变换 2.4逆变换 4组合变换 5三维变换(由二维变换推理而来) 5.1三维齐次坐标 5.2 三维仿射变换 5.3 缩放和平移 5.4…...

GCC支持Objective C的故事?Objective-C?GCC只能编译C语言吗?Objective-C 1.0和2.0有什么区别?

GCC支持Objective C的故事 Objective-C 主要由 Stepstone 公司的Brad Cox和 Tom Love 在1980 年左右发明。乔布斯离开苹果公司后成立了NeXT STEP公司&#xff0c; 买下了Objective-C 语言的授权。GCC对Objective-C语言的支持是在1992年加入的&#xff0c;具体是在GCC 1.3版本中…...

自动驾驶占用网格预测

文章目录 需要阅读的文献&#xff1a;github论文仓库论文idea提取BEVFormer 需要阅读的文献&#xff1a; ⭐[ECCV 2024] SparseOcc 纯稀疏3D占用网络和 RayIoU 评估指标 ECCV 2024&#xff5c;OSP&#xff1a;自动驾驶全新建模方法&#xff0c;端到端输出任意位置的占用结果 S…...

1.17组会汇报

STRUC-BENCH: Are Large Language Models Good at Generating Complex Structured Tabular Data? STRUC-BENCH&#xff1a;大型语言模型擅长生成复杂的结构化表格数据吗&#xff1f;23年arXiv.org 1概括 这篇论文旨在评估大型语言模型&#xff08;LLMs&#xff09;在生成结构…...

使用 Ansys Motor-CAD 的自适应模板加速创新

应对现代电机设计挑战 电机设计不断发展&#xff0c;Ansys 正在通过创新解决方案引领潮流&#xff0c;不断突破可能的界限。随着电动汽车、工业自动化和可再生能源系统的快速增长&#xff0c;对优化电机的需求从未如此之高。工程师面临着越来越大的压力&#xff0c;他们需要开发…...

用nginx正向代理https网站

目录 1. 缘起2. 部署nginx3. 测试3.1 http测试3.2 https测试4 给centos设置代理访问外网 1. 缘起 最近碰到了一个麻烦事情&#xff0c;就是公司的centos测试服务器放在内网环境&#xff0c;而且不能直接上外网&#xff0c;导致无法通过yum安装软件&#xff0c;非常捉急。 幸…...

PyTorch使用教程(6)一文讲清楚torch.nn和torch.nn.functional的区别

torch.nn 和 torch.nn.functional 在 PyTorch 中都是用于构建神经网络的重要组件&#xff0c;但它们在设计理念、使用方式和功能上存在一些显著的区别。以下是关于这两个模块的详细区别&#xff1a; 1. 继承方式与结构 torch.nn torch.nn 中的模块大多数是通过继承 torch.nn…...

图论DFS:黑红树

我的个人主页 {\large \mathsf{{\color{Red} 我的个人主页} } } 我的个人主页 往 {\color{Red} {\Huge 往} } 往 期 {\color{Green} {\Huge 期} } 期 文 {\color{Blue} {\Huge 文} } 文 章 {\color{Orange} {\Huge 章}} 章 DFS 算法&#xff1a;记忆化搜索DFS 算法&#xf…...

StarRocks 怎么让特定的SQL路由到FE master节点的

背景 本文基于 StarRocks 3.1.7 大家都知道对于Starrocks来说 FE 是分 master和follower的&#xff0c;而只有master节点才能对元数据进行写操作。但是为什么呢&#xff1f;哪里有体现呢&#xff1f; 这其中的原因在网上是搜不到的&#xff0c;所以大家只知道只有master节点才…...

蓝桥杯真题 - 公因数匹配 - 题解

题目链接&#xff1a;https://www.lanqiao.cn/problems/3525/learning/ 个人评价&#xff1a;难度 2 星&#xff08;满星&#xff1a;5&#xff09; 前置知识&#xff1a;调和级数 整体思路 题目描述不严谨&#xff0c;没说在无解的情况下要输出什么&#xff08;比如 n n n …...

Java 8 Stream API

文章目录 Java 8 Stream API1. Stream2. Stream 的创建3. 常见的 Stream 操作3.1 中间操作3.2 终止操作 4. Stream 的并行操作 Java 8 Stream API Java 8 引入了 Stream API&#xff0c;使得对集合类&#xff08;如 List、Set 等&#xff09;的操作变得更加简洁和直观。Stream…...

AI刷题-还原原始字符串、大数和中的极值位距离

目录 一、还原原始字符串 问题描述 举例 输入格式 输出格式 输入 输出 输入 输出 输入 输出 输入 输出 输入 输出 输入 输出 数据范围 解题思路&#xff1a; 数据结构选择 最终代码&#xff1a; 运行结果&#xff1a; 二、大数和中的极值位距离 问题…...

Ubuntu20.04取消root账号自动登录的方法,触觉智能RK3568开发板演示

Ubuntu20.04默认情况下为root账号自动登录&#xff0c;本文介绍如何取消root账号自动登录&#xff0c;改为通过输入账号密码登录&#xff0c;使用触觉智能EVB3568鸿蒙开发板演示&#xff0c;搭载瑞芯微RK3568&#xff0c;四核A55处理器&#xff0c;主频2.0Ghz&#xff0c;1T算力…...

MySQL 数据库 :SQL 语句规约(不得使用外键与级联,一切外键概念必须在应用层解决。)

文章目录 I 强制规约表名限定数据订正禁止使用存储过程,存储过程难以调试和扩展,更没有移植性。不得使用外键与级联,一切外键概念必须在应用层解决。使用 ISNULL() 来判断是否为 NULL 值NPE 问题不要使用 count(列名) 或 count(常量) 来替代 count(*)II 建议in 操作能避免则…...

深入理解 SQL 中的 DATEDIFF 函数

深入理解 SQL 中的 DATEDIFF 函数 DATEDIFF 函数在 SQL 中是一个用于计算两个日期之间差值的重要工具。不同数据库实现了不同版本的 DATEDIFF&#xff0c;它们在功能和语法上有所不同。本文将详细解析 DATEDIFF 的用法、数据库间差异、复杂场景中的应用&#xff0c;以及替代方…...

【Linux】15.Linux进程概念(4)

文章目录 程序地址空间前景回顾C语言空间布局图&#xff1a;代码1代码2代码3代码4代码5代码6代码7 程序地址空间前景回顾 历史核心问题&#xff1a; pid_t id fork(); if(id 0) else if(id>0) 为什么一个id可以放两个值呢&#xff1f;之前没有仔细讲。 C语言空间布局图&am…...

KubeSphere部署安装,接入KubeKey安装的k8s集群

KubeSphere安装接入KubeKey安装的k8s集群 文章目录 KubeSphere安装接入KubeKey安装的k8s集群 一.NFS安装配置1.服务器安装NFS服务2.下载并部署 NFS Subdir External Provisioner1).下载部署文件2).创建 NameSpace3).创建 RBAC 资源4).配置 deployment.yaml5).部署 Storage Clas…...

opencv3.4 ffmpeg3.4 arm-linux 交叉编译

一些依赖安装&#xff1a; sudo apt-get install pkg-config libgtk2.0-dev libavcodec-dev libavformat-dev libswscale-dev 交叉编译工具链准备&#xff1a;gcc-linaro-6.3.1 1、下载 https://github.com/FFmpeg/FFmpeg 解压后新建目录&#xff1a;Fmpeg-n3.4.13/ffmpeg…...

Java基础(二)

提示:这部分内容对逆向重要,需重点掌握。 1.常见数据类型 1.1 List系列 类似于Python中的列表 List是一个接口,接口下面有两个常见的类型(目的是可以存放动态的多个数据) ArrayList,连续的内存地址存储(内部自动扩容) -> Python列表的特点LinkedList,底层基于链表…...

网络IO与IO多路复用

一、网络IO基础 系统对象&#xff1a; 网络IO涉及用户空间调用IO的进程或线程以及内核空间的内核系统。例如&#xff0c;当进行read操作时&#xff0c;会经历两个阶段&#xff1a; 等待数据准备就绪。将数据从内核拷贝到进程或线程中。 多种网络IO模型的出现原因&#xff1a;…...

C# OpenCvSharp 部署3D人脸重建3DDFA-V3

目录 说明 效果 模型信息 landmark.onnx net_recon.onnx net_recon_mbnet.onnx retinaface_resnet50.onnx 项目 代码 下载 参考 C# OpenCvSharp 部署3D人脸重建3DDFA-V3 说明 地址&#xff1a;https://github.com/wang-zidu/3DDFA-V3 3DDFA_V3 uses the geometri…...

【机器学习实战入门】使用OpenCV进行性别和年龄检测

Gender and Age Detection Python 项目 首先,向您介绍用于此高级 Python 项目的性别和年龄检测中的术语: 什么是计算机视觉? 计算机视觉是一门让计算机能够像人类一样观察和识别数字图像和视频的学科。它面临的挑战大多源于对生物视觉有限的了解。计算机视觉涉及获取、处…...

Android SystemUI——StatusBar视图创建(六)

上一篇文章我们介绍了 StatusBar 的构建过程,在 makeStatusBarView() 中获得 FragmentHostManager,用来管理 StatusBar 的窗口。 一、状态栏视图 在得到 FragmentHostManager 实例对象之后,还会继续调用 addTagListener() 方法设置监听对象,然后获取 FragmentManager 并开…...

解决 Error: Invalid or corrupt jarfile day04_studentManager.jar 报错问题

在 Java 开发过程中&#xff0c;我们可能会遇到这样的报错信息&#xff1a;Error: Invalid or corrupt jarfile day04_studentManager.jar。这个错误通常表示 day04_studentManager.jar 文件可能已损坏或无效&#xff0c;下面将为大家详细介绍如何解决这个问题。 一、错误点分…...

《MambaIR:一种基于状态空间模型的简单图像修复基线方法》学习笔记

paper&#xff1a;2402.15648 目录 摘要 一、引言 1、模型性能的提升依赖于网络感受野的扩大&#xff1a; 2、全局感受野和高效计算之间存在固有矛盾&#xff1a; 3、改进版 Mamba的巨大潜力 4、Mamba 在图像修复任务中仍面临以下挑战&#xff1a; 5、方法 6、主要贡献…...