
- 1、BIO
- 1.1、简介
- 1.2、案例演示
- 1.3、总结
- 2、NIO
- 2.1、简介
- 2.2、NIO与BIO比较
- 2.3、Buffer
- 2.3.1、简单使用
- 2.3.2、基本介绍
- 2.3.3、Buffer及其子类
- 2.3.4、ByteBuffer 正确使用姿势
- 2.3.5、ByteBuffer结构
- 2.3.6、ByteBuffer常见方法
- 2.3.7、Buffer 注意事项和补充
- 2.3.8、粘包半包案例
- 2.4、Channel
- 2.4.1、基本介绍
- 2.4.2、FileChannel
- FileChannel 工作模式
- 获取
- 强制写入
- 2.4.3、文件编程1:本地文件读写数据
- 2.4.4、文件编程2:使用一个Buffer完成文件读写
- 2.4.5、文件编程3:拷贝文件 transferFrom
- 2.5、Selector
- 2.5.1、基本介绍
- 2.5.2、NIO非阻塞网络编程原理
- 2.5.3、案例
- 2.7、NIO网络编程案例之一次无法写完的例子
- 2.8、NIO网络编程案例之消息边界问题
- 2.9、NIO网络编程案例之多线程优化
- 3、如何理解同步异步,阻塞非阻塞?
- 4、零拷贝问题
- 4.1、传统IO
- 4.2、NIO 优化
- 4.3、linux 2.1优化
- 4.4、linux 2.4优化
- 4.5、总结
BIO是同步并阻塞(传统阻塞型)的,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销(可以通过线程池机制改善,实现多个客户连接服务器)
- 应用场景:适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
图示
Java BIO就是传统的Java IO编程,其相关类和接口都在java.io包中,其开发步骤大致分为如下几步
- 服务端启动一个ServerSocket
- 客户端启动Socket对服务器进行通信,默认情况下服务端需要对每个客户建立与之对应的通讯线程。
- 客户端发出请求后,会询问服务器是否有线程来响应,如果没有则会等待,或者直接被拒绝;如果有响应,客户端线程会等待请求结束后,再继续执行
这里实现一个BIO的案例,服务端监听8888端口,客户端直接使用Windows自带的Telnet工具连接
public class BioServer {
@SuppressWarnings("InfiniteLoopStatement")
public static void main(String[] args) throws Exception {
//获取系统处理器个数,作为线程池数量
int nThreads = Runtime.getRuntime().availableProcessors();
//显式创建线程池
ExecutorService pool = new ThreadPoolExecutor(nThreads, 200,
0L, TimeUnit.MILLISECONDS,
new linkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
ServerSocket serverSocket = new ServerSocket(8888);
System.out.println("socket开启");
while (true) {
System.out.println("线程信息id =" + Thread.currentThread().getId() + " 名字 =" + Thread.currentThread().getName() + " 等待连接....");
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
pool.execute(() -> handler(socket));
}
}
private static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
//通过 socket 获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
System.out.println("线程信息id =" + Thread.currentThread().getId() + " 名字 =" + Thread.currentThread().getName() + " read....");
//获取当前内容长度
int read = inputStream.read(bytes);
if (read != -1) {
//输出客户端发送的数据
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
将程序启动后,打开cmd命令窗口,输入
telnet 127.0.0.1 8888
可以看到控制台提示连接到一个客户端
服务端开启后,accept方法是一直阻塞的,直到一个客户端连接上来后,才会继续执行,这时候的Main线程会使用线程池创建了一个新的线程执行handler方法,用于处理与客户端之间的通信。然后Main线程重新进入循环,再accept的时候继续阻塞等待下一次客户端连接
使用Telnet发送一条Hello消息
使用inputStream.read方法同样也是阻塞的,一直死等客户端发送消息,收到后才会继续执行
1.3、总结重新开启一个cmd窗口,再次使用Telnet工具连接服务,可以看到Main线程又开启了一个新的线程来处理这次连接
- 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write 。
- 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read *** 作上,造成线程资源浪费
NIO是同步非阻塞的,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理
- 应用场景:适用于连接数目多且连接比较短(轻 *** 作)的架构,比如聊天服务器,d幕系统,服务器间通讯等。 编程比较复杂,JDK1.4 开始支持。
图示
Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的 输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞, 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。
-
NIO 是面向缓冲区或块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。
-
一个线程可以从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用,就什么都不会获取,线程并不会阻塞。所以直至数据变化到可以读取之前,该线程可以继续做其他的事情。
-
非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情,这就做到了用一个线程来处理多个 *** 作。
如图中所看到的那样,NIO有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)
- 每个 Channel 都会对对应一个Buffer
- 一个Selector对应一个线程,一个线程对应多个Channel,对于一个Selector如何切换到某一个Channel是由一个Event事件决定的,这是一个很重要的概念,后面会提到,Selector会根据不同的事件再不同的Channel之间切换
- Buffer 就是一个内存块 , 底层是有一个数组,区别于BIO,输入流和输出流是分开的,不能混用,Buffer是可以读也可以写,需要使用flip方法动态切换
-
BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
-
BIO 是阻塞的,NIO 则是非阻塞的
-
BIO 基于字节流和字符流进行 *** 作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行 *** 作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道
private static void basicBufferTest() {
IntBuffer intBuffer = IntBuffer.allocate(5);
//向 buffer 存放数据
intBuffer.put(10);
intBuffer.put(11);
intBuffer.put(12);
intBuffer.put(13);
intBuffer.put(14);
//将 buffer 转换,读写切换(!!!)
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
2.3.2、基本介绍
2.3.3、Buffer及其子类缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化, Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer,如图
在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类, 类的层级关系图如下
在Buffer中,定义了四个属性用于提供关于其所包含的数据元素的信息,其中四个属性恒定满足关系:mark <= position <= limit <= capacity
private int mark = -1; private int position = 0; private int limit; private int capacity;
通过Buffer的简单使用Demo来动态观察一些这几个属性:
其中flip方法就是用来重置limit和position的,用于从写模式转换为读模式
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
从读模式转换为写模式就需要调用clear或compact方法
Buffer类相关主要方法汇总
public abstract class Buffer {
//JDK1.4时,引入的api
//返回此缓冲区的容量
public final int capacity();
//返回此缓冲区的位置
public final int position();
//设置此缓冲区的位置
public final Buffer position (int newPositio);
//返回此缓冲区的限制
public final int limit();
//设置此缓冲区的限制
public final Buffer limit(int newLimit);
//在此缓冲区的位置设置标记
public final Buffer mark();
//将此缓冲区的位置重置为以前标记的位置
public final Buffer reset();
//清除此缓冲区, 即将各个标记恢复到初始状态,但是数据并没有真正擦除, 后面 *** 作会覆盖
public final Buffer clear();
//反转此缓冲区
public final Buffer flip();
//重绕此缓冲区
public final Buffer rewind();
//返回当前位置与限制之间的元素数
public final int remaining();
//告知在当前位置和限制之间是否有元素
public final boolean hasRemaining();
//告知此缓冲区是否为只读缓冲区
public abstract boolean isReadOnly();
//JDK1.6时引入的api
//告知此缓冲区是否具有可访问的底层实现数组
public abstract boolean hasArray();
//返回此缓冲区的底层实现数组
public abstract Object array();
//返回此缓冲区的底层实现数组中第一个缓冲区元素的偏移量
public abstract int arrayOffset();
//告知此缓冲区是否为直接缓冲区
public abstract boolean isDirect();
}
2.3.4、ByteBuffer 正确使用姿势
- 向 buffer 写入数据,例如调用 channel.read(buffer)
- 调用 flip() 切换至读模式
- 从 buffer 读取数据,例如调用 buffer.get()
- 调用 clear() 或 compact() 切换至写模式
- 重复 1~4 步骤
ByteBuffer 有以下重要属性
- capacity
- position
- limit
一开始
写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态
flip 动作发生后,position 切换为读取位置,limit 切换为读取限制
读取 4 个字节后,状态
clear 动作发生后,状态
2.3.6、ByteBuffer常见方法compact 方法,是把未读完的部分向前压缩,然后切换至写模式
分配空间
- allocate:返回java.nio.HeapByteBuffer,位于java 堆内存,读写效率较低,受到 GC 的影响
- allocateDirect:返回java.nio.DirectByteBuffer,位于直接内存,读写效率高(少一次拷贝),不会受 GC 影响,分配的效率低,但是使用不当可能会出现内存移除,需要手动释放
向 buffer 写入数据有两种办法
- 调用 channel 的 read 方法,会从channel中读取数据,自动调用buffer的put方法
- 直接调用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
buf.put((byte)127);
从 buffer 读取数据同样有两种办法
- 调用 channel 的 write 方法,自动调用buffer的get方法,往channel中写入
- 调用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
byte b = buf.get();
get 方法会让 position 读指针向后走,如果想重复读取数据
- 可以调用 rewind 方法将 position 重新置为 0
- 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针
mark 和 reset
mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置
- 注意rewind 和 flip 都会清除 mark 位置
System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); buffer.mark(); // 加标记,索引2 的位置 System.out.println((char) buffer.get()); System.out.println((char) buffer.get()); buffer.reset(); // 将 position 重置到索引 2 System.out.println((char) buffer.get()); System.out.println((char) buffer.get());
字符串与 ByteBuffer 互转
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");
CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());//class java.nio.HeapCharBuffer
System.out.println(buffer3.toString());//你好
2.3.7、Buffer 注意事项和补充
- ByteBuffer 支持类型化的 put 和 get, put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
private static void byteBufferTest() {
//创建一个 Buffer
ByteBuffer buffer = ByteBuffer.allocate(64);
//类型化方式放入数据
buffer.putInt(100);
buffer.putLong(9);
buffer.putChar('彭');
buffer.putShort((short) 4);
//取出
buffer.flip();
System.out.println();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
}
- 一个Buffer可以被转换为只读Buffer,只读Buffer进行写 *** 作的时候,会抛出ReadOnlyBufferException异常
private static void readOnlyBufferTest() {
//创建一个 buffer
ByteBuffer buffer = ByteBuffer.allocate(64);
for (int i = 0; i < 64; i++) {
buffer.put((byte) i);
}
//读取
buffer.flip();
//得到一个只读的 Buffer
ByteBuffer readonlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
//读取
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
//ReadOnlyBufferException
readOnlyBuffer.put((byte) 100);
}
- NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存)中进行修改, 而如何同步到文件由 NIO 来完成
public static void mappedByteBufferTest() throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
//IndexOutOfBoundsException
mappedByteBuffer.put(5, (byte) 'Y');
randomAccessFile.close();
System.out.println("修改成功~~");
}
- 前面我们讲的读写 *** 作,都是通过一个 Buffer 完成的,NIO 还支持通过多个 Buffer (即 Buffer 数组) 完成读 写 *** 作,即 Scattering (分散)和 Gathering(聚合)
public static void scatteringAndGatheringDemo() throws Exception {
//使用 ServerSocketChannel 和 SocketChannel 网络
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
//绑定端口到 socket ,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
//创建 buffer 数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等客户端连接(telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
//假定从客户端接收 8 个字节
int messageLength = 8;
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < messageLength) {
long l = socketChannel.read(byteBuffers);
//累计读取的字节数
byteRead += l;
System.out.println("byteRead=" + byteRead);
//使用流打印, 看看当前的这个 buffer 的 position 和 limit
Arrays.stream(byteBuffers).map(buffer -> "position=" + buffer.position() + ", limit=" + buffer.limit()).forEach(System.out::println);
}
//将所有的 buffer 进行 flip
Arrays.asList(byteBuffers).forEach(Buffer::flip);
//将数据读出显示到客户端
long byteWrite = 0;
while (byteWrite < messageLength) {
long l = socketChannel.write(byteBuffers);
byteWrite += l;
}
//将所有的 buffer 进行 clear
Arrays.asList(byteBuffers).forEach(Buffer::clear);
System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWrite + ", messageLength=" +
messageLength);
}
}
byteRead=1 postion=1, limit=5 postion=0, limit=3 byteRead=2 postion=2, limit=5 postion=0, limit=3 byteRead=3 postion=3, limit=5 postion=0, limit=3 byteRead=4 postion=4, limit=5 postion=0, limit=3 byteRead=5 postion=5, limit=5 postion=0, limit=3 byteRead=6 postion=5, limit=5 postion=1, limit=3 byteRead=7 postion=5, limit=5 postion=2, limit=3 byteRead=8 postion=5, limit=5 postion=3, limit=3 byteRead:=8 byteWrite=8, messagelength82.3.8、粘包半包案例
网络上有多条数据发送给服务端,数据之间使用 n 进行分隔
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
- Hello,worldn
- I’m zhangsann
- How are you?n
变成了下面的两个 byteBuffer (黏包,半包)
- Hello,worldnI’m zhangsannHo
- w are you?n
现在要求你编写程序,将错乱的数据恢复成原始的按 n 分隔的数据
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
// 11 24
source.put("Hello,worldnI'm zhangsannHo".getBytes());
split(source);
source.put("w are you?nhaha!n".getBytes());
split(source);
}
private static void split(ByteBuffer source) {
source.flip();
int oldLimit = source.limit();
for (int i = 0; i < oldLimit; i++) {
if (source.get(i) == 'n') {
System.out.println(i);
ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
// 0 ~ limit
source.limit(i + 1);
// 从source 读,向 target 写
target.put(source);
source.limit(oldLimit);
}
}
source.compact();
}
2.4、Channel
2.4.1、基本介绍
Channel区别于流:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲
2.4.2、FileChannel FileChannel 工作模式Channel在NIO中是一个接口,常用的Channel实现类有:FileChannel(用于文件读写))、 DatagramChannel(UDP数据读写) 、 ServerSocketChannel 和 SocketChannel (用于TCP数据读写,ServerSocketChannel类似 ServerSocket , SocketChannel 类似Socket)
获取FileChannel 只能工作在阻塞模式下
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
*** 作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘
2.4.3、文件编程1:本地文件读写数据
public class FileChannelDemo {
public static void main(String[] args) throws Exception {
fileChannelWrite();
fileChannelRead();
}
private static void fileChannelRead() throws Exception {
//创建文件的输入流
File file = new File("d:\file.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过 fileInputStream 获取对应的 FileChannel -> 实际类型 FileChannelImpl
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
//将通道的数据读入到 Buffer
fileChannel.read(byteBuffer);
//将 byteBuffer 的 字节数据 转成 String
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
}
public static void fileChannelWrite() throws Exception {
String str = "hello world";
//创建一个输出流->channel
FileOutputStream fileOutputStream = new FileOutputStream("d:\file.txt");
//通过 fileOutputStream 获取 对应的 FileChannel
//这个 fileChannel 真实 类型是 FileChannelImpl
FileChannel fileChannel = fileOutputStream.getChannel();
//创建一个缓冲区 ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将 str 放入 byteBuffer
byteBuffer.put(str.getBytes());
//对 byteBuffer 进行 flip
byteBuffer.flip();
//将 byteBuffer 数据写入到 fileChannel
fileChannel.write(byteBuffer);
fileOutputStream.close();
}
}
2.4.4、文件编程2:使用一个Buffer完成文件读写
private static void fileChannelReadAndWrite() throws Exception {
FileInputStream fileInputStream = new FileInputStream("d:\file.txt");
FileChannel fileChannel01 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("d:\file2.txt");
FileChannel fileChannel02 = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
//循环读取
while (true) {
//这里有一个重要的 *** 作,一定不要忘了
//清空 buffer
byteBuffer.clear();
int read = fileChannel01.read(byteBuffer);
System.out.println("read =" + read);
//表示读完
if (read == -1) {
break;
}
//将 buffer 中的数据写入到 fileChannel02 -- 2.txt
byteBuffer.flip();
fileChannel02.write(byteBuffer);
}
//关闭相关的流
fileInputStream.close();
fileOutputStream.close();
}
2.4.5、文件编程3:拷贝文件 transferFrom
private static void fileChannelCopy() throws Exception {
FileInputStream fileInputStream = new FileInputStream("d:\file.txt");
FileOutputStream fileOutputStream = new FileOutputStream("d:\file2.txt");
//获取各个流对应的 fileChannel
FileChannel sourceCh = fileInputStream.getChannel();
FileChannel destCh = fileOutputStream.getChannel();
//使用 transferForm 完成拷贝
destCh.transferFrom(sourceCh, 0, sourceCh.size());
//关闭相关通道和流
sourceCh.close();
destCh.close();
fileInputStream.close();
fileOutputStream.close();
}
2.5、Selector
2.5.1、基本介绍
Java NIO在使用一个线程处理多个的客户端连接,就会使用到 Selector(选择器) ,它能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求, 只有真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都 创建一个线程,不用去维护多个线程,更避免了多线程之间的上下文切换导致的开销
Selector是一个接口,源码如下
public abstract class Selector implements Closeable {
protected Selector() { }
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public abstract boolean isOpen();
public abstract SelectorProvider provider();
public abstract Set keys();
public abstract Set selectedKeys();
public abstract int selectNow() throws IOException;
public abstract int select(long timeout)
throws IOException;
public abstract int select() throws IOException;
public abstract Selector wakeup();
public abstract void close() throws IOException;
}
相关常用方法解释:
- open():得到一个选择器对象。
- select():阻塞的监控所有注册的通道,当其中有IO *** 作可以进行时,将对应的SelectionKey加入到内部集合并返回。
- select(long timeout):,参数用来设置超时时间,超时后返回。
- selectNow():不阻塞,直接返回。
- keys():获取所有注册到当前Selector上面的SelectionKey
- selectedKeys():从内部集合中得到所有的发生事件的SelectionKey。
- wakeup():唤醒Selector。
流程如下
- 当客户端连接,会通过ServerSocketChannel得到SocketChannel
- 得到的SocketChannel会调用register(Selector sel,int ops)方法,将自己注册到目标Selector上面
- 注册成功后,会返回一个SelectionKey对象,该对象会和目标Selector相关联,(Selector内部维护了其管理的所有SelectionKey的集合)
- Selector会进行执行select方法监听Channel中的事件发生
- 当有事件发生,会得到对应的SelectionKey,通过SelectionKey可以调用channel()方法反向获取SocketChannel,完成业务处理
绑定的Channel事件类型可以有
- connect - 客户端连接成功时触发
- accept - 服务器端成功接受连接时触发
- read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
- NIOServer
@SuppressWarnings("InfiniteLoopStatement")
public class NioServer {
public static void main(String[] args) throws Exception {
//创建 ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个 Selector 对象
Selector selector = Selector.open();
//绑定一个端口 6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环等待客户端连接
while (true) {
//这里我们等待 1 秒,如果没有事件发生, 返回
if (selector.select(1000) == 0) {
System.out.println("服务器等待了 1 秒,无连接");
continue;
}
//1.如果返回的>0, 表示已经获取到关注的事件, selectedKeys()返回关注事件的集合
Set selectionKeys = selector.selectedKeys();
Iterator keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//根据 key 对应的通道发生的事件做相应处理
//如果是 OP_ACCEPT, 有新的客户端连接
if (key.isAcceptable()) {
//该该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println(" 客 户 端 连 接 成 功 生 成 了 一 个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将 socketChannel 注册到 selector, 关注事件为 OP_READ, 同时给 socketChannel
//关联一个 Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
//发生 OP_READ
if (key.isReadable()) {
//通过 key 反向获取到对应 channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该 channel 关联的 buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("form 客户端 " + new String(buffer.array()));
}
//手动从集合中移动当前的 selectionKey, 防止重复 *** 作
keyIterator.remove();
}
}
}
}
- NIOClient
public class NioClient {
public static void main(String[] args) throws Exception {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的 ip 和 端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
}
//...如果连接成功,就发送数据
String str = "hello world";
//直接根据byte数组大小动态创建一个ByteBuffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将 buffer 数据写入 channel
socketChannel.write(buffer);
}
}
2.7、NIO网络编程案例之一次无法写完的例子
客户端
@SuppressWarnings("InfiniteLoopStatement")
public class NioClient {
public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("localhost", 8080));
Selector selector = Selector.open();
sc.register(selector, SelectionKey.OP_ConNECT + SelectionKey.OP_READ);
while (true) {
selector.select();
Set keySet = selector.selectedKeys();
for (SelectionKey selectionKey : keySet) {
if (selectionKey.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (selectionKey.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
int read = sc.read(buffer);
System.out.println(read);
System.out.println(buffer);
}
}
}
}
}
服务端
@SuppressWarnings("InfiniteLoopStatement")
public class NioServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT, ByteBuffer.allocate(16));
while (true) {
selector.select();
Set keySet = selector.selectedKeys();
Iterator keyIterator = keySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
keyIterator.remove();
if (selectionKey.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 5000000; i++) {
stringBuilder.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());
int count = sc.write(buffer);
System.out.println("实际写入字节数:" + count);
if (buffer.hasRemaining()) {
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
scKey.attach(buffer);
}
} else if (selectionKey.isWritable()) {
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
SocketChannel sc = (SocketChannel) selectionKey.channel();
int count = sc.write(buffer);
System.out.println("实际写入字节数:" + count);
if (!buffer.hasRemaining()) {
selectionKey.attach(null);
selectionKey.interestOps(selectionKey.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
2.8、NIO网络编程案例之消息边界问题
网络传输的数据可能不是一段连续的,会出现粘包半包的情况
解决这种问题的方法有如下几种
- 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低
- TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
这里模拟第二种方案,但是可能会遇到一个ByteBuffer读取不完的情况,这个时候就需要对ByteBuffer进行动态的分配大小
- 思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
Server
@SuppressWarnings("InfiniteLoopStatement")
public class NioServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
Selector selector = Selector.open();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Set keySet = selector.selectedKeys();
Iterator keyIterator = keySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
keyIterator.remove();
if (selectionKey.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(16));
} else if (selectionKey.isReadable()) {
try {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
int count = sc.read(buffer);
if (count == -1) {
selectionKey.cancel();
} else {
System.out.println("读取了" + count);
split(buffer);
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
selectionKey.attach(newBuffer);
}
}
} catch (Exception e) {
e.printStackTrace();
selectionKey.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == 'n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
}
}
source.compact();
}
}
Client
@SuppressWarnings("InfiniteLoopStatement")
public class NioClient {
public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.configureBlocking(false);
sc.write(Charset.defaultCharset().encode("0123n456789abcdef"));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333n"));
}
}
2.9、NIO网络编程案例之多线程优化
现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费
所以这里设计分两组选择器
- 单线程配一个选择器,专门处理 accept 事件
- 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
Server
public static void main(String[] args) throws Exception {
Thread.currentThread().setName("Boss-Thread");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while (true) {
selector.select();
Set keySet = selector.selectedKeys();
Iterator keyIterator = keySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
workers[index.getAndIncrement() % workers.length].register(sc); // boss 调用 初始化 selector , 启动 worker-0
}
}
}
}
static class Worker implements Runnable {
private Selector selector;
private final String name;
private Boolean start = false;
private final ConcurrentlinkedQueue queue = new ConcurrentlinkedQueue<>();
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws Exception {
if (!start) {
selector = Selector.open();
Thread thread = new Thread(this, name);
thread.start();
start = true;
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
selector.select();
Runnable work = queue.poll();
if (work != null) {
work.run();
}
Set keySet = selector.selectedKeys();
Iterator keyIterator = keySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isReadable()) {
try {
ByteBuffer buffer = ByteBuffer.allocate(128);
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if (count == -1) {
key.cancel();
}
System.out.println(Thread.currentThread().getName() + "读取" + count);
buffer.flip();
} catch (Exception e) {
e.printStackTrace();
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
3、如何理解同步异步,阻塞非阻塞?Client无关
- 同步:在进行读写 *** 作时,线程需要等待结果,还是相当于闲置
- 异步:在进行读写 *** 作时,线程不必等待结果,而是将来由 *** 作系统来通过回调方式由另外的线程来获得结果
- 阻塞:线程在获取到结果之前一直等待,无法做其他的事情
- 非阻塞:线程在获取结果的时候,如果结果没有收到,则跳过继续执行后面的事情,等到结果送到后,还是由当前线程去完成
同步阻塞
- 一个线程执行的时候直接停下来等待结果
同步非阻塞
- 一个线程执行的时候,如果结果没有获取到,直接跳过继续执行后面的事情,也就是不再等待,比如写个while循环去read某一个东西,当发现没有read的内容时,直接执行后面的事情,下次循环再次来read,直到read到希望的数据
异步非阻塞
- 一个线程执行的时候,需要read某个内容时,将这个创建一个回调方法发送给系统内核空间,自己继续干其他的事情,等到系统内核空间拿到了目标数据,再用另一个线程调用回调方法将数据发送到用户程序空间,与原始的线程没有关系
多路复用
- 单线程可以配置Selector完成对多个Channel可读可写时间的监控便称之为多路复用
传统的 IO 将一个文件通过 socket 写出
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
内部工作流程是这样的:
-
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用 *** 作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞, *** 作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO,不利用 cpu 计算,减少 cpu 缓存伪共享
-
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
-
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
-
接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用 *** 作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是 *** 作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个 *** 作比较重量级
- 数据拷贝了共 4 次
通过 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是 *** 作系统内存
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
底层采用了 linux 2.1 后提供的 sendFile 方法,java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输,拷贝次数比较频繁的
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)