一、IO分类
1、IO理解分类 - 从传输方式上
从数据传输方式或者说是运输方式角度看,可以将 IO 类分为:
字节流 字符流
字节
是个计算机看的,字符
才是给人看的
1、字节流
(整体结构如下,部分派生类有缺失)

2、字符流
(整体结构如下,部分派生类有缺失)

3、字节流和字符流的区别
字节流读取单个字节,字符流读取单个字符(一个字符根据编码的不同,对应的字节也不同,如 UTF-8 编码中文汉字是 3 个字节,GBK编码中文汉字是 2 个字节。) 字节流用来处理二进制文件(图片、MP3、视频文件),字符流用来处理文本文件(可以看做是特殊的二进制文件,使用了某种编码,人可以阅读)。
简而言之,字节是给计算机看的,字符才是给人看的。
4、字节转字符Input/OutputStreamReader/Writer
编码就是把字符转换为字节,而解码是把字节重新组合成字符。
如果编码和解码过程使用不同的编码方式那么就出现了乱码。
GBK 编码中,中文字符占 2 个字节,英文字符占 1 个字节; UTF-8 编码中,中文字符占 3 个字节,英文字符占 1 个字节; UTF-16be 编码中,中文字符和英文字符都占 2 个字节。
UTF-16be 中的 be 指的是 Big Endian,也就是大端。相应地也有 UTF-16le,le 指的是 Little Endian,也就是小端。
Java 使用双字节编码 UTF-16be,这不是指 Java 只支持这一种编码方式,而是说 char 这种类型使用 UTF-16be 进行编码。char 类型占 16 位,也就是两个字节,Java 使用这种双字节编码是为了让一个中文或者一个英文都能使用一个 char 来存储。

2、IO理解分类 - 从数据操作上
从数据来源或者说是操作对象角度看,IO 类可以分为:

1、文件(file)
FileInputStream、FileOutputStream、FileReader、FileWriter
2、数组([])
字节数组(byte[]): ByteArrayInputStream、ByteArrayOutputStream 字符数组(char[]): CharArrayReader、CharArrayWriter
3、管道操作
PipedInputStream、PipedOutputStream、PipedReader、PipedWriter
4、基本数据类型
DataInputStream、DataOutputStream
5、缓冲操作
BufferedInputStream、BufferedOutputStream、BufferedReader、BufferedWriter
6、打印
PrintStream、PrintWriter
7、对象序列化反序列化
ObjectInputStream、ObjectOutputStream
8、转换
InputStreamReader、OutputStreamWriter
二、Java IO - 设计模式(装饰者模式)
1、装饰者模式
装饰者(Decorator)和具体组件(ConcreteComponent)都继承自组件(Component),具体组件的方法实现不需要依赖于其它对象,而装饰者组合了一个组件,这样它可以装饰其它装饰者或者具体组件。所谓装饰,就是把这个装饰者套在被装饰者之上,从而动态扩展被装饰者的功能。装饰者的方法有一部分是自己的,这属于它的功能,然后调用被装饰者的方法实现,从而也保留了被装饰者的功能。可以看到,具体组件应当是装饰层次的最低层,因为只有具体组件的方法实现不需要依赖于其它对象。

2、IO 装饰者模式
以 InputStream 为例,
InputStream 是抽象组件; FileInputStream 是 InputStream 的子类,属于具体组件,提供了字节流的输入操作; FilterInputStream 属于抽象装饰者,装饰者用于装饰组件,为组件提供额外的功能。例如 BufferedInputStream 为 FileInputStream 提供缓存的功能。

实例化一个具有缓存功能的字节流对象时,只需要在 FileInputStream 对象上再套一层 BufferedInputStream 对象即可。
FileInputStream fileInputStream = new FileInputStream(filePath);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
DataInputStream 装饰者提供了对更多数据类型进行输入的操作,比如 int、double 等基本类型。
三、InputStream
本文主要从JDK 11 源码角度分析InputStream。
1、InputStream 类实现关系
InputStream是输入字节流,具体的实现类层次结构如下:

