来源

https://github.com/feiniaojin/pie-example.git

代码

Channel-管道

注意:ChannelProcessor的doProcess()方法负责触发责任链的执行;通过持有 ChannelProcessor的实现类(负责触发责任链)、ChannelPipeLine的实现类(负责执行责任链),实现责任链的触发、执行的解耦。

image-20240519174927907

Channel

1
2
3
4
5
6
7
8
9
10
11
12
// 通道
public interface Channel {

Channel process(Object in,Object out);

ChannelPipeline pipeline();

interface ChannelProcessor {
void doProcess(Object in,
Object out);
}
}

AbstractChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 抽象通道
public abstract class AbstractChannel implements Channel {

private DefaultChannelPipeline pipeline;

private ChannelProcessor processor = new DefaultChannelProcessorImpl();

protected AbstractChannel() {
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

@Override
public ChannelPipeline pipeline() {
return pipeline;
}

public Channel process(Object inWrapper, Object outWrapper) {
processor.doProcess(inWrapper, outWrapper);
return this;
}

private class DefaultChannelProcessorImpl implements ChannelProcessor {

@Override
public void doProcess(Object inWrapper, Object outWrapper) {
pipeline.process(inWrapper, outWrapper);
}
}
}

DefaultChannel

1
2
3
4
// 默认通道实现
public class DefaultChannel extends AbstractChannel {

}

ChannelPipeLine-责任链

ChannelPipeLine负责责任链的维护和逻辑链式传播执行;默认持有头结点和尾节点,责任链由一个个PipeLineContext节点组成的链状结构组成。

ChannelPipeLine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ChannelPipeline {

ChannelPipeline process(Object in,
Object out);

ChannelPipeline addLast(String name, ChannelHandler handler);

Channel channel();

ChannelPipeline fireExceptionCaught(Throwable cause,
Object in,
Object out);

ChannelPipeline fireChannelProcess(Object in,
Object out);

ChannelHandlerContext head();

ChannelHandlerContext tail();
}

DefaultChannelPipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
* ChannelPipeline默认实现
*
*/
public class DefaultChannelPipeline implements ChannelPipeline {

AbstractChannelHandlerContext head;
AbstractChannelHandlerContext tail;

private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);

private Channel channel;

protected DefaultChannelPipeline(Channel channel) {
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}

public ChannelPipeline addLast(String name, ChannelHandler handler) {
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, name, handler);
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
return this;
}

public Channel channel() {
return channel;
}

@Override
public ChannelPipeline fireExceptionCaught(Throwable cause,
Object in,
Object out) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause, in, out);
return this;
}

@Override
public ChannelPipeline fireChannelProcess(Object in,
Object out) {
AbstractChannelHandlerContext.invokeChannelProcess(head, in, out);
return this;
}

private static String generateName0(Class<?> handlerType) {
return handlerType.getSimpleName() + "#0";
}

final static class TailContext extends AbstractChannelHandlerContext implements ChannelHandler {

private Logger logger = LoggerFactory.getLogger(TailContext.class);

TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, TAIL_NAME, TailContext.class);
}

@Override
public ChannelHandler handler() {
return this;
}

@Override
public void channelProcess(ChannelHandlerContext ctx, Object in, Object out) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("tail:channelProcess:there is no more handler");
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause, Object in, Object out) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("tail:exceptionCaught:there is no more handler");
}
}
}

final static class HeadContext extends AbstractChannelHandlerContext implements ChannelHandler {

private Logger logger = LoggerFactory.getLogger(TailContext.class);

HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, HEAD_NAME, HeadContext.class);
}

@Override
public ChannelHandler handler() {
return this;
}


@Override
public void channelProcess(ChannelHandlerContext ctx,
Object in,
Object out) throws Exception {
if(logger.isDebugEnabled()){
logger.debug("head:channelProcess");
}
ctx.fireChannelProcess(in, out);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause,
Object in,
Object out) throws Exception {
logger.info("head:exceptionCaught");
}
}

@Override
public ChannelPipeline process(Object in,
Object out) {
head.process(in, out);
return this;
}

@Override
public ChannelHandlerContext head() {
return head;
}

@Override
public ChannelHandlerContext tail() {
return tail;
}

}

ChannelHandler-逻辑链

负责单个节点逻辑的执行。

ChannelHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* ChannelHandler
*
*/
public interface ChannelHandler {

void channelProcess(ChannelHandlerContext ctx,
Object in,
Object out) throws Exception;

void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause,
Object in,
Object out) throws Exception;
}

ChannelHandlerAdapter

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class ChannelHandlerAdapter implements ChannelHandler {

@Override
public void channelProcess(ChannelHandlerContext ctx, Object in, Object out) throws Exception {
ctx.fireChannelProcess(in, out);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause, Object in, Object out) throws Exception {
ctx.fireExceptionCaught(cause, in, out);
}
}

ChannelHandlerContext-节点

ChannelHandlerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* ChannelHandlerContext
*
*/
public interface ChannelHandlerContext {

Channel channel();

ChannelHandler handler();

ChannelPipeline pipeline();

ChannelHandlerContext process(Object in,
Object out);

ChannelHandlerContext fireExceptionCaught(Throwable cause,
Object in,
Object out);

ChannelHandlerContext fireChannelProcess(Object in,
Object out);
}

AbstractChannelHandlerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 抽象ChannelHandlerContext
*
*/
public abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private DefaultChannelPipeline pipeline;
private String name;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = (String) ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
}

@Override
public Channel channel() {
return pipeline.channel();
}

@Override
public ChannelPipeline pipeline() {
return pipeline;
}

@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause,
Object in,
Object out) {
invokeExceptionCaught(this.next, cause, in, out);
return this;
}

@Override
public ChannelHandlerContext fireChannelProcess(Object in,
Object out) {
invokeChannelProcess(this.next, in, out);
return this;
}


private void invokeExceptionCaught(final Throwable cause,
Object in,
Object out) {
try {
handler().exceptionCaught(this, cause, in, out);
} catch (Throwable error) {

}
}

private void invokeChannelProcess(Object in,
Object out) {
try {
handler().channelProcess(this, in, out);
} catch (Throwable throwable) {
invokeExceptionCaught(throwable, in, out);
}
}

static void invokeExceptionCaught(final AbstractChannelHandlerContext next,
final Throwable cause,
Object in,
Object out) {
next.invokeExceptionCaught(cause, in, out);
}

static void invokeChannelProcess(final AbstractChannelHandlerContext next,
Object in,
Object out) {
next.invokeChannelProcess(in, out);
}

@Override
public ChannelHandlerContext process(Object in,
Object out) {

try {
handler().channelProcess(this, in, out);
} catch (Throwable t) {
invokeExceptionCaught(t, in, out);
}
return this;
}
}

DefaultChannelHandlerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* DefaultChannelHandlerContext默认实现
*
*/
public class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

private final ChannelHandler handler;

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, ChannelHandler handler) {
super(pipeline, name, handler.getClass());
this.handler = handler;
}

@Override
public ChannelHandler handler() {
return handler;
}
}

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.