1. BIO

1.1 BioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Minimal blocking-I/O echo server.
 *
 * <p>Listens on port 12345, serves exactly one client, echoes each received
 * line back with a prefix, and stops when the client sends "bye"
 * (case-insensitive) or closes the connection.
 */
public class Server {
    public static void main(String[] args) {
        try (ServerSocket listener = new ServerSocket(12345)) {
            System.out.println("服务器已启动,等待客户端连接...");
            try (Socket client = listener.accept()) {
                System.out.println("客户端已连接:" + client.getInetAddress());

                // Line-oriented reader for messages coming from the client.
                BufferedReader reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                // Auto-flushing writer for replies going to the client.
                PrintWriter writer = new PrintWriter(client.getOutputStream(), true);

                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    System.out.println("客户端消息:" + line);
                    // Echo the message back to the client.
                    writer.println("服务器收到:" + line);
                    if ("bye".equalsIgnoreCase(line)) {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.2 MultiThreadedServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Blocking-I/O echo server that dedicates one thread to every accepted client.
 *
 * <p>Listens on port 12345 and never stops accepting unless the listening
 * socket fails.
 */
public class MultiThreadedServer {
    public static void main(String[] args) {
        try (ServerSocket listener = new ServerSocket(12345)) {
            System.out.println("服务器已启动,等待客户端连接...");
            for (;;) {
                Socket client = listener.accept();
                System.out.println("客户端已连接:" + client.getInetAddress());
                Thread worker = new ClientHandler(client);
                worker.start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Per-connection worker: echoes lines until "bye" or end of stream, then closes the socket. */
    static class ClientHandler extends Thread {
        private final Socket socket;

        public ClientHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                echoUntilBye();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeSocket();
            }
        }

        /** Reads lines from the client and echoes each one back with a prefix. */
        private void echoUntilBye() throws IOException {
            try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                for (String line = in.readLine(); line != null; line = in.readLine()) {
                    System.out.println("客户端消息:" + line);
                    out.println("服务器收到:" + line);
                    if ("bye".equalsIgnoreCase(line)) {
                        return;
                    }
                }
            }
        }

        /** Closes the client socket, reporting (not propagating) any failure. */
        private void closeSocket() {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

1.3 BioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Interactive blocking-I/O client for the echo server on localhost:12345.
 *
 * <p>Each line typed on stdin is sent to the server and the one-line reply is
 * printed. The session ends when the user types "bye", when stdin reaches end
 * of input, or when the server closes the connection.
 */
public class Client {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 12345);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             Scanner scanner = new Scanner(System.in)) {

            System.out.println("已连接到服务器,输入消息开始通信,输入 'bye' 结束。");
            // hasNextLine() guards against stdin EOF, where nextLine() would throw.
            while (scanner.hasNextLine()) {
                String userInput = scanner.nextLine();
                out.println(userInput);
                if ("bye".equalsIgnoreCase(userInput)) {
                    break;
                }
                String response = in.readLine();
                if (response == null) {
                    // readLine() returns null once the peer has closed the stream;
                    // stop instead of printing "null" and writing to a dead socket.
                    System.out.println("服务器已关闭连接。");
                    break;
                }
                System.out.println(response);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. NIO

2.1 NioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Single-threaded, selector-based echo server on port 8888.
 *
 * <p>One selector multiplexes the listening channel and all client channels.
 * A failure on one client connection closes only that connection; the server
 * keeps running.
 */
public class NIOServer {
    private static final int PORT = 8888;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {

            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("NIO Server is listening on port " + PORT);

            while (true) {
                selector.select();
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // The selected-key set is never cleaned by the selector itself.
                    keyIterator.remove();

                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        handleAccept(key, selector);
                    } else if (key.isReadable()) {
                        try {
                            handleRead(key);
                        } catch (IOException e) {
                            // e.g. connection reset by one client: drop that client only.
                            e.printStackTrace();
                            closeQuietly(key);
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts a pending connection and registers it for read events.
     *
     * @throws IOException if accepting on the listening channel itself fails
     */
    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("New client connected: " + socketChannel.getRemoteAddress());
        } catch (IOException e) {
            // Setting up this one client failed; release it and keep serving others.
            e.printStackTrace();
            socketChannel.close();
        }
    }

    /**
     * Reads whatever is available from the client and echoes it back with a prefix;
     * closes the channel when the client has disconnected.
     *
     * @throws IOException if reading from or writing to the client fails
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int bytesRead = socketChannel.read(buffer);

        if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String message = new String(data);
            System.out.println("Received from client: " + message);

            // Echo the message back to the client. A non-blocking write may accept
            // only part of the buffer, so keep writing until it is drained.
            // NOTE: this spins if the peer stops reading; fine for a demo, a real
            // server would register OP_WRITE instead.
            ByteBuffer responseBuffer = ByteBuffer.wrap(("Server received: " + message).getBytes());
            while (responseBuffer.hasRemaining()) {
                socketChannel.write(responseBuffer);
            }
        } else if (bytesRead == -1) {
            // Client disconnected
            System.out.println("Client disconnected: " + socketChannel.getRemoteAddress());
            socketChannel.close();
        }
    }

    /** Cancels the key and closes its channel, reporting (not propagating) close failures. */
    private static void closeQuietly(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.2 NioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Interactive client for the NIO echo server on localhost:8888.
 *
 * <p>The channel is kept in blocking mode: the client strictly alternates
 * "send one message, wait for one reply", so a blocking read is what makes the
 * reply reliably visible before the next prompt.
 */
public class NIOClient {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 8888;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {

            socketChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));

            System.out.println("Connected to server: " + SERVER_HOST + ":" + SERVER_PORT);

            // Dedicated receive buffer: the reply ("Server received: ...") is longer
            // than the message that was sent, so the send buffer cannot be reused.
            ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

            while (true) {
                System.out.print("Enter message to send (or 'quit' to exit): ");
                if (!scanner.hasNextLine()) {
                    break; // stdin reached end of input
                }
                String message = scanner.nextLine();

                if ("quit".equalsIgnoreCase(message)) {
                    break;
                }
                if (message.isEmpty()) {
                    // Zero bytes would be sent, the server would never answer,
                    // and the blocking read below would wait forever.
                    continue;
                }

                ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
                while (writeBuffer.hasRemaining()) {
                    socketChannel.write(writeBuffer);
                }

                readBuffer.clear();
                int bytesRead = socketChannel.read(readBuffer);
                if (bytesRead == -1) {
                    System.out.println("Server closed the connection.");
                    break;
                }
                if (bytesRead > 0) {
                    readBuffer.flip();
                    byte[] data = new byte[readBuffer.remaining()];
                    readBuffer.get(data);
                    String response = new String(data);
                    System.out.println("Received from server: " + response);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. AIO

3.1 AioServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * Asynchronous-I/O (AIO) echo server on port 9999.
 *
 * <p>For every connection it reads one chunk, writes back a prefixed copy, and
 * closes the connection. The connection is also closed when the client sends
 * nothing (end of stream) or when a read or write fails.
 */
public class Server {
    private static final int PORT = 9999;

    public static void main(String[] args) {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            System.out.println("AIO Server is listening on port " + PORT);

            serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                    // Re-arm the accept first so further clients are not kept waiting.
                    serverSocketChannel.accept(null, this);
                    handleConnection(socketChannel);
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    exc.printStackTrace();
                }
            });

            // Keep the main thread alive; all work happens in completion handlers.
            Thread.sleep(Long.MAX_VALUE);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Reads one chunk from the client, echoes it back with a prefix, then closes the connection. */
    private static void handleConnection(AsynchronousSocketChannel socketChannel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result <= 0) {
                    // -1 means the client closed without sending; do not leak the channel.
                    closeQuietly(socketChannel);
                    return;
                }
                attachment.flip();
                byte[] data = new byte[attachment.remaining()];
                attachment.get(data);
                String message = new String(data);
                System.out.println("Received from client: " + message);

                ByteBuffer responseBuffer = ByteBuffer.wrap(("Server received: " + message).getBytes());
                socketChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer written, ByteBuffer pending) {
                        if (pending.hasRemaining()) {
                            // A single write may be partial; continue until fully sent.
                            socketChannel.write(pending, pending, this);
                            return;
                        }
                        closeQuietly(socketChannel);
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer pending) {
                        exc.printStackTrace();
                        closeQuietly(socketChannel);
                    }
                });
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeQuietly(socketChannel);
            }
        });
    }

    /** Closes the channel, reporting (not propagating) any failure. */
    private static void closeQuietly(AsynchronousSocketChannel socketChannel) {
        try {
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.2 AioClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * Asynchronous-I/O (AIO) client for the echo server on localhost:9999.
 *
 * <p>Connects, sends one line read from stdin, prints the reply, and exits.
 * A latch lets {@code main} wait for the asynchronous exchange to finish (or
 * fail); the channel is then closed by try-with-resources.
 */
public class Client {
    private static final String SERVER_HOST = "localhost";
    private static final int SERVER_PORT = 9999;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        // Released by whichever completion handler ends the exchange.
        final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);

        try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT), null, new CompletionHandler<Void, Object>() {
                @Override
                public void completed(Void result, Object attachment) {
                    Scanner scanner = new Scanner(System.in);
                    System.out.print("Enter message to send: ");
                    String message = scanner.hasNextLine() ? scanner.nextLine() : "";
                    if (message.isEmpty()) {
                        // Nothing to send: the server would never reply, so stop here.
                        done.countDown();
                        return;
                    }

                    ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
                    socketChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer written, ByteBuffer pending) {
                            if (pending.hasRemaining()) {
                                // A single write may be partial; continue until fully sent.
                                socketChannel.write(pending, pending, this);
                                return;
                            }
                            // The reply is longer than the request, so it needs its own buffer.
                            ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
                            socketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                                @Override
                                public void completed(Integer read, ByteBuffer received) {
                                    if (read > 0) {
                                        received.flip();
                                        byte[] data = new byte[received.remaining()];
                                        received.get(data);
                                        String response = new String(data);
                                        System.out.println("Received from server: " + response);
                                    }
                                    done.countDown();
                                }

                                @Override
                                public void failed(Throwable exc, ByteBuffer received) {
                                    exc.printStackTrace();
                                    done.countDown();
                                }
                            });
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer pending) {
                            exc.printStackTrace();
                            done.countDown();
                        }
                    });
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    exc.printStackTrace();
                    done.countDown();
                }
            });