2、InputStream 抽象类
InputStream 类重要方法设计如下:
// 读取下一个字节,如果没有则返回-1
public abstract int read()
// 将读取到的数据放在 byte 数组中,该方法实际上调用read(byte b[], int off, int len)方法
public int read(byte b[])
// 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的; 此方法会一直阻止,直到输入数据可用、检测到stream结尾或引发异常为止。
public int read(byte b[], int off, int len)
// JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
public byte[] readAllBytes()
// JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public byte[] readNBytes(int len)
// JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public int readNBytes(byte[] b, int off, int len)
// 跳过指定个数的字节不读取
public long skip(long n)
// 返回可读的字节数量
public int available()
// 读取完,关闭流,释放资源
public void close()
// 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
public synchronized void mark(int readlimit)
// 重置读取位置为上次 mark 标记的位置
public synchronized void reset()
// 判断当前流是否支持标记流,和上面两个方法配套使用
public boolean markSupported()
// JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
public long transferTo(OutputStream out)
3、源码实现
梳理部分InputStream及其实现类的源码分析。
1、InputStream
InputStream抽象类源码如下:
public abstract class InputStream implements Closeable {
// 当使用skip方法时,最大的buffer size大小
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
// 默认的buffer size
private static final int DEFAULT_BUFFER_SIZE = 8192;
// JDK11中增加了一个nullInputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
public static InputStream nullInputStream() {
return new InputStream() {
private volatile boolean closed;
private void ensureOpen() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
}
@Override
public int available () throws IOException {
ensureOpen();
return 0;
}
@Override
public int read() throws IOException {
ensureOpen();
return -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
if (len == 0) {
return 0;
}
ensureOpen();
return -1;
}
@Override
public byte[] readAllBytes() throws IOException {
ensureOpen();
return new byte[0];
}
@Override
public int readNBytes(byte[] b, int off, int len)
throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
ensureOpen();
return 0;
}
@Override
public byte[] readNBytes(int len) throws IOException {
if (len < 0) {
throw new IllegalArgumentException("len < 0");
}
ensureOpen();
return new byte[0];
}
@Override
public long skip(long n) throws IOException {
ensureOpen();
return 0L;
}
@Override
public long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out);
ensureOpen();
return 0L;
}
@Override
public void close() throws IOException {
closed = true;
}
};
}
// 读取下一个字节的数据,如果没有则返回-1
public abstract int read() throws IOException;
// 将读取到的数据放在 byte 数组中,该方法实际上调用read(byte b[], int off, int len)方法
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
// 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的; 此方法会一直阻止,直到输入数据可用、检测到stream结尾或引发异常为止。
public int read(byte b[], int off, int len) throws IOException {
// 检查边界
Objects.checkFromIndexSize(off, len, b.length);
if (len == 0) {
return 0;
}
// 读取下一个字节
int c = read();
if (c == -1) { // 读到stream末尾,则返回读取的字节数量为-1
return -1;
}
b[off] = (byte)c;
// i用来记录取了多少个字节
int i = 1;
try {
// 循环读取
for (; i < len ; i++) {
c = read();
if (c == -1) {// 读到stream末尾,则break
break;
}
b[off + i] = (byte)c;
}
} catch (IOException ee) {
}
// 返回读取到的字节个数
return i;
}
// 分配的最大数组大小。
// 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
// JDK9新增:读取 InputStream 中的所有剩余字节,调用readNBytes(Integer.MAX_VALUE)方法
public byte[] readAllBytes() throws IOException {
return readNBytes(Integer.MAX_VALUE);
}
// JDK11更新:读取 InputStream 中的剩余字节的指定上限大小的字节内容;此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public byte[] readNBytes(int len) throws IOException {
// 边界检查
if (len < 0) {
throw new IllegalArgumentException("len < 0");
}
List<byte[]> bufs = null; // 缓存每次读取到的内容放到bufs,最后组装成result
byte[] result = null; // 最后读取到的内容
int total = 0;
int remaining = len; // 剩余字节长度
int n;
do {
byte[] buf = new byte[Math.min(remaining, DEFAULT_BUFFER_SIZE)];
int nread = 0;
// 读取到结束为止,读取大小n可能大于或小于缓冲区大小
while ((n = read(buf, nread,
Math.min(buf.length - nread, remaining))) > 0) {
nread += n;
remaining -= n;
}
if (nread > 0) {
if (MAX_BUFFER_SIZE - total < nread) {
throw new OutOfMemoryError("Required array size too large");
}
total += nread;
if (result == null) {
result = buf;
} else {
if (bufs == null) {
bufs = new ArrayList<>();
bufs.add(result);
}
bufs.add(buf);
}
}
// 如果读不到内容(返回-1)或者没有剩余的字节,则跳出循环
} while (n >= 0 && remaining > 0);
if (bufs == null) {
if (result == null) {
return new byte[0];
}
return result.length == total ?
result : Arrays.copyOf(result, total);
}
// 组装最后的result
result = new byte[total];
int offset = 0;
remaining = total;
for (byte[] b : bufs) {
int count = Math.min(b.length, remaining);
System.arraycopy(b, 0, result, offset, count);
offset += count;
remaining -= count;
}
return result;
}
// JDK9新增:从输入流读取请求的字节数并保存在byte数组中; 此方法会一直阻塞,直到读取了请求的字节数、检测到流结束或引发异常为止。此方法不会关闭输入流。
public int readNBytes(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
int n = 0;
while (n < len) {
int count = read(b, off + n, len - n);
if (count < 0)
break;
n += count;
}
return n;
}
// 跳过指定个数的字节不读取
public long skip(long n) throws IOException {
long remaining = n;
int nr;
if (n <= 0) {
return 0;
}
int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining);
byte[] skipBuffer = new byte[size];
while (remaining > 0) {
nr = read(skipBuffer, 0, (int)Math.min(size, remaining));
if (nr < 0) {
break;
}
remaining -= nr;
}
return n - remaining;
}
// 返回可读的字节数量
public int available() throws IOException {
return 0;
}
// 读取完,关闭流,释放资源
public void close() throws IOException {}
// 标记读取位置,下次还可以从这里开始读取,使用前要看当前流是否支持,可以使用 markSupport() 方法判断
public synchronized void mark(int readlimit) {}
// 重置读取位置为上次 mark 标记的位置
public synchronized void reset() throws IOException {
throw new IOException("mark/reset not supported");
}
// 判断当前流是否支持标记流,和上面两个方法配套使用。默认是false,由子类方法重写
public boolean markSupported() {
return false;
}
// JDK9新增:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中
public long transferTo(OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
long transferred = 0;
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int read;
while ((read = this.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
return transferred;
}
总结下JDK9的更新点
类 java.io.InputStream 中增加了新的方法来读取和复制 InputStream 中包含的数据。
readAllBytes
:读取 InputStream 中的所有剩余字节。readNBytes
: 从 InputStream 中读取指定数量的字节到数组中。transferTo
:读取 InputStream 中的全部字节并写入到指定的 OutputStream 中 。
public class TestInputStream {
private InputStream inputStream;
private static final String CONTENT = "Hello World";
@Before
public void setUp() throws Exception {
this.inputStream =
TestInputStream.class.getResourceAsStream("/input.txt");
}
@Test
public void testReadAllBytes() throws Exception {
final String content = new String(this.inputStream.readAllBytes());
assertEquals(CONTENT, content);
}
@Test
public void testReadNBytes() throws Exception {
final byte[] data = new byte[5];
this.inputStream.readNBytes(data, 0, 5);
assertEquals("Hello", new String(data));
}
@Test
public void testTransferTo() throws Exception {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
this.inputStream.transferTo(outputStream);
assertEquals(CONTENT, outputStream.toString());
}
}
read(byte[], int, int)
和readNBytes(byte[], int, int)
看似是实现的相同功能,为何会设计readNBytes方法呢?
这个问题可以参看这里 (opens new window)
read(byte[], int, int)是尝试读到最多len个bytes,但是读取到的内容长度可能是小于len的。 readNBytes(byte[], int, int) 会一直(while循环)查找直到stream尾为止
举个例子:如果文本内容是12345<end>
, read(s,0,10)是允许返回123
的, 而readNbytes(s,0,10)会一直(while循环)查找直到stream尾为止,并返回12345
.
补充下JDK11为什么会增加nullInputStream方法的设计?即空对象模式
空对象模式
举个例子:
public class MyParser implements Parser {
private static Action NO_ACTION = new Action() {
public void doSomething() { /* do nothing */ }
};
public Action findAction(String userInput) {
// ...
if ( /* we can't find any actions */ ) {
return NO_ACTION;
}
}
}
然后便可以始终可以这么调用,而不用再判断空了
ParserFactory.getParser().findAction(someInput).doSomething();
2、FilterInputStream
FilterInputStream 源码如下
public class FilterInputStream extends InputStream {
// 被装饰的inputStream
protected volatile InputStream in;
// 构造函数,注入被装饰的inputStream
protected FilterInputStream(InputStream in) {
this.in = in;
}
// 本质是调用被装饰的inputStream的方法
public int read() throws IOException {
return in.read();
}
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
public int read(byte b[], int off, int len) throws IOException {
return in.read(b, off, len);
}
public long skip(long n) throws IOException {
return in.skip(n);
}
public int available() throws IOException {
return in.available();
}
public void close() throws IOException {
in.close();
}
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}
public synchronized void reset() throws IOException {
in.reset();
}
public boolean markSupported() {
return in.markSupported();
}
}
为什么被装饰的inputStream是volatile类型的?
请参看: 关键字: volatile详解
3、ByteArrayInputStream
ByteArrayInputStream源码如下
public class ByteArrayInputStream extends InputStream {
// 内部保存的byte 数组
protected byte buf[];
// 读取下一个字节的数组下标,byte[pos]就是read获取的下个字节
protected int pos;
// mark的数组下标位置
protected int mark = 0;
// 保存的有效byte的个数
protected int count;
// 构造方法
public ByteArrayInputStream(byte buf[]) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
}
// 构造方法,带offset的
public ByteArrayInputStream(byte buf[], int offset, int length) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
this.mark = offset;
}
// 从流中读取下一个字节,没有读取到返回 -1
public synchronized int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
// 从第 off 位置读取<b>最多(实际可能小于)</b> len 长度字节的数据放到 byte 数组中,流是以 -1 来判断是否读取结束的
public synchronized int read(byte b[], int off, int len) {
// 边界检查
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
int avail = count - pos;
if (len > avail) {
len = avail;
}
if (len <= 0) {
return 0;
}
// 从buf拷贝到byte 数组b中
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
// 跳过指定个数的字节不读取
public synchronized long skip(long n) {
long k = count - pos;
if (n < k) {
k = n < 0 ? 0 : n;
}
pos += k;
return k;
}
// 还有稍稍byte在buffer中未读取,即总的count 减去 当前byte位置
public synchronized int available() {
return count - pos;
}
// 支持mark所以返回true
public boolean markSupported() {
return true;
}
// 在流中当前位置mark, readAheadLimit参数未使用
public void mark(int readAheadLimit) {
mark = pos;
}
// 重置流,即回到mark的位置
public synchronized void reset() {
pos = mark;
}
// 关闭ByteArrayInputStream不会产生任何动作
public void close() throws IOException {
}
}
4、BufferedInputStream
BufferedInputStream源码如下
public class BufferedInputStream extends FilterInputStream {
// 默认的buffer大小
private static int DEFAULT_BUFFER_SIZE = 8192;
// 分配的最大数组大小。
// 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
// 内部保存在byte 数组中
protected volatile byte buf[];
// 关闭流的方法可能是异步的,所以使用原子AtomicReferenceFieldUpdater提供CAS无锁方式(可以解决CAS的ABA问题)来保证
private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class, byte[].class, "buf");
// 有效byte的大小
protected int count;
// 当前位置
protected int pos;
// 最后一次,调用mark方法,标记的位置
protected int markpos = -1;
/**
* 该变量惟一入口就是mark(int readLimit),好比调用方法mark(1024),那么后面读取的数据若是
* 超过了1024字节,那么这次mark就为无效标记,子类能够选择抛弃该mark标记,从头开始。不过具体实现
* 跟具体的子类有关,在BufferedInputStream中,会抛弃mark标记,从新将markpos赋值为-1
*/
protected int marklimit;
// 获取被装饰的stream
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null)
throw new IOException("Stream closed");
return input;
}
// 获取实际内部的buffer数组
private byte[] getBufIfOpen() throws IOException {
byte[] buffer = buf;
if (buffer == null)
throw new IOException("Stream closed");
return buffer;
}
// 构造函数,buffer是8kb
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
}
// 构造函数,指定buffer大小
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
/**
* 用更多的数据填充缓冲区,考虑到shuffling和其他处理标记的技巧,
* 假设它是由同步方法调用的。该方法还假设所有数据已经被读入,因此pos >count。
*/
private void fill() throws IOException {
// 得到内部缓冲区buffer
byte[] buffer = getBufIfOpen();
// 没有mark的情况下, pos为0
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
// pos >= buffer.length buffer已经被读取完了
else if (pos >= buffer.length) /* no room left in buffer */
// markpos > 0 有标记,标记处在缓存中间
if (markpos > 0) { /* can throw away early part of the buffer */
// 把buffer中,markpos到pos的部分移动到0-sz处,pos设置为sz,markpos为0
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
// markpos已经为0了,marklimit比buffer.length小,再读取buffer已经没有地方了
} else if (buffer.length >= marklimit) {
// 清空缓存,清空标记,markpos为-1,pos为0
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
// markpos已经为0了,marklimit比buffer.length大,而buffer.length已经最大了,不能扩容
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
// markpos已经为0了,marklimit比buffer.length大
} else { /* grow buffer */
// 建立一个长度为min(2*pos,marklimit,MAX_BUFFER_SIZE),的缓存数组,然后把原来0-pos移动到新数组的0-pos处
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
// 用bufUpdater替换buffer
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
// 当前读取上限count为pos
count = pos;
// 从内部的输入流,读取pos到buffer.length部分,读取的字节数加到count
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
// 读取byte
public synchronized int read() throws IOException {
// 说明当前buf[]数组大小不够了,须要fill()
if (pos >= count) {
fill();
// 说明没有读取到任何数据
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
/**
* Read characters into a portion of an array, reading from the underlying
* stream at most once if necessary.
*/
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
// 当写入指定数组b的长度大小超过BufferedInputStream中核心缓存数组buf[]的大小而且 markpos < 0,那么就直接从数据流中读取数据给b数组,而不经过buf[]缓存数组,避免buf[]数组急剧增大
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) return -1;
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}
// 读取到byte数组b中
public synchronized int read(byte b[], int off, int len)
throws IOException
{
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0)
return n;
}
}
// 跳过n个
public synchronized long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;
if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos <0)
return getInIfOpen().skip(n);
// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0)
return 0;
}
long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}
// buf[]数组剩余字节数+输入流中剩余字节数
public synchronized int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail)
? Integer.MAX_VALUE
: n + avail;
}
// 标记位置,marklimit只有在这里才可以被赋值,readlimit表示mark()方法执行后,最多可以从流中读取的数据
// 若是超过该字节大小,那么在fill()的时候,就会认为此mark()标记无效,从新将 markpos = -1,pos = 0
public synchronized void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}
// 重置位置
public synchronized void reset() throws IOException {
getBufIfOpen(); // 如果已经close, 则直接报错
if (markpos < 0)
throw new IOException("Resetting to invalid mark");
pos = markpos;
}
// 支持mark, 所以返回true
public boolean markSupported() {
return true;
}
// 通过AtomicReferenceFieldUpdater的CAS无锁方式close
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
}
4、参考文章
JDK 11 源码 https://www.cnblogs.com/winterfells/p/8745297.html https://www.cnblogs.com/AdaiCoffee/p/11369699.html
四、OutputStream
本文主要从JDK 11源码角度分析 OutputStream。
1、OutputStream 类实现关系
OutputStream是输出字节流,具体的实现类层次结构如下:

