1. 网络协议

客户端和服务端之间完成数据交互,需要约定数据协议。数据协议如下图所示:

image-20240503133224881

传输内筒分为以下四个部分:

  1. 消息长度:
    总长度,四个字节存储,占用一个int类型;
  2. 序列化类型&消息头长度:
    占⽤⼀个 int 类型,第⼀个字节表示序列化类型,后⾯三个字节表示消息头⻓度;
  3. 消息头数据:
    经过序列化后的消息头数据;
  4. 消息主体数据:
    消息主体的⼆进制字节数据内容。

消息头数据序列化默认是 JSON 格式 ,示例如下:

image-20240503133554838

image-20240503133652948

⽹络协议设计的原则是便于编解码,这⾥我们温习下 TCP 粘包和拆包的知识点。
image-20240503133855508

TCP 是⾯向字节流的协议,它会将应⽤层发送的数据拆分成 TCP 报⽂段进⾏传输,发送端和接收端都会维护⼀个 buffer ,发送的数据⾸先会存⾄缓冲区 buffer ,然后通过⽹络发送给接收端的 buffer 中。

  • 粘包 如果⼀次请求发送的数据量⽐较⼩,没达到缓冲区⼤⼩,TCP 则会将多个请求合并为同⼀个请求进⾏发送 。
  • 拆包 如果⼀次请求发送的数据量⽐较⼤,超过了缓冲区⼤⼩,TCP 就会将其拆分为多次发送。

Netty 通过以下⼏种⽅式来解决粘包问题:

1、消息定⻓:FixedLengthFrameDecoder 发送的消息都是固定⻓度的,接收⽅根据固定⻓度来解析消息,这样可以有效避免粘包和拆包问题。

2、特定分隔符:DelimiterBasedFrameDecoder 在消息的末尾添加特定的分隔符,接收⽅根据分隔符来切分消息。

3、消息头⻓度:LenghtFieldBasedFrameDecode 在消息的头部添加表示消息⻓度的字段,接收⽅先读取消息头部的⻓度字段,然后根据⻓度字段的值来读取消息内容, 从⽽正确地解析出完整的消息。

RocketMQ 的解码器就是使⽤了 LenghtFieldBasedFrameDecoder.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}

@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}

ByteBuffer byteBuffer = frame.nioBuffer();

return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}

return null;
}
}

2. 通讯方式

客户端通信⽅式⽀持同步 sync 、异步 async 、单向 oneway 三种⽅式 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RemotingCommand invokeSync(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;

void invokeAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback
) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

void invokeOneway(
final String addr,
final RemotingCommand request,
final long timeoutMillis
) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;

2.1 同步 sync

在同步通信中,客户端发送请求后会⼀直等待服务器响应,直到接收到响应或者超时。

这意味着:客户端发送线程在发送请求后会被阻塞,直到收到服务器的响应,然后继续执⾏发送下⼀个请求。

image-20240503134810788

同步请求的流程:

1、客户端连接服务端,创建 channel ;

2、客户端创建 responseFutrue 对象 ,主要由四个部分组成:响应结果、请求编号、回调函数、CountDownLatch。 然后将 responseFutrue 对象加⼊到本地缓存 响应表 reponseTable ⾥ 。

1
2
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);

3、客户端将请求发送到服务端;

4、服务端解析出请求命令;

 1. 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执⾏请求命令; 
 2.  服务端将响应数据返回给客户端; 
 3.  客户端将响应结果填充到响应表 reponseTable ⾥,同时因为是同步命令,并调⽤ countDownLatch 的 countDown ⽅法 , 这样发送消息线程就不再阻塞(实现同步请求的精髓)。

2.2 异步async

异步通信中,客户端发送请求后不会等待服务器的响应,⽽是继续执⾏后续代码。客户端会注册⼀个回调函数或者监听 器,⽤于处理服务器响应。当服务器响应返回时,会触发回调函数的执⾏。

image-20240503135851728

异步请求的流程 :

1、客户端连接服务端,创建 channel ;

2、通过信号量 semaphoreAsync 限制正在进⾏的异步请求的最⼤数量 ; boolean

1
acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); 

3、客户端创建 responseFutrue 对象 ,主要由四个部分组成:响应结果、请求编号、回调函数、CountDownLatch。 然后将 responseFutrue 对象加⼊到本地缓存 响应表 reponseTable ⾥ 。

1
2
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);

4、客户端将请求发送到服务端,客户端异步⽅法结束 。

5、服务端解析出请求命令 ;

  1. 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执⾏请求命令;

  2. 服务端将响应数据返回给客户端;

6、通讯框架收到服务端的响应数据后,通过回调线程执⾏回调函数。

2.3 单向 oneway

单向通信发起调⽤后,不关⼼调⽤结果,不做超时控制,只要请求已经发出,就完成本次调⽤。 通常⽤于可以重试,或者定时通知类的场景,调⽤过程是有可能因为⽹络问题,机器故障等原因,导致请求失败。业务 场景需要能接受这样的异常场景,才可以使⽤。

image-20240503140327004

需要注意的是,单向通信不能保证请求⼀定能够成功发送到服务器,也⽆法保证服务器是否正确地接收到了请求。

oneway 请求的流程 :

1、客户端连接服务端,创建 channel ;

2、通过信号量 semaphoreOneway 限制正在进⾏的 oneway 请求的最⼤数量 ;

1
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); 

3、客户端将请求发送到服务端,客户端 oneway 请求⽅法结束 。

4、服务端解析出请求命令 , 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执⾏请求命令 , 并 不会将响应数据返回给客户端 ;

image-20240503140612947

3. Reactor多线程设计

image-20240503135724482

⼀个 Reactor 主线程 ( eventLoopGroupBoss )责监听 TCP⽹络连接请求,建⽴好连接,创建 SocketChannel , 并注 册到 selector 上。

RocketMQ 源码会⾃动根据 OS 的类型选择 NIO 和 Epoll ,也可以通过参数配置 , 然后监听真正的⽹络数据。

拿到⽹络数据后,再丢给 Worker 线程池( eventLoopGroupSelector ),再真正执⾏业务逻辑之前需要进⾏ SSL 验 证、编解码、空闲检查、⽹络连接管理,这些⼯作都交给 defaultEventExecutorGroup 去做。

⽽业务操作由业务线程池中处理,根据 RemotingCommand 的业务请求编号 requestCode , 从处理器表 processorTable 这个本地缓存中找到对应的处理器 , 然后封装成 task 任务后,提交到对应的业务处理器的线程池执⾏。

从⼊⼝到业务逻辑的⼏个步骤⾥,线程池⼀直在增加,这跟每⼀步步骤逻辑复杂性相关 ,越复杂,需要的并发通道越宽。

RocketMQ 的线程模型如下所示 :

image-20240503140757819

4. 总结

  • 网络协议设计原则是便于编码,NettyLenghtFieldBasedFrameDecode 解码器⾮常容易得解决 TCP 粘包和拆包的问题;
  • 通络通讯框架支持同步、异步、单向三种通讯方式;
  • 需要深入理解Reactor线程模型;

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.