Zero-Copy at the Java Layer
Definition of the transferTo Method
public abstract class FileChannel
    extends AbstractInterruptibleChannel
    implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel {

    public abstract long transferTo(long position, long count,
                                    WritableByteChannel target)
        throws IOException;
}

How the transferTo Method Is Implemented
public class FileChannelImpl extends FileChannel {

    public long transferTo(long position, long count,
                           WritableByteChannel target)
        throws IOException
    {
        ... // parameter validation; count is narrowed to the int icount
        long n;

        // Attempt a direct transfer; requires kernel support
        if ((n = transferToDirectly(position, icount, target)) >= 0)
            return n;

        // Attempt a trusted-channel transfer via mmap shared memory
        if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
            return n;

        // Otherwise fall back to the traditional transfer, which is the slowest
        return transferToArbitraryChannel(position, icount, target);
    }
}
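Before drilling into the three strategies, here is a minimal usage sketch of the public API shown above. The file names are hypothetical; the loop simply copies one file into another through transferTo, so the fast paths above are taken whenever the platform supports them.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical file names, for illustration only
        Path src = Path.of("source.dat");
        Path dst = Path.of("target.dat");

        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long position = 0;
            long size = in.size();
            // transferTo may transfer fewer bytes than requested, so loop until done
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
        }
    }
}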
How the transferToDirectly Method Is Implemented
private long transferToDirectly(long position, int icount,
                                WritableByteChannel target) throws IOException {
    ...
    // Obtain the fd of this file channel and of the target channel
    int thisFDVal = IOUtil.fdVal(fd);
    int targetFDVal = IOUtil.fdVal(targetFD);
    if (thisFDVal == targetFDVal) // transferring to oneself is not allowed
        return IOStatus.UNSUPPORTED;

    // If a position lock is needed, acquire it and then call
    // transferToDirectlyInternal to perform the transfer;
    // transferToDirectlyNeedsPositionLock usually returns true
    if (nd.transferToDirectlyNeedsPositionLock()) {
        synchronized (positionLock) {
            long pos = position();
            try {
                return transferToDirectlyInternal(position, icount, target, targetFD);
            } finally {
                position(pos);
            }
        }
    } else {
        return transferToDirectlyInternal(position, icount, target, targetFD);
    }
}
Next, let's look at the implementation of transferToDirectlyInternal, where we can see that the transfer is ultimately performed by calling the native method transferTo0 through JNI.
The detailed implementation is as follows:
private long transferToDirectlyInternal(long position, int icount,
                                        WritableByteChannel target,
                                        FileDescriptor targetFD) throws IOException {
    ...
    do {
        // Call the native method transferTo0 via JNI to perform the transfer
        n = transferTo0(fd, position, icount, targetFD);
    } while ((n == IOStatus.INTERRUPTED) && isOpen());
    ...
}

// If the operating system does not support a direct transfer,
// a negative IOStatus code is returned
private native long transferTo0(FileDescriptor src, long position,
                                long count, FileDescriptor dst);
How the transferToTrustedChannel Method Is Implemented
private long transferToTrustedChannel(long position, long count,
                                      WritableByteChannel target)
    throws IOException
{
    ...
    long remaining = count;
    // The maximum mmap size is MAPPED_TRANSFER_SIZE (8 MB); if the file data to
    // transfer is larger than this, the mapping has to be done in stages
    while (remaining > 0L) {
        long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
        try {
            // Obtain a mapped buffer for the current region of the file
            MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
            try {
                // Write the data into the target channel
                int n = target.write(dbb);
                ...
            } finally {
                // The write is done, so tear down the memory mapping
                unmap(dbb);
            }
        } catch (ClosedByInterruptException e) {
            ...
        } catch (IOException ioe) {
            ...
        }
    }
    return count - remaining;
}
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
    ...
    // Perform the mapping and obtain the object used later to unmap it
    Unmapper unmapper = mapInternal(mode, position, size, prot, isSync);

    // Depending on the mode, build a read-only MappedByteBufferR or a read-write MappedByteBuffer
    if (unmapper == null) {
        FileDescriptor dummy = new FileDescriptor();
        if ((!writable) || (prot == MAP_RO))
            return Util.newMappedByteBufferR(0, 0, dummy, null, isSync);
        else
            return Util.newMappedByteBuffer(0, 0, dummy, null, isSync);
    } else if ((!writable) || (prot == MAP_RO)) {
        return Util.newMappedByteBufferR((int)unmapper.cap,
                unmapper.address + unmapper.pagePosition,
                unmapper.fd,
                unmapper, isSync);
    } else {
        return Util.newMappedByteBuffer((int)unmapper.cap,
                unmapper.address + unmapper.pagePosition,
                unmapper.fd,
                unmapper, isSync);
    }
}
private Unmapper mapInternal(MapMode mode, long position, long size, int prot, boolean isSync)
    throws IOException
{
    ...
    long addr = -1;
    int ti = -1;
    try {
        ...
        synchronized (positionLock) {
            ...
            try {
                // Call the native method via JNI to perform the mapping
                addr = map0(prot, mapPosition, mapSize, isSync);
            } catch (OutOfMemoryError x) {
                ...
            }
        }
        ...
        // Build the unmapper object; DefaultUnmapper is used here
        Unmapper um = (isSync
                       ? new SyncUnmapper(addr, mapSize, size, mfd, pagePosition)
                       : new DefaultUnmapper(addr, mapSize, size, mfd, pagePosition));
        return um;
    } finally {
        ...
    }
}

private native long map0(int prot, long position, long length, boolean isSync)
    throws IOException;
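As a user-level counterpart to this path, here is a minimal sketch (hypothetical file names) that maps a file region with FileChannel.map and writes the resulting MappedByteBuffer to another channel, which is essentially what transferToTrustedChannel does in 8 MB slices.

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MappedTransferDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel in = FileChannel.open(Path.of("source.dat"), StandardOpenOption.READ);
             FileChannel out = FileChannel.open(Path.of("target.dat"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            // Assumes the file is small enough to map in one go;
            // the JDK code above maps at most 8 MB per slice
            MappedByteBuffer buf = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size());
            while (buf.hasRemaining()) {
                out.write(buf); // drain the mapped region into the target channel
            }
        }
    }
}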
How the transferToArbitraryChannel Method Is Implemented
private long transferToArbitraryChannel(long position, int icount,
                                        WritableByteChannel target)
    throws IOException
{
    int c = Math.min(icount, TRANSFER_SIZE);
    // Allocate a heap buffer
    ByteBuffer bb = ByteBuffer.allocate(c);
    long tw = 0; // total bytes written
    long pos = position;
    try {
        // Copy in a loop
        while (tw < icount) {
            bb.limit(Math.min((int)(icount - tw), TRANSFER_SIZE));
            // Read file data into the heap buffer
            int nr = read(bb, pos);
            if (nr <= 0)
                break;
            // Flip the buffer, switching it from being filled to being drained
            bb.flip();
            // Write the buffered data to the target channel
            int nw = target.write(bb);
            tw += nw;
            if (nw != nr)
                break;
            pos += nw;
            // Clear the buffer so it is ready for the next read
            bb.clear();
        }
        return tw;
    } catch (IOException x) {
        ...
    }
}
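This slowest path is just the classic buffer-copy loop that application code would otherwise write by hand. A minimal user-level sketch of the same read/flip/write/clear pattern (hypothetical file names, 8 KB buffer chosen arbitrarily):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class BufferedCopyDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel in = FileChannel.open(Path.of("source.dat"), StandardOpenOption.READ);
             FileChannel out = FileChannel.open(Path.of("target.dat"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ByteBuffer bb = ByteBuffer.allocate(8 * 1024); // heap buffer
            while (in.read(bb) > 0) {      // read file data into the heap buffer
                bb.flip();                 // switch from filling to draining
                while (bb.hasRemaining()) {
                    out.write(bb);         // write the buffered data to the target
                }
                bb.clear();                // make the buffer ready for the next read
            }
        }
    }
}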
Zero-Copy at the JVM Layer
JNI Internals: The transferTo0 Method
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jint srcFD,
                                            jlong position, jlong count,
                                            jint dstFD)
{
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD,          // destination file descriptor
                         srcFD,          // source file descriptor
                         &offset,        // offset in the source to start from
                         (size_t)count); // number of bytes to transfer
    ...
    return n;
}
JNI Internals: The map0 Method
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    // Set the mapping flags
    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        // Read-only mapping
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        // Read-write mapping
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        // Private (copy-on-write) mapping
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    // Perform the mapping with the Linux mmap64 function
    mapAddress = mmap64(0,           // desired address 0: let the kernel choose the start of the mapping
                        len,         // length of the mapping
                        protections, // access rights: read-only or read-write
                        flags,       // shared or private mapping
                        fd,          // file descriptor being mapped
                        off);        // offset into the file data being mapped
    ...
    return ((jlong) (unsigned long) mapAddress);
}
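Seen from the Java side, the three branches above correspond to the three FileChannel.MapMode constants. A minimal sketch (hypothetical file name) that requests each kind of mapping:

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MapModeDemo {
    public static void main(String[] args) throws IOException {
        // PRIVATE mode requires the channel to be open for both reading and writing
        try (FileChannel ch = FileChannel.open(Path.of("data.bin"),
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            long size = ch.size();
            // MAP_RO -> PROT_READ, MAP_SHARED
            MappedByteBuffer ro = ch.map(FileChannel.MapMode.READ_ONLY, 0, size);
            // MAP_RW -> PROT_READ | PROT_WRITE, MAP_SHARED
            MappedByteBuffer rw = ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
            // MAP_PV -> PROT_READ | PROT_WRITE, MAP_PRIVATE (copy-on-write)
            MappedByteBuffer pv = ch.map(FileChannel.MapMode.PRIVATE, 0, size);
            System.out.println(ro.isReadOnly() + " " + rw.isReadOnly() + " " + pv.isReadOnly());
        }
    }
}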
Zero-Copy at the Linux Kernel Layer
Linux Kernel Internals: The sendfile64 System Call
asmlinkage ssize_t sys_sendfile64(int out_fd, int in_fd, loff_t __user *offset, size_t count)
{
    loff_t pos;
    ssize_t ret;

    // If an offset was given, copy the value passed in from user space into pos
    // and then call do_sendfile to perform the transfer
    if (offset) {
        if (unlikely(copy_from_user(&pos, offset, sizeof(loff_t))))
            return -EFAULT;
        ret = do_sendfile(out_fd, in_fd, &pos, count, 0);
        // Copy the updated pos back to the user-space offset address
        if (unlikely(put_user(pos, offset)))
            return -EFAULT;
        return ret;
    }

    return do_sendfile(out_fd, in_fd, NULL, count, 0);
}
Let's follow up with the do_sendfile function. It first obtains the input and output file objects in_file and out_file together with their inode objects in_inode and out_inode, and after validating the parameters it calls the sendfile function of the input file object's f_op operations structure to write the data out.
The detailed implementation is as follows:
static ssize_t do_sendfile(int out_fd, int in_fd, loff_t *ppos,
                           size_t count, loff_t max)
{
    ...
    // Obtain the input file object
    in_file = fget_light(in_fd, &fput_needed_in);
    ...
    // Obtain the inode of the input file object
    in_inode = in_file->f_dentry->d_inode;
    ...
    // Obtain the output file object
    out_file = fget_light(out_fd, &fput_needed_out);
    ...
    // Obtain the inode of the output file object
    out_inode = out_file->f_dentry->d_inode;
    ...
    // Delegate the write to the input file object's sendfile operation;
    // file_send_actor is the callback invoked for each page the kernel reads in
    retval = in_file->f_op->sendfile(in_file, ppos, count, file_send_actor, out_file);
    ...
    return retval;
}

Now we know that the data is ultimately written out through the file object's sendfile operation. Let's first look at the implementation of generic_file_sendfile, which many file systems use as their sendfile operation.
ssize_t generic_file_sendfile(struct file *in_file, loff_t *ppos,
                              size_t count, read_actor_t actor, void __user *target)
{
    // Set up the read descriptor structure for this transfer
    read_descriptor_t desc;

    if (!count)
        return 0;

    desc.written = 0;
    desc.count = count;
    desc.buf = target;
    desc.error = 0;

    // Perform the actual data transfer
    do_generic_file_read(in_file, ppos, &desc, actor);

    // Return the number of bytes written to the target
    if (desc.written)
        return desc.written;
    return desc.error;
}
static inline void do_generic_file_read(struct file *filp, loff_t *ppos,
                                        read_descriptor_t *desc,
                                        read_actor_t actor)
{
    do_generic_mapping_read(filp->f_dentry->d_inode->i_mapping, // page cache of the file's data
                            &filp->f_ra,  // read-ahead state
                            filp,         // current file structure
                            ppos,         // address of the file position to read from
                            desc,         // read descriptor structure
                            actor);       // callback function
}
void do_generic_mapping_read(struct address_space *mapping,
                             struct file_ra_state *ra,
                             struct file *filp,
                             loff_t *ppos,
                             read_descriptor_t *desc,
                             read_actor_t actor)
{
    // Obtain the inode that owns this address_space
    struct inode *inode = mapping->host;
    ...
    for (;;) {
        ...
find_page:
        // Try to fetch the physical page frame directly from the address_space
        // (the index calculation is omitted here to keep the principle clear)
        page = find_get_page(mapping, index);
        // The page is missing: call handle_ra_miss, then jump to the
        // no_cached_page label where a new physical page frame will be obtained
        if (unlikely(page == NULL)) {
            handle_ra_miss(mapping, ra, index);
            goto no_cached_page;
        }
        // Check whether the contents of the page frame are up to date; if not,
        // jump to page_not_up_to_date so the data can be read from disk
        if (!PageUptodate(page))
            goto page_not_up_to_date;
page_ok:
        ...
        // Invoke the callback to process this page
        ret = actor(desc, page, offset, nr);
        ...
page_not_up_to_date:
        // The data in the page frame is stale
        if (PageUptodate(page))
            goto page_ok;
        ...
readpage:
        // Read the data from disk into the page
        error = mapping->a_ops->readpage(filp, page);
        ...
no_cached_page:
        // The page is not cached yet, so a new physical page frame is needed
        if (!cached_page) {
            // Allocate a cache-cold physical page frame from the memory
            // management subsystem for this address_space
            cached_page = page_cache_alloc_cold(mapping);
            if (!cached_page) {
                desc->error = -ENOMEM;
                break;
            }
        }
        // Add it to the page cache's LRU list
        error = add_to_page_cache_lru(cached_page, mapping, index, GFP_KERNEL);
        ...
        // With the new page in place, go read the disk data into it
        goto readpage;
    }
    ...
}
int file_send_actor(read_descriptor_t *desc, struct page *page,
                    unsigned long offset, unsigned long size)
{
    ...
    written = file->f_op->sendpage(file, page, offset, size,
                                   &file->f_pos, size < count);
    ...
    // Return the number of bytes actually written
    return written;
}
Here we assume that the output channel is a SocketChannel, that is, the network side.
In that case we end up in the socket layer's sock_sendpage function, which obtains the socket structure represented by the file and then calls that structure's sendpage operation.
The detailed implementation is as follows:
ssize_t sock_sendpage(struct file *file, struct page *page,
                      int offset, size_t size, loff_t *ppos, int more)
{
    ...
    // Obtain the socket from the file's d_inode structure
    sock = SOCKET_I(file->f_dentry->d_inode);
    ...
    // Invoke the socket's sendpage operation
    return sock->ops->sendpage(sock, page, offset, size, flags);
}
Since TCP is considerably more complex than UDP and we only care about where the page ends up, we will use the UDP protocol as our example.
udp_sendpage calls ip_append_page to put the page into the write buffer; inside ip_append_page, skb_fill_page_desc(skb, i, page, offset, len) is called to append a datagram to the UDP send queue.
Note that ip_append_page does not copy the data of the physical page frame; it simply points the skb_frag_t structure at that page frame.
int udp_sendpage(struct sock *sk, struct page *page, int offset,
                 size_t size, int flags)
{
    ...
    ret = ip_append_page(sk, page, offset, size, flags);
    ...
    return ret;
}
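Putting the layers together: the entire kernel path traced above is driven from user code by a single transferTo call on a FileChannel with a SocketChannel target. A minimal sketch (hypothetical host, port, and file name):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SendFileOverSocketDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical host, port, and file name, for illustration only
        try (SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000));
             FileChannel file = FileChannel.open(Path.of("source.dat"), StandardOpenOption.READ)) {
            long position = 0;
            long size = file.size();
            // On Linux this call can be served by sendfile64, avoiding user-space copies
            while (position < size) {
                position += file.transferTo(position, size - position, socket);
            }
        }
    }
}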


