如需转载,请根据 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 许可,附上本文作者及链接。
本文作者: 执笔成念
作者昵称: zbcn
本文链接: https://1363653611.github.io/zbcn.github.io/2020/10/04/IO_03%E7%AE%A1%E9%81%93%E6%B5%81/
PipedInputStream和PipedOutputStream
管道输入输出流也是一对内存操作可循环读写的数组的输入输出流,基本上读写的方法都是放在 PipedInputStream
这个对象中,在做读写操作的时候需要把输入输出流进行关联,否则不能做对应的读写操作。
管道流缓冲区模型图:
- 开始写入
开始读取
PipedInputStream
jdk官方的解释:一个管道输入流应连接到管道输出流; 管道输入流提供写入管道输出流的任何数据字节。通常情况下,数据是通过一个线程从一个 PipedInputStream
对象读取的,而其他线程将数据写入相应的 PipedOutputStream
。不建议尝试从单个线程使用这两个对象,因为它可能会使线程死锁。管道输入流包含一个缓冲区,在读写操作的范围内解耦读取操作。如果向连接的管道输出流提供数据字节的线程不再有效,那么这个管道也就可以认为是“broken“ 坏了.
对于这个死锁,因为下面的读写都是synchronized 同步,锁的是该对象。所以如果读写都处于同一个对象,当写的时候发现已经满了,然后就等待,但是唤醒不了其他的线程,循环进入等待,释放,然后又是循环,就进入了死锁状态,也消费不了,也写不了。后面会在代码里讲到。
PipedInputStream UML 图
PipedInputStream 成员变量
1 | boolean closedByWriter = false; //管道是否被写线程关闭 |
2 | volatile boolean closedByReader = false; //管道是否被读线程关闭 |
3 | boolean connected = false; //是否与输出流连接 |
4 | Thread readSide; //读线程 |
5 | Thread writeSide; //写线程 |
6 | private static final int DEFAULT_PIPE_SIZE = 1024; //默认的管道流字节数组的长度 为1024 |
7 | protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE ; //管道流字节数组的长度 |
8 | protected byte buffer[]; //管道流字节数组 |
9 | /** |
10 | * 管道输出流接收到数据的下一个字节在循环缓冲区中位置的索引。初始化时该值为-1。 |
11 | * 比如第一次写入100个字节,此时in =100。 |
12 | * 为0时则代表改数组为空,read线程阻塞。 |
13 | * 当in=out 时,表示该数组已满,此时写线程阻塞 |
14 | */ |
15 | protected int in = -1; |
16 | protected int out = 0; //管道输入流读取的写一个字节在循环缓冲区中位置的索引。初始化时改值为0。比如第一次读取10个字节,此时out =10 |
PipedInputStream 构造函数
1 | //构造方法只初始化一个数组,并不连接管道输出流。 |
2 | public PipedInputStream() { |
3 | initPipe(DEFAULT_PIPE_SIZE); |
4 | } |
5 | public PipedInputStream(int pipeSize) { |
6 | initPipe(pipeSize); |
7 | } |
8 | //初始化缓冲数组 |
9 | private void initPipe(int pipeSize) { |
10 | if (pipeSize <= 0) { |
11 | throw new IllegalArgumentException("Pipe Size <= 0"); |
12 | } |
13 | buffer = new byte[pipeSize]; |
14 | } |
15 | |
16 | // 下面这两个构造方法初始化数组并传入需要连接的管道输出流。 |
17 | public PipedInputStream(PipedOutputStream src) throws IOException { |
18 | this(src, DEFAULT_PIPE_SIZE); |
19 | } |
20 | public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException { |
21 | initPipe(pipeSize); |
22 | connect(src); // 连接管道输出流。调用的输出流的connect() |
23 | } |
24 | public void connect(PipedOutputStream src) throws IOException { |
25 | src.connect(this); |
26 | } |
关联管道流的方式
方式1
1 | PipedOutputStream pous = new PipedOutputStream (); |
2 | PipedInputStream pins = new PipedInputStream(pous); |
方式2
1 | PipedInputStream pins = new PipedInputStream(); |
2 | PipedOutputStream pous = new PipedOutputStream (pins); |
以上的方式最终调用的还是PipedOutputStream 的connect()这个方法。
connect方法
1 | public synchronized void connect(PipedInputStream snk) throws IOException { |
2 | if (snk == null) { |
3 | throw new NullPointerException(); |
4 | } else if (sink != null || snk.connected) { // 如果已经关联了输入流对象 或者connected 为true则抛异常 |
5 | throw new IOException("Already connected"); |
6 | } |
7 | sink = snk; //关联的管道输入流对象 |
8 | snk.in = -1; //初始化in |
9 | snk.out = 0; //初始化out |
10 | snk.connected = true; |
11 | } |
数据读取
单字节读取
1 | public synchronized int read() throws IOException { |
2 | if (!connected) { |
3 | throw new IOException("Pipe not connected"); |
4 | } else if (closedByReader) { |
5 | throw new IOException("Pipe closed"); |
6 | } else if (writeSide != null && !writeSide.isAlive() |
7 | && !closedByWriter && (in < 0)) { //如果写线程不为空,但是却不可用,并且写入管道状态处于未关闭,并且 in<0(初始状态,没有写入过数据) 。则该写线程已经挂掉了。 |
8 | throw new IOException("Write end dead"); |
9 | } |
10 | readSide = Thread.currentThread(); //读为当前线程 |
11 | int trials = 2; // |
12 | while (in < 0) { //如果没有数据 |
13 | if (closedByWriter) { |
14 | /* closed by writer, return EOF */ |
15 | return -1; //如果写操作线程已关闭,直接返回-1 |
16 | } |
17 | if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { //尝试了3次之后 写线程不为空,但是却不可用 抛出管道已坏异常 |
18 | throw new IOException("Pipe broken"); |
19 | } |
20 | /* might be a writer waiting */ |
21 | notifyAll(); //唤醒其他的线程,这里有可能唤醒了写线程,开始写入数据。 |
22 | // 注意:如果这里读写是同一个线程,那么这里并不能唤醒其他线程,因为这里是对象锁,然后,等该线程等待了一秒钟之后,又会获取到锁,此时,in依然是-1.没有数据,依然进入循环,造成死锁。 |
23 | // 下面的写操作 receive() 也会造成死锁。 |
24 | try { |
25 | wait(1000); // 当前 等待一秒。释放锁资源 |
26 | } catch (InterruptedException ex) { |
27 | throw new java.io.InterruptedIOException(); |
28 | } |
29 | } |
30 | int ret = buffer[out++] & 0xFF; //获取下一个读出的字节 当前索引为out,下一个为out+1 |
31 | if (out >= buffer.length) { // 如果读取后,out元素的索引已经达到了数组的最大限制,则out =0 下次从头开始读。 |
32 | out = 0; |
33 | } |
34 | if (in == out) {//如果当前in=out 则此时代表数组已经消费完了,为空,则in =-1,下次需要写线程从头开始写。 |
35 | /* now empty */ |
36 | in = -1; |
37 | } |
38 | return ret; //返回读取到的字节。 |
39 | } |
将缓冲区的数据读到byte数组
1 | //原文解释:从缓冲区读取len长度的数据到一个字节数组中。如果读到末尾了,或者 len超过了缓冲区可读的长度, 则读取长度小于len。如果 len为0,则不可以读到任何字节,将该方法返回0。此外 这个方法也是阻塞的,直到至少有一个字节可以读,或者 检测到缓冲区已经到达末尾,或者出现异常了,否则都会阻塞。方法返回的是读取到的字节数。如果达到末尾,则返回-1 |
2 | public synchronized int read(byte b[], int off, int len) throws IOException { |
3 | if (b == null) { |
4 | throw new NullPointerException(); |
5 | } else if (off < 0 || len < 0 || len > b.length - off) { |
6 | throw new IndexOutOfBoundsException(); |
7 | } else if (len == 0) { |
8 | return 0; |
9 | } |
10 | /* possibly wait on the first character */ |
11 | int c = read(); // 调用read 去读取下一个可读字节。 可能是第一个字节 |
12 | if (c < 0) { //如果返回的是 -1,则证明已经达到缓冲区末尾了。返回-1 |
13 | return -1; |
14 | } |
15 | b[off] = (byte) c; // 将当前读到的c放入off 索引下。 |
16 | int rlen = 1; // 定义 读取到的返回字节数。当前已经读取到一个,次数设置为1 |
17 | while ((in >= 0) && (len > 1)) { // 如果 in>=0 证明还有可读数据 |
18 | int available; // 剩余可读数 |
19 | if (in > out) { //如果 in>out 则 可读的长度设置的为(in - out) |
20 | available = Math.min((buffer.length - out), (in - out)); |
21 | } else { //如果 in< out 则 可读设置为buffer.length - out; 这里不会出现in=out 因为如果有in= out,则数组为空。进到这里肯定是数组不为空! |
22 | available = buffer.length - out; |
23 | } |
24 | // A byte is read beforehand outside the loop |
25 | if (available > (len - 1)) { //如果可读长度大于len-1 ,因为刚已经读取了一个字节。 |
26 | available = len - 1; //则设置可读为len-1 ,因为一次性读不完。 |
27 | } |
28 | System.arraycopy(buffer, out, b, off + rlen, available); //将数组内容复制。 |
29 | out += available; // 当前out值增加available长度。 |
30 | rlen += available; // 读取到的字节数加上available |
31 | len -= available; //l下一次要读的长度减掉available ,如果len 已经<=0 则 不进入循环了。 |
32 | if (out >= buffer.length) { // 如果当前的out 索引已经达到了数组的最大长度,则将out回归0 ,下次重新读。 |
33 | out = 0; |
34 | } |
35 | if (in == out) { //如果此时in = out 则说明数组已经没有可读内容。 in 置为-1。 下次不再循环读。 |
36 | /* now empty */ |
37 | in = -1; |
38 | } |
39 | } |
40 | return rlen; |
41 | } |
写操作receive()方法
1 | //接收(写入)一个字节。这个方法会一直阻塞,如果一直没有输入流可用 |
2 | //如果管道已经坏了,未连接的话,抛出异常 |
3 | protected synchronized void receive(int b) throws IOException { |
4 | checkStateForReceive(); //校验管道状态 |
5 | writeSide = Thread.currentThread(); //设置写线程为当前线程 |
6 | if (in == out) //如果数组已经满了 则阻塞 ,唤醒读线程 |
7 | awaitSpace(); |
8 | if (in < 0) { //如果缓冲区为空 ,则设置in 和out为0 |
9 | in = 0; |
10 | out = 0; |
11 | } |
12 | buffer[in++] = (byte)(b & 0xFF); |
13 | if (in >= buffer.length) { // 如果写到了数组的末尾,则重新写 |
14 | in = 0; |
15 | } |
16 | } |
17 | //校验管道和读线程 |
18 | private void checkStateForReceive() throws IOException { |
19 | if (!connected) { |
20 | throw new IOException("Pipe not connected"); |
21 | } else if (closedByWriter || closedByReader) { |
22 | throw new IOException("Pipe closed"); |
23 | } else if (readSide != null && !readSide.isAlive()) { |
24 | throw new IOException("Read end dead"); |
25 | } |
26 | } |
27 | //等待并唤醒读线程 |
28 | private void awaitSpace() throws IOException { |
29 | while (in == out) { |
30 | checkStateForReceive(); //校验管道和读线程 |
31 | /* full: kick any waiting readers */ |
32 | notifyAll(); //唤醒读线程 |
33 | try { |
34 | wait(1000); //锁等待,释放资源。 |
35 | } catch (InterruptedException ex) { |
36 | throw new java.io.InterruptedIOException(); |
37 | } |
38 | } |
39 | } |
40 | |
41 | //接收数据到字节数组(缓冲区),这个方法是阻塞的,直到有输入流(读线程)是可用的。 |
42 | synchronized void receive(byte b[], int off, int len) throws IOException { |
43 | checkStateForReceive(); //校验管道和读线程 |
44 | writeSide = Thread.currentThread(); //设置writeSide 为当前线程 |
45 | int bytesToTransfer = len; //定义本次期望写的字节的长度 |
46 | while (bytesToTransfer > 0) { |
47 | if (in == out) //如果发现缓冲区已经满了。阻塞,等待读线程读取(消费)数据。 |
48 | awaitSpace(); |
49 | int nextTransferAmount = 0; //设置最多可以写的长度 |
50 | if (out < in) { // 如果当前写入的索引大于当前读取的索引,则最大可写的长度buffer.length - in。 |
51 | nextTransferAmount = buffer.length - in; |
52 | } else if (in < out) {//如果当前写入的索引小于当前读取的索引 ,如果数组为空,则数组读写都从头开始。不为空,则可写的空间为out - in。 |
53 | if (in == -1) { |
54 | in = out = 0; |
55 | nextTransferAmount = buffer.length - in; |
56 | } else { |
57 | nextTransferAmount = out - in; |
58 | } |
59 | } |
60 | if (nextTransferAmount > bytesToTransfer) |
61 | nextTransferAmount = bytesToTransfer; //比较当前最大可写的长度,和期望写入的长度,最终可写的长度为最小的为准。 |
62 | assert(nextTransferAmount > 0); |
63 | System.arraycopy(b, off, buffer, in, nextTransferAmount); |
64 | bytesToTransfer -= nextTransferAmount; // 期望的值减去已经写入的字节数。 |
65 | off += nextTransferAmount; |
66 | in += nextTransferAmount; //当前写入的索引后移本次写入长度个位置 |
67 | if (in >= buffer.length) { //如果写到了末尾,则从开头写入。设置为0。 |
68 | in = 0; |
69 | } |
70 | } |
71 | } |
关闭 closed()
1 | public void close() throws IOException { |
2 | closedByReader = true; //设置读关闭。 |
3 | synchronized (this) { |
4 | in = -1; |
5 | } |
6 | } |
PipedOutputStream
PipedOutputStream,管道流的操作基本都在PipedInputStream类实现的。PipedOutputStream的操作很少。所以基本上没什么可说的,我们来看下PipedOutputStreamuml图
输入流中主要的就是写操作,但是真实的操作还是在PipedInputStream类中实现的receive()。 我们来看下源码。
1 | public void write(int b) throws IOException { |
2 | if (sink == null) { |
3 | throw new IOException("Pipe not connected"); |
4 | } |
5 | sink.receive(b); //调用PipedInputStream.receive(int b) |
6 | } |
7 | |
8 | public void write(byte b[], int off, int len) throws IOException { |
9 | if (sink == null) { |
10 | throw new IOException("Pipe not connected"); |
11 | } else if (b == null) { |
12 | throw new NullPointerException(); |
13 | } else if ((off < 0) || (off > b.length) || (len < 0) || |
14 | ((off + len) > b.length) || ((off + len) < 0)) { |
15 | throw new IndexOutOfBoundsException(); |
16 | } else if (len == 0) { |
17 | return; |
18 | } |
19 | sink.receive(b, off, len); //调用PipedInputStream.receive(byte b[], int off, int len) |
20 | } |
21 | |
22 | public synchronized void flush() throws IOException { |
23 | if (sink != null) { |
24 | synchronized (sink) { // 关联的PipedInpuStream 实例对象锁 |
25 | sink.notifyAll(); // 通知任何等待在管道缓冲区的读线程 |
26 | } |
27 | } |
28 | } |
关闭流方法,关联输入流 closed()
1 | public void close() throws IOException { |
2 | if (sink != null) { |
3 | sink.receivedLast(); //写入最后未完成的写操作 |
4 | } |
5 | } |
6 | synchronized void receivedLast() { |
7 | closedByWriter = true; |
8 | notifyAll(); //唤醒所有等待的线程,告知最后一次写入已经完成了。 |
9 | } |
为什么读写都要放在PipedInputStream中,为什么要这样设计呢?
管道流对缓冲区的读写的线程是需要做到读写阻塞的,那如果要达到阻塞怎么样控制到读和写的同步呢,我们看到在PipedInpuStream 和PipedOutpuStream
都是用synchronized来进行线程同步的,这是一个互斥锁,并且在PipedInpuStream
中和 PipedOutpuStream
类中synchronized 锁对象都是this 是类的实例。读写都在 PipedInpuStream
中可以很好的控制读写的互斥。如果分开在PipedInpuStream
中和 PipedOutpuStream
中实现的话,不能做到读写互斥。
同一个线程出现死锁
1 | // 由于 读取和写入都是使用 PipedInputStream 对象锁,所以回出现死锁现象 |
2 | private static void singleThreadDemo() throws IOException { |
3 | PipedOutputStream pos = new PipedOutputStream(); |
4 | PipedInputStream pis = new PipedInputStream(pos); |
5 | //读取字节 |
6 | byte[] r = new byte[30]; |
7 | //写 |
8 | byte[] w = new byte[1024]; |
9 | |
10 | Thread thread = new Thread(() -> { |
11 | try { |
12 | int i = 1; |
13 | while (i<200){ |
14 | Arrays.fill(w,(byte)1); |
15 | pos.write(w,0,1000); |
16 | System.out.println("第"+i+"次写入"+1000+"个字节,可读:"+pis.available()); |
17 | int j =0; |
18 | j= pis.read(r,0,30); |
19 | System.out.println("第"+i+"次读到:"+j+"个字节"); |
20 | i++; |
21 | Thread.sleep(new Random().nextInt(100)); |
22 | } |
23 | } catch (IOException e) { |
24 | e.printStackTrace(); |
25 | } catch (InterruptedException e) { |
26 | e.printStackTrace(); |
27 | } |
28 | }); |
29 | thread.start(); |
30 | } |
多线成 非死锁
1 | private static void multiThread() throws IOException { |
2 | |
3 | PipedOutputStream pos = new PipedOutputStream(); |
4 | PipedInputStream pis = new PipedInputStream(pos); |
5 | //读取字节 |
6 | byte[] r = new byte[30]; |
7 | //写 |
8 | byte[] w = new byte[1024]; |
9 | //读取 |
10 | executor.execute(()->{ |
11 | try { |
12 | int i = 1; |
13 | while (i<200){ |
14 | int j =0; |
15 | j= pis.read(r,0,30); |
16 | System.out.println("第"+i+"次读到:"+j+"个字节"); |
17 | i++; |
18 | Thread.sleep(new Random().nextInt(100)); |
19 | } |
20 | } catch (IOException | InterruptedException e) { |
21 | e.printStackTrace(); |
22 | } |
23 | }); |
24 | //写入 |
25 | executor.execute(() ->{ |
26 | try { |
27 | int i = 1; |
28 | while (i<200){ |
29 | Arrays.fill(w,(byte)1); |
30 | pos.write(w,0,1000); |
31 | System.out.println("第"+i+"次写入"+1000+"个字节,可读:"+pis.available()); |
32 | Thread.sleep(new Random().nextInt(100)); |
33 | } |
34 | } catch (IOException e) { |
35 | e.printStackTrace(); |
36 | } catch (InterruptedException e) { |
37 | e.printStackTrace(); |
38 | }finally { |
39 | try { |
40 | pos.close(); |
41 | } catch (IOException e) { |
42 | e.printStackTrace(); |
43 | } |
44 | } |
45 | }); |
46 | |
47 | executor.shutdown(); |
48 | } |
完美执行。