
服务端代码:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import static com.netty.c1.ByteBufferUtil.debugAll;
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
Selector boss = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.register(boss, SelectionKey.OP_ACCEPT, null);
ssc.bind(new InetSocketAddress(8080));
// 创建固定数量的 worker 并初始化
// 注意如果代码跑在docker中,不是获取docker中的cpu数量,还是物理机的cpu数量
// 该bug在jdk10以后修复了
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; ++i) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger atomicInteger = new AtomicInteger();
while (true) {
boss.select();
Iterator iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
log.debug("before register...{}", sc.getRemoteAddress());
workers[atomicInteger.getAndIncrement()% workers.length].register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 用来控制一个work对象只跟一个线程关联
private ConcurrentlinkedDeque queue = new ConcurrentlinkedDeque<>(); // 用一个队列构建两个线程直接信息传递的通道
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws IOException {
if(!start) {
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
start = true;
}
// 向队列中添加了任务,但是任务还没有被执行
queue.add(() ->{
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup(); // 为了唤醒worker线程中的select()方法
}
@Override
public void run() {
while (true) {
try {
selector.select(); // 有事件发生或者主动调用selector的wakeup方法,解除阻塞
Runnable task = queue.poll();
if (task != null) {
task.run();
}
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- boss线程处理accept事件
- worker线程处理 read 和 write事件
- 事件发生在channel上
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)