            // Wait for the exchange to finish instead of sleeping forever.
            done.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

4. Tomcat

4.1 Request&Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Minimal HTTP request: extracts the method and the path (query string
 * removed) from the request line.
 *
 * <p>Only the first read of up to 1024 bytes is inspected. If nothing could be
 * read or the request line is malformed, {@link #getMethod()} and
 * {@link #getUrl()} both return {@code null}.
 */
public class GPRequest {

    private String method;
    private String url;

    /**
     * Parses the request line from the given stream. Never throws; read failures
     * are reported on stderr and leave method and url as {@code null}.
     *
     * @param in stream positioned at the start of an HTTP request
     */
    public GPRequest(InputStream in) {
        try {
            // Read the raw HTTP request text.
            byte[] buff = new byte[1024];
            int len = in.read(buff);
            if (len <= 0) {
                return; // nothing received (e.g. the client connected and closed)
            }
            String content = new String(buff, 0, len, java.nio.charset.StandardCharsets.UTF_8);

            // Request line looks like: METHOD SP request-target SP HTTP-version
            String requestLine = content.split("\\n", 2)[0].trim();
            String[] parts = requestLine.split("\\s+");
            if (parts.length < 2) {
                System.err.println("Malformed HTTP request line: " + requestLine);
                return;
            }

            String target = parts[1];
            int queryStart = target.indexOf('?');

            this.method = parts[0];
            this.url = queryStart >= 0 ? target.substring(0, queryStart) : target;
            System.out.println(content);
        } catch (java.io.IOException e) {
            e.printStackTrace();
        }
    }

    /** @return the request path without query string, or {@code null} if parsing failed */
    public String getUrl() {
        return url;
    }

    /** @return the HTTP method as sent by the client, or {@code null} if parsing failed */
    public String getMethod() {
        return method;
    }
}

/**
 * Minimal HTTP response writer: wraps a body in a fixed "200 OK" text/html
 * response and writes it to the underlying stream.
 */
public class GPResponse {
    private OutputStream out;

    public GPResponse(OutputStream out) {
        this.out = out;
    }

    /**
     * Writes a complete HTTP/1.1 response with status 200 and the given body.
     *
     * <p>Header lines are terminated with CRLF as HTTP requires, and the body is
     * encoded as UTF-8 to match the declared charset. The stream is not flushed
     * or closed here.
     *
     * @param s the response body (HTML or plain text)
     * @throws Exception if writing to the underlying stream fails
     */
    public void write(String s) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("HTTP/1.1 200 OK\r\n")
          .append("Content-Type: text/html; charset=utf-8\r\n")
          .append("\r\n") // blank line separates headers from body
          .append(s);
        out.write(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}

4.2 Servlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Base class for servlets: routes a request to {@link #doGet} or
 * {@link #doPost} depending on the HTTP method.
 */
public abstract class GPServlet {

    /**
     * Dispatches the request. "GET" (case-insensitive) goes to {@code doGet};
     * every other method, including a missing one, goes to {@code doPost}.
     */
    public void service(GPRequest request, GPResponse response) throws Exception {
        boolean isGet = "GET".equalsIgnoreCase(request.getMethod());
        if (!isGet) {
            doPost(request, response);
            return;
        }
        doGet(request, response);
    }

    /** Handles a GET request. */
    public abstract void doGet(GPRequest request, GPResponse response) throws Exception;

    /** Handles any non-GET request. */
    public abstract void doPost(GPRequest request, GPResponse response) throws Exception;
}

/** Demo servlet that answers every request with a fixed text. */
public class FirstServlet extends GPServlet {

    /** Body written for every request, regardless of method. */
    private static final String BODY = "This is First Serlvet";

    /** GET is handled exactly like POST. */
    @Override
    public void doGet(GPRequest request, GPResponse response) throws Exception {
        doPost(request, response);
    }

    /** Writes the fixed body to the response. */
    @Override
    public void doPost(GPRequest request, GPResponse response) throws Exception {
        response.write(BODY);
    }
}

/** Second demo servlet that answers every request with a fixed text. */
public class SecondServlet extends GPServlet {

    /** Body written for every request, regardless of method. */
    private static final String BODY = "This is Second Serlvet";

    /** GET is handled exactly like POST. */
    @Override
    public void doGet(GPRequest request, GPResponse response) throws Exception {
        doPost(request, response);
    }

    /** Writes the fixed body to the response. */
    @Override
    public void doPost(GPRequest request, GPResponse response) throws Exception {
        response.write(BODY);
    }
}

4.3 GPTomcat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
 * Toy servlet container built on blocking sockets.
 *
 * <p>Start-up: load {@code web.properties} from the classpath root, build the
 * url → servlet mapping, then accept connections on the configured port and
 * handle them one at a time on the calling thread.
 *
 * <p>Expected property layout, one pair per servlet:
 * {@code <name>.url=<path>} and {@code <name>.className=<fully qualified class>}.
 */
public class GPTomcat {

    public static void main(String[] args) {
        new GPTomcat().start();
    }

    private int port = 8080;
    private ServerSocket server;
    /** Maps a request path to the single servlet instance that serves it. */
    private Map<String, GPServlet> servletMapping = new HashMap<String, GPServlet>();

    private Properties webxml = new Properties();

    /**
     * Loads the configuration and serves requests until the listening socket fails.
     * A failure while handling one request is reported and does not stop the server.
     */
    public void start() {

        // 1. Load the configuration and initialise the servlet mapping.
        init();

        try (ServerSocket listener = new ServerSocket(this.port)) {
            server = listener;

            System.out.println("GP Tomcat 已启动,监听的端口是:" + this.port);

            // 2. Wait for requests forever.
            while (true) {
                Socket client = listener.accept();
                try {
                    process(client);
                } catch (Exception e) {
                    // One bad request must not take the whole server down.
                    e.printStackTrace();
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reads {@code web.properties} and instantiates one servlet per {@code *.url} entry. */
    private void init() {
        try {
            String WEB_INF = this.getClass().getResource("/").getPath();
            // try-with-resources so the file handle is released after loading.
            try (FileInputStream fis = new FileInputStream(WEB_INF + "web.properties")) {
                webxml.load(fis);
            }

            for (Object k : webxml.keySet()) {
                String key = k.toString();
                if (!key.endsWith(".url")) {
                    continue;
                }
                String servletName = key.replaceAll("\\.url$", "");
                String url = webxml.getProperty(key);
                String className = webxml.getProperty(servletName + ".className");
                // One shared instance per servlet.
                GPServlet obj = (GPServlet) Class.forName(className).getDeclaredConstructor().newInstance();
                servletMapping.put(url, obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one connection: parses the request, dispatches to the mapped servlet
     * (or writes a not-found body), and always closes the streams and the socket.
     *
     * @throws Exception if the servlet or the socket I/O fails
     */
    private void process(Socket client) throws Exception {
        try (Socket socket = client;
             InputStream is = socket.getInputStream();
             OutputStream os = socket.getOutputStream()) {

            GPRequest request = new GPRequest(is);
            GPResponse response = new GPResponse(os);

            // Look up the servlet registered for the requested path.
            String url = request.getUrl();
            GPServlet servlet = servletMapping.get(url);

            if (servlet != null) {
                // service() decides between doGet and doPost.
                servlet.service(request, response);
            } else {
                response.write("404 - Not Found");
            }

            os.flush();
        }
    }

}

5. Netty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
 * Netty HTTP demo server. Try it with: curl http://localhost:8088
 *
 * @Author HSF
 * @Date 2023/3/21 8:48
 */
public class BootstrapServer {

    public static void main(String[] args) throws Exception {
        // Start the Netty server.
        BootstrapServer server = new BootstrapServer();
        server.start(8088);
    }

    /**
     * Starts the server and blocks until its channel is closed.
     *
     * <p>Author's notes on server start-up:
     * <ol>
     *   <li>Create the server Channel (a JDK channel underneath) and initialise id, unsafe, pipeline, etc.</li>
     *   <li>Initialise it: socket options, user attributes, plus the ChannelInitializer and ServerBootstrapAcceptor handlers.</li>
     *   <li>Register the server Channel with a Selector.</li>
     *   <li>Bind the port, fire ChannelActive, and add OP_ACCEPT to the channel's interest set.</li>
     * </ol>
     *
     * <p>How a new client connection is handled:
     * <ol>
     *   <li>The boss NioEventLoop polls for OP_ACCEPT.</li>
     *   <li>A NioSocketChannel is created for the client.</li>
     *   <li>That channel is registered with a worker event loop.</li>
     *   <li>OP_READ is added to the channel's interest set.</li>
     * </ol>
     *
     * <p>What a NioEventLoop does in its endless loop:
     * <ol>
     *   <li>Poll I/O events of all channels registered with its selector.</li>
     *   <li>Process the I/O events that are ready.</li>
     *   <li>Run queued non-I/O tasks.</li>
     * </ol>
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    private void start(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.localAddress(new InetSocketAddress(port));
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast("codec", new HttpServerCodec());                  // HTTP encode/decode
                    ch.pipeline().addLast("compressor", new HttpContentCompressor());       // HttpContent compression
                    ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));   // HTTP message aggregation
                    ch.pipeline().addLast("handler", new HttpServerHandler());              // business logic
                }
            });
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // bind(): create, initialise and register the channel, then bind the port;
            // sync(): block until start-up has completed.
            ChannelFuture bound = bootstrap.bind().sync();
            System.out.println("Http Server started, Listening on " + port);
            bound.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /** Replies to every aggregated HTTP request with a text summary of it, then closes the connection. */
    class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
            String requestBody = msg.content().toString(CharsetUtil.UTF_8);
            String summary = String.format(
                    "Receive http request, uri: %s, method: %s, content: %s%n",
                    msg.uri(),
                    msg.method(),
                    requestBody
            );
            byte[] payload = summary.getBytes();
            FullHttpResponse response = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK,
                    Unpooled.wrappedBuffer(payload)
            );
            ChannelFuture written = ctx.writeAndFlush(response);
            written.addListener(ChannelFutureListener.CLOSE);
        }

    }
}

6. RPC

6.1 RPCServer

6.1.1 registry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * Netty server that receives {@code InvokerProtocol} requests and hands them
 * to {@code RegistryHandler}.
 *
 * <p>SECURITY NOTE(review): the pipeline uses Java native object serialization
 * ({@code ObjectDecoder}) with an unbounded frame size. That is acceptable for
 * a local demo only; it must not be exposed to untrusted peers.
 */
public class RpcRegistry {

    private int port;

    public RpcRegistry(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new RpcRegistry(8080).start();
    }

    /**
     * Binds the port and blocks until the server channel is closed. The event
     * loop groups are always shut down on exit, whether the server stopped
     * normally or start-up failed.
     */
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Frame decoder. Arguments, in order:
                            //   maxFrameLength      - frames longer than this raise TooLongFrameException
                            //   lengthFieldOffset   - position of the length field within the message
                            //   lengthFieldLength   - size of the length field (4 for an int)
                            //   lengthAdjustment    - compensation added to the length field value
                            //   initialBytesToStrip - leading bytes removed from the decoded frame
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            // Frame encoder: prepends a 4-byte length field.
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // Object -> bytes.
                            pipeline.addLast("encoder", new ObjectEncoder());
                            // Bytes -> object.
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RegistryHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = b.bind(port).sync();
            System.out.println("GP RPC Registry start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // e.g. the port is already in use; report instead of failing silently.
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
 * Server-side RPC dispatcher.
 *
 * <p>On construction it scans the provider package for classes, instantiates
 * each one, and registers the instance under the name of the first interface
 * it implements. For every incoming {@code InvokerProtocol} it looks up the
 * target by interface name, invokes the requested method reflectively, writes
 * the result back (if there is one), and closes the connection.
 *
 * <p>NOTE(review): a new handler is created per connection by the registry's
 * channel initializer, so the scan and registration run again for each one.
 */
public class RegistryHandler extends ChannelInboundHandlerAdapter {

    /** All available services, keyed by fully qualified interface name. */
    public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<String, Object>();

    /** Fully qualified names of all candidate service classes found by the scan. */
    private List<String> classNames = new ArrayList<String>();

    public RegistryHandler() {
        // Recursive classpath scan followed by registration.
        scannerClass("com.smy.io.netty.rpc.provider");
        doRegister();
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        InvokerProtocol request = (InvokerProtocol) msg;
        Object result = null;

        // Resolve the service named in the request and invoke it via reflection.
        Object service = registryMap.get(request.getClassName());
        if (service != null) {
            Method method = service.getClass().getMethod(request.getMethodName(), request.getParames());
            result = method.invoke(service, request.getValues());
        }

        // Only write when there is something to send: an unknown service or a void
        // method yields no result, and a placeholder Object cannot be serialized.
        if (result != null) {
            ctx.write(result);
            ctx.flush();
        }
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


    /**
     * Recursively collects the names of all {@code .class} files under the given package.
     * Does nothing if the package cannot be located as a directory on the classpath.
     */
    private void scannerClass(String packageName) {
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        if (url == null) {
            return; // package not present on the classpath
        }
        File[] entries = new File(url.getFile()).listFiles();
        if (entries == null) {
            return; // not a directory (e.g. packaged inside a jar) or not readable
        }
        for (File file : entries) {
            String fileName = file.getName();
            if (file.isDirectory()) {
                // Sub-package: keep descending.
                scannerClass(packageName + "." + fileName);
            } else if (fileName.endsWith(".class")) {
                String simpleName = fileName.substring(0, fileName.length() - ".class".length());
                classNames.add(packageName + "." + simpleName);
            }
        }
    }

    /**
     * Instantiates every scanned class that implements at least one interface and
     * registers it under its first interface's name. Failures are reported per class.
     */
    private void doRegister() {
        for (String className : classNames) {
            try {
                Class<?> clazz = Class.forName(className);
                Class<?>[] interfaces = clazz.getInterfaces();
                if (clazz.isInterface() || interfaces.length == 0) {
                    continue; // nothing to expose
                }
                registryMap.put(interfaces[0].getName(), clazz.getDeclaredConstructor().newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

6.1.2 provider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RpcServiceImpl implements IRpcService {

public int add(int a, int b) {
return a + b;
}

public int sub(int a, int b) {
return a - b;
}

public int mult(int a, int b) {
return a * b;
}

public int div(int a, int b) {
return a / b;
}

}


public class RpcHelloServiceImpl implements IRpcHelloService {

public String hello(String name) {
return "Hello " + name + "!";
}

}

6.1.3 protocol

1
2
3
4
5
6
7
8
9
/**
 * Serializable description of one remote call: which service, which method,
 * with which parameter types and argument values. Accessors are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class InvokerProtocol implements Serializable {

private String className;// name of the service (the registry looks services up by this name)
private String methodName;// name of the method to invoke
private Class<?>[] parames;// formal parameter types, used to resolve the method
private Object[] values;// actual argument values passed to the method

}

6.2 RPCClient

6.2.1 api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Remote arithmetic service contract shared by consumer and provider. */
public interface IRpcService {

    /**
     * Addition.
     *
     * @return {@code a + b}
     */
    int add(int a, int b);

    /**
     * Subtraction.
     *
     * @return {@code a - b}
     */
    int sub(int a, int b);

    /**
     * Multiplication.
     *
     * @return {@code a * b}
     */
    int mult(int a, int b);

    /**
     * Division.
     *
     * @return {@code a / b}
     */
    int div(int a, int b);

}

/** Remote greeting service contract shared by consumer and provider. */
public interface IRpcHelloService {

    /**
     * Produces a greeting for the given name.
     *
     * @param name who to greet
     * @return the greeting text
     */
    String hello(String name);

}

6.2.2 consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/** Demo consumer: calls both remote services through dynamic proxies and prints the results. */
public class RpcConsumer {

    public static void main(String[] args) {
        // Greeting service.
        IRpcHelloService helloService = RpcProxy.create(IRpcHelloService.class);
        String greeting = helloService.hello("Tom老师");
        System.out.println(greeting);

        // Arithmetic service; every call below is a separate remote invocation.
        IRpcService calculator = RpcProxy.create(IRpcService.class);

        int sum = calculator.add(8, 2);
        System.out.println("8 + 2 = " + sum);
        int difference = calculator.sub(8, 2);
        System.out.println("8 - 2 = " + difference);
        int product = calculator.mult(8, 2);
        System.out.println("8 * 2 = " + product);
        int quotient = calculator.div(8, 2);
        System.out.println("8 / 2 = " + quotient);
    }

}

/**
 * Client-side RPC stub factory. Creates JDK dynamic proxies whose method calls
 * are sent as {@code InvokerProtocol} messages to the registry at localhost:8080.
 */
public class RpcProxy {

    /**
     * Creates a proxy for the given type.
     *
     * @param clazz normally the service interface; for a class, its interfaces are proxied
     * @return a proxy implementing the interface(s)
     */
    public static <T> T create(Class<?> clazz) {
        Class<?>[] contracts;
        if (clazz.isInterface()) {
            contracts = new Class[]{clazz};
        } else {
            contracts = clazz.getInterfaces();
        }
        InvocationHandler handler = new MethodProxy(clazz);
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), contracts, handler);
    }

    /** Invocation handler that forwards interface calls over the network. */
    private static class MethodProxy implements InvocationHandler {
        private Class<?> clazz;

        public MethodProxy(Class<?> clazz) {
            this.clazz = clazz;
        }

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Interface methods are the core case: send them to the remote side.
            if (!Object.class.equals(method.getDeclaringClass())) {
                return rpcInvoke(proxy, method, args);
            }
            // Methods declared on Object are answered locally by this handler.
            try {
                return method.invoke(this, args);
            } catch (Throwable t) {
                t.printStackTrace();
                return null;
            }
        }


        /**
         * Performs one remote call: connect, send the request, wait until the
         * server closes the connection, and return whatever was received.
         *
         * @param proxy  the proxy the call was made on (unused)
         * @param method the interface method being called
         * @param args   the call arguments
         * @return the response object, or {@code null} if none was received
         */
        public Object rpcInvoke(Object proxy, Method method, Object[] args) {

            // Wrap the call in the transfer protocol object.
            InvokerProtocol call = new InvokerProtocol();
            call.setClassName(this.clazz.getName());
            call.setMethodName(method.getName());
            call.setValues(args);
            call.setParames(method.getParameterTypes());

            final RpcProxyHandler replyHolder = new RpcProxyHandler();
            EventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap();
                client.group(ioGroup);
                client.channel(NioSocketChannel.class);
                client.option(ChannelOption.TCP_NODELAY, true);
                client.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline stages = ch.pipeline();
                        // Frame decoder. Arguments, in order:
                        //   maxFrameLength      - frames longer than this raise TooLongFrameException
                        //   lengthFieldOffset   - position of the length field within the message
                        //   lengthFieldLength   - size of the length field (4 for an int)
                        //   lengthAdjustment    - compensation added to the length field value
                        //   initialBytesToStrip - leading bytes removed from the decoded frame
                        stages.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        // Frame encoder: prepends a 4-byte length field.
                        stages.addLast("frameEncoder", new LengthFieldPrepender(4));
                        // Object -> bytes.
                        stages.addLast("encoder", new ObjectEncoder());
                        // Bytes -> object.
                        stages.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        stages.addLast("handler", replyHolder);
                    }
                });

                ChannelFuture connected = client.connect("localhost", 8080).sync();
                connected.channel().writeAndFlush(call).sync();
                connected.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ioGroup.shutdownGracefully();
            }
            return replyHolder.getResponse();
        }

    }
}

/**
 * Client-side handler that captures the object received from the server so the
 * caller can read it once the connection has closed.
 */
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {

    /** Last object received from the server; {@code null} until something arrives. */
    private Object response;

    /** @return the received response, or {@code null} if nothing was received */
    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
    }

    /**
     * Reports the failure and closes the channel. Closing matters: the caller
     * blocks on the channel's close future, so leaving a broken channel open
     * would make the remote call hang.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception is general");
        cause.printStackTrace();
        ctx.close();
    }
}

7. Echo

7.1 Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);

// 写消息到管道
ctx.write(msg);// 写消息
ctx.flush(); // 冲刷消息

// 上面两个方法等同于 ctx.writeAndFlush(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}

7.1.1 EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Echo server on Netty's NIO transport.
 *
 * <p>Usage: {@code java EchoServer [port]}. A missing or non-numeric port
 * falls back to {@link #DEFAULT_PORT}.
 */
public class EchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException notANumber) {
                port = DEFAULT_PORT;
            }
        }

        // Event loops: one group accepts connections, the other serves them.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Bootstrap helper for the server side.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);                   // event loop groups
            bootstrap.channel(NioServerSocketChannel.class);           // type of the listening channel
            bootstrap.childHandler(new EchoServerHandler());           // handler for accepted channels
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);           // option of the listening channel
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);   // option of accepted channels

            // Bind and start accepting incoming connections.
            ChannelFuture bound = bootstrap.bind(port).sync();

            System.out.println("EchoServer已启动,端口:" + port);

            // Wait until the server socket is closed. That does not happen in this
            // example, but it is where a graceful shutdown would continue.
            bound.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

7.1.2 EpollEchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Echo server on Netty's native epoll transport.
 *
 * <p>Usage: {@code java EpollEchoServer [port]}. A missing or non-numeric
 * port falls back to {@link #DEFAULT_PORT}.
 */
public class EpollEchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException notANumber) {
                port = DEFAULT_PORT;
            }
        }

        // Event loops: one group accepts connections, the other serves them.
        EventLoopGroup bossGroup = new EpollEventLoopGroup();
        EventLoopGroup workerGroup = new EpollEventLoopGroup();

        try {
            // Bootstrap helper for the server side.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);                   // event loop groups
            bootstrap.channel(EpollServerSocketChannel.class);         // type of the listening channel
            bootstrap.childHandler(new EchoServerHandler());           // handler for accepted channels
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);           // option of the listening channel
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);   // option of accepted channels

            // Bind and start accepting incoming connections.
            ChannelFuture bound = bootstrap.bind(port).sync();

            System.out.println("EchoServer已启动,端口:" + port);

            // Wait until the server socket is closed. That does not happen in this
            // example, but it is where a graceful shutdown would continue.
            bound.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

7.1.3 LocalEchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class LocalEchoServer {

public static int DEFAULT_PORT = 7;

public static void main(String[] args) throws Exception {
int port;

try {
port = Integer.parseInt(args[0]);
} catch (RuntimeException ex) {
port = DEFAULT_PORT;
}

// 多线程事件循环器
EventLoopGroup bossGroup = new LocalEventLoopGroup(); // boss
EventLoopGroup workerGroup = new LocalEventLoopGroup(); // worker

try {
// 启动NIO服务的引导程序类
ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup) // 设置EventLoopGroup
.channel(LocalServerChannel.class) // 指明新的Channel的类型
.childHandler(new EchoServerHandler()) // 指定ChannelHandler
.option(ChannelOption.SO_BACKLOG, 128) // 设置的ServerChannel的一些选项
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置的ServerChannel的子Channel的选项

// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync();

System.out.println("EchoServer已启动,端口:" + port);

// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {

// 优雅的关闭
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

}
}

7.1.4 OioEchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Echo server on Netty's blocking (OIO) transport.
 *
 * <p>Usage: {@code java OioEchoServer [port]}. A missing or non-numeric port
 * falls back to {@link #DEFAULT_PORT}.
 */
public class OioEchoServer {

    public static int DEFAULT_PORT = 7;

    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException notANumber) {
                port = DEFAULT_PORT;
            }
        }

        // Event loops: one group accepts connections, the other serves them.
        EventLoopGroup bossGroup = new OioEventLoopGroup();
        EventLoopGroup workerGroup = new OioEventLoopGroup();

        try {
            // Bootstrap helper for the server side.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);                   // event loop groups
            bootstrap.channel(OioServerSocketChannel.class);           // type of the listening channel
            bootstrap.childHandler(new EchoServerHandler());           // handler for accepted channels
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);           // option of the listening channel
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);   // option of accepted channels

            // Bind and start accepting incoming connections.
            ChannelFuture bound = bootstrap.bind(port).sync();

            System.out.println("EchoServer已启动,端口:" + port);

            // Wait until the server socket is closed. That does not happen in this
            // example, but it is where a graceful shutdown would continue.
            bound.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

7.2 Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/** Prints every message echoed back by the server. */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        // Inbound data arrives as a reference-counted ByteBuf.
        ByteBuf buf = (ByteBuf) msg;
        try {
            String m = buf.toString(CharsetUtil.UTF_8); // decode as text
            System.out.println("echo :" + m);
        } finally {
            // This handler is the last consumer of the buffer and does not pass it
            // on, so it must release it; otherwise the buffer leaks.
            buf.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

        // Close the connection on any error.
        cause.printStackTrace();
        ctx.close();
    }
}

/**
 * Interactive echo client: every line typed on stdin is sent to the server;
 * replies are printed by {@code EchoClientHandler}.
 *
 * <p>Usage: {@code java EchoClient <host name> <port number>}
 */
public final class EchoClient {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("用法: java EchoClient <host name> <port number>");
            System.exit(1);
        }

        String hostName = args[0];
        int portNumber = Integer.parseInt(args[1]);

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new EchoClientHandler());

            try (BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
                // Connecting inside this try lets the host/I-O error messages below
                // actually cover connection failures.
                Channel channel = b.connect(hostName, portNumber).sync().channel();

                String userInput;
                while (channel.isActive() && (userInput = stdIn.readLine()) != null) {
                    // Encode straight into a ByteBuf of exactly the right size; a
                    // fixed-size staging buffer would overflow on long lines. UTF-8
                    // matches what the handler uses to decode the echo.
                    ByteBuf buf = Unpooled.copiedBuffer(userInput, CharsetUtil.UTF_8);

                    // Send the message down the pipeline.
                    channel.writeAndFlush(buf);
                }
            } catch (UnknownHostException e) {
                System.err.println("不明主机,主机名为: " + hostName);
                System.exit(1);
            } catch (IOException e) {
                System.err.println("不能从主机中获取I/O,主机名为:" + hostName);
                System.exit(1);
            }
        } finally {

            // Graceful shutdown; this also closes the channel.
            group.shutdownGracefully();
        }
    }
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.