2、OutputStream 抽象类
OutputStream 类重要方法设计如下:
// 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
public abstract void write(int b)
// 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法。
public void write(byte b[])
// 将 byte 数组从 off 位置开始,len 长度的字节写入
public void write(byte b[], int off, int len)
// 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
public void flush()
// 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
public void close()
3、源码实现
梳理部分OutputStream及其实现类的源码分析。
1、OutputStream
OutputStream抽象类源码如下:
public abstract class OutputStream implements Closeable, Flushable {
// JDK11中增加了一个nullOutputStream,即空模式实现,以便可以直接调用而不用判空(可以看如下的补充说明)
public static OutputStream nullOutputStream() {
return new OutputStream() {
private volatile boolean closed;
private void ensureOpen() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
}
@Override
public void write(int b) throws IOException {
ensureOpen();
}
@Override
public void write(byte b[], int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
ensureOpen();
}
@Override
public void close() {
closed = true;
}
};
}
// 写入一个字节,可以看到这里的参数是一个 int 类型,对应上面的读方法,int 类型的 32 位,只有低 8 位才写入,高 24 位将舍弃。
public abstract void write(int b) throws IOException;
// 将数组中的所有字节写入,实际调用的是write(byte b[], int off, int len)方法
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
// 将 byte 数组从 off 位置开始,len 长度的字节写入
public void write(byte b[], int off, int len) throws IOException {
// 检查边界合理性
Objects.checkFromIndexSize(off, len, b.length);
// len == 0 的情况已经在如下的for循环中隐式处理了
for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}
// 强制刷新,将缓冲中的数据写入; 默认是空实现,供子类覆盖
public void flush() throws IOException {
}
// 关闭输出流,流被关闭后就不能再输出数据了; 默认是空实现,供子类覆盖
public void close() throws IOException {
}
}
补充下JDK11为什么会增加nullOutputStream方法的设计?即空对象模式
空对象模式
举个例子:
public class MyParser implements Parser {
private static Action NO_ACTION = new Action() {
public void doSomething() { /* do nothing */ }
};
public Action findAction(String userInput) {
// ...
if ( /* we can't find any actions */ ) {
return NO_ACTION;
}
}
}
然后便可以始终可以这么调用,而不用再判断空了
ParserFactory.getParser().findAction(someInput).doSomething();
2、FilterOutputStream
FilterOutputStream 源码如下
public class FilterOutputStream extends OutputStream {
// 被装饰的实际outputStream
protected OutputStream out;
// 当前stream是否已经被close
private volatile boolean closed;
// close stream时加锁,防止其它线程同时close
private final Object closeLock = new Object();
// 初始化构造函数,传入被装饰的实际outputStream
public FilterOutputStream(OutputStream out) {
this.out = out;
}
// 写入数据,本质调用被装饰outputStream的方法
@Override
public void write(int b) throws IOException {
out.write(b);
}
// 将数组中的所有字节写入
@Override
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
// 一个个写入
@Override
public void write(byte b[], int off, int len) throws IOException {
if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
throw new IndexOutOfBoundsException();
for (int i = 0 ; i < len ; i++) {
write(b[off + i]);
}
}
// 强制刷新,将缓冲中的数据写入; 本质调用被装饰outputStream的方法
@Override
public void flush() throws IOException {
out.flush();
}
// 关闭Stream
@Override
public void close() throws IOException {
// 如果已经close, 直接退出
if (closed) {
return;
}
// 加锁处理,如果已经有线程正在closing则退出;
synchronized (closeLock) {
if (closed) {
return;
}
closed = true;
}
// close前调用flush
Throwable flushException = null;
try {
flush();
} catch (Throwable e) {
flushException = e;
throw e;
} finally {
if (flushException == null) {
out.close();
} else {
try {
out.close();
} catch (Throwable closeException) {
// evaluate possible precedence of flushException over closeException
if ((flushException instanceof ThreadDeath) &&
!(closeException instanceof ThreadDeath)) {
flushException.addSuppressed(closeException);
throw (ThreadDeath) flushException;
}
if (flushException != closeException) {
closeException.addSuppressed(flushException);
}
throw closeException;
}
}
}
}
}
对比下JDK8中,close方法是没有加锁处理的。这种情况下你可以看JDK8源码中,直接利用java7的try with resources方式,优雅的调用flush方法后对out进行关闭。
public void close() throws IOException {
try (OutputStream ostream = out) {
flush();
}
}
3、ByteArrayOutputStream
ByteArrayOutputStream 源码如下
public class ByteArrayOutputStream extends OutputStream {
// 实际的byte数组
protected byte buf[];
// 数组中实际有效的byte的个数
protected int count;
// 初始化默认构造,初始化byte数组大小为32
public ByteArrayOutputStream() {
this(32);
}
// 初始化byte的大小
public ByteArrayOutputStream(int size) {
if (size < 0) {
throw new IllegalArgumentException("Negative initial size: "
+ size);
}
buf = new byte[size];
}
// 扩容,确保它至少可以容纳由最小容量参数指定的元素数
private void ensureCapacity(int minCapacity) {
// overflow-conscious code
if (minCapacity - buf.length > 0)
grow(minCapacity);
}
// 分配的最大数组大小。
// 由于一些VM在数组中保留一些头字,所以尝试分配较大的阵列可能会导致OutOfMemoryError(请求的阵列大小超过VM限制)
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 扩容的实质方法,确保它至少可以容纳由最小容量参数指定的元素数
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = buf.length;
int newCapacity = oldCapacity << 1;
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
buf = Arrays.copyOf(buf, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
// 写入,写入前确保byte数据长度
public synchronized void write(int b) {
ensureCapacity(count + 1);
buf[count] = (byte) b;
count += 1;
}
public synchronized void write(byte b[], int off, int len) {
Objects.checkFromIndexSize(off, len, b.length);
ensureCapacity(count + len);
System.arraycopy(b, off, buf, count, len);
count += len;
}
public void writeBytes(byte b[]) {
write(b, 0, b.length);
}
public synchronized void writeTo(OutputStream out) throws IOException {
out.write(buf, 0, count);
}
// 重置,显然将实际有效的byte数量置为0
public synchronized void reset() {
count = 0;
}
public synchronized byte[] toByteArray() {
return Arrays.copyOf(buf, count);
}
// 长度,即count
public synchronized int size() {
return count;
}
// 转成string
public synchronized String toString() {
return new String(buf, 0, count);
}
// 转成string,指定的字符集
public synchronized String toString(String charsetName)
throws UnsupportedEncodingException
{
return new String(buf, 0, count, charsetName);
}
public synchronized String toString(Charset charset) {
return new String(buf, 0, count, charset);
}
// 弃用
@Deprecated
public synchronized String toString(int hibyte) {
return new String(buf, hibyte, 0, count);
}
// 对byte 数组而言,close没啥实质意义,所以空实现
public void close() throws IOException {
}
}
4、BufferedOutputStream
BufferedOutputStream 源码如下
public class BufferedOutputStream extends FilterOutputStream {
// Buffered outputStream底层也是byte数组
protected byte buf[];
// 大小,buf[0]到buf[count-1]是实际存储的bytes
protected int count;
// 构造函数,被装饰的outputStream,以及默认buf大小是8192
public BufferedOutputStream(OutputStream out) {
this(out, 8192);
}
public BufferedOutputStream(OutputStream out, int size) {
super(out);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
/** Flush the internal buffer */
// 内部的flush方法,将buffer中的有效bytes(count是有效的bytes大小)通过被装饰的outputStream写入
private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}
// 写入byte
@Override
public synchronized void write(int b) throws IOException {
// 当buffer满了以后,flush buffer
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte)b;
}
// 将 byte 数组从 off 位置开始,len 长度的字节写入
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
// 如果请求长度已经超过输出缓冲区的大小,直接刷新输出缓冲区,然后直接写入数据。
flushBuffer();
out.write(b, off, len);
return;
}
if (len > buf.length - count) {
flushBuffer();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}
// flush方法,需要先将buffer中写入,最后在调用被装饰outputStream的flush方法
@Override
public synchronized void flush() throws IOException {
flushBuffer();
out.flush();
}
}
五、常见类使用
本文主要介绍Java IO常见类的使用,包括:磁盘操作,字节操作,字符操作,对象操作和网络操作。
1、IO常见类的使用
Java 的 I/O 大概可以分成以下几类:
磁盘操作: File 字节操作: InputStream 和 OutputStream 字符操作: Reader 和 Writer 对象操作: Serializable 网络操作: Socket
1、File相关
File 类可以用于表示文件和目录的信息,但是它不表示文件的内容。
递归地列出一个目录下所有文件:
public static void listAllFiles(File dir) {
if (dir == null || !dir.exists()) {
return;
}
if (dir.isFile()) {
System.out.println(dir.getName());
return;
}
for (File file : dir.listFiles()) {
listAllFiles(file);
}
}
2、字节流相关
public static void copyFile(String src, String dist) throws IOException {
FileInputStream in = new FileInputStream(src);
FileOutputStream out = new FileOutputStream(dist);
byte[] buffer = new byte[20 * 1024];
// read() 最多读取 buffer.length 个字节
// 返回的是实际读取的个数
// 返回 -1 的时候表示读到 eof,即文件尾
while (in.read(buffer, 0, buffer.length) != -1) {
out.write(buffer);
}
in.close();
out.close();
}
3、实现逐行输出文本文件的内容
public static void readFileContent(String filePath) throws IOException {
FileReader fileReader = new FileReader(filePath);
BufferedReader bufferedReader = new BufferedReader(fileReader);
String line;
while ((line = bufferedReader.readLine()) != null) {
System.out.println(line);
}
// 装饰者模式使得 BufferedReader 组合了一个 Reader 对象
// 在调用 BufferedReader 的 close() 方法时会去调用 Reader 的 close() 方法
// 因此只要一个 close() 调用即可
bufferedReader.close();
}
4、序列化 & Serializable & transient
序列化就是将一个对象转换成字节序列,方便存储和传输。
序列化: ObjectOutputStream.writeObject() 反序列化: ObjectInputStream.readObject()
不会对静态变量进行序列化,因为序列化只是保存对象的状态,静态变量属于类的状态。
Serializable
序列化的类需要实现 Serializable 接口,它只是一个标准,没有任何方法需要实现,但是如果不去实现它的话而进行序列化,会抛出异常。
public static void main(String[] args) throws IOException, ClassNotFoundException {
A a1 = new A(123, "abc");
String objectFile = "file/a1";
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(objectFile));
objectOutputStream.writeObject(a1);
objectOutputStream.close();
ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(objectFile));
A a2 = (A) objectInputStream.readObject();
objectInputStream.close();
System.out.println(a2);
}
private static class A implements Serializable {
private int x;
private String y;
A(int x, String y) {
this.x = x;
this.y = y;
}
@Override
public String toString() {
return "x = " + x + " " + "y = " + y;
}
}
transient
transient 关键字可以使一些属性不会被序列化。
ArrayList 中存储数据的数组 elementData 是用 transient 修饰的,因为这个数组是动态扩展的,并不是所有的空间都被使用,因此就不需要所有的内容都被序列化。通过重写序列化和反序列化方法,使得可以只序列化数组中有内容的那部分数据。
private transient Object[] elementData;
5、Java 中的网络支持:
InetAddress: 用于表示网络上的硬件资源,即 IP 地址; URL: 统一资源定位符; Sockets: 使用 TCP 协议实现网络通信; Datagram: 使用 UDP 协议实现网络通信。
1、InetAddress
没有公有的构造函数,只能通过静态方法来创建实例。
InetAddress.getByName(String host);
InetAddress.getByAddress(byte[] address);
2、URL
可以直接从 URL 中读取字节流数据。
public static void main(String[] args) throws IOException {
URL url = new URL("http://www.baidu.com");
/* 字节流 */
InputStream is = url.openStream();
/* 字符流 */
InputStreamReader isr = new InputStreamReader(is, "utf-8");
/* 提供缓存功能 */
BufferedReader br = new BufferedReader(isr);
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();
}
3、Sockets
ServerSocket: 服务器端类 Socket: 客户端类 服务器和客户端通过 InputStream 和 OutputStream 进行输入输出。

4、Datagram
DatagramSocket: 通信类 DatagramPacket: 数据包类
2、常见问题
Java 字节读取流的read方法返回int的原因
https://blog.csdn.net/congwiny/article/details/18922847
六、Unix IO 模型
https://blog.xueqimiao.com/io/4a6150/
七、BIO 详解
https://blog.xueqimiao.com/io/4e01fb/
八、NIO 详解
https://blog.xueqimiao.com/io/3b29ae/
九、IO多路复用详解
https://blog.xueqimiao.com/io/963051/
十、AIO 异步IO详解
https://blog.xueqimiao.com/io/ef077c/
十一、N【A】IO框架 Netty
https://blog.xueqimiao.com/io/91462d/
十二、NIO 零拷贝实现
https://blog.xueqimiao.com/io/80cc73/




