Netty

学会使用netty并调优

学自《跟闪电侠学 Netty:Netty 即时聊天实战与底层原理》

文档更新于: 2022-8-10 2:43

第 1 章 即时聊天系统简介

​ 移动互联网时代,相信大家应该都对即时聊天工具不陌生,比如最常用的微信,从2011年1月21日诞生至今,已经成为国内数亿用户必不可少的即时通信工具,是男女老少手机中必备的顶级App。Netty是一个异步基于事件驱动的高性能网络通信框架,在互联网中间件领域网络通信层是无可争议的最强王者。在本书中,笔者将带领大家使用Netty一步一步实现即时聊天工具的核心功能。
即时聊天通常分为单聊和群聊,下面分别来介绍一下。

1.1 单聊流程

​ 单聊指两个用户之间相互聊天。用户单聊的基本流程如下图所示。

用户单聊的基本流程

  1. A要和B聊天,首先A和B需要与服务端建立连接,然后进入登录流程,服务端保存用户标识和TCP连接的映射关系。
  2. A给B发消息,首先需要将带有B标识的消息数据包发送到服务端,然后服务端从消息数据包中获得B的标识,找到对应B的连接,将消息发送给B。
  3. 任意一方发消息给对方,如果对方不在线,则需要将消息缓存,在对方上线之后再发送。
    客户端与服务端之间相互通信的数据包被称为指令数据包。指令数据包分为指令和数据,每一种指令都对应客户端或者服务端的一种操作,数据部分对应的是指令处理需要的数据。

问题:要实现单聊,客户端与服务端分别要实现哪些指令呢?

1.2 单聊的指令

1.2.1 指令图示

​ 下图是客户端与服务端单聊的指令流程图。

客户端与服务端单聊的指令流程图

1.2.2 指令列表

​ 下表是要实现的单聊的指令列表,每条指令都会分为客户端和服务端。

指令内容客户端服务端
登录请求发送接收
登录响应接收发送
客户端发消息发送接收
服务端发消息接收发送
登出请求发送接收
登出响应接收发送

1.3 群聊流程

​ 群聊指一个组内多个用户之间的聊天,一个用户发到群组的消息会被组内任何一个成员接收,群聊的基本流程如下图所示。

群聊的基本流程

​ 要实现群聊,其实流程和单聊类似。

  1. A、B、C依然会经历登录流程,服务端保存用户标识对应的TCP连接。
  2. A发起群聊的时候,将A、B、C的标识发送至服务端,服务端拿到标识之后建立一个群ID,然后把这个ID与A、B、C的标识绑定。
  3. 群聊中任意一方在群里聊天的时候,将群ID发送至服务端,服务端获得群ID之后,取出对应的用户标识,遍历用户标识对应的TCP连接,就可以将消息发送至每一个群聊成员。

问题:群聊除了需要实现上述指令,还需要实现哪些指令呢?

1.4 群聊要实现的指令集

1.4.1 指令图示

​ 群聊的指令图示如下图所示。

群聊的指令图示

1.4.2 指令列表

​ 群聊的指令如下表所示。

指令内容客户端服务端
创建群聊请求发送接收
群聊创建成功通知接收发送
加入群聊请求发送接收
群聊加入通知接收发送
发送群聊消息发送接收
接收群聊消息接收发送
退出群聊请求发送接收
退出群聊通知接收发送

1.5 Netty

​ 使用 Netty 统一的 IO 读写 API 以及强大的 Pipeline 来编写业务处理逻辑,了解 Netty 以下核心知识点。
● 如何启动服务端?
● 如何启动客户端?
● 数据载体 ByteBuf。
● 如何设计长连自定义协议?
● 拆包/粘包原理与实践。
● 如何实现自定义编解码?
● 如何使用 Pipeline 与 ChannelHandler ?
● 如何定时发心跳数据包?
● 如何进行连接空闲检测?

1.5.1 客户端使用Netty的程序逻辑结构

​ 下图展示了客户端使用Netty的程序逻辑结构。

客户端Netty的程序逻辑结构

  1. 客户端会解析控制台指令,比如发送消息或者建立群聊等指令。
  2. 客户端会基于控制台的输入创建一个指令对象,用户告诉服务端具体要干什么事情。
  3. TCP通信需要的数据格式为二进制,因此,接下来通过自定义二进制协议将指令对象封装成二进制,这一步被称为协议的编码。
  4. 对于收到服务端的数据,首先需要截取出一段完整的二进制数据包。
  5. 将此二进制数据包解析成指令对象,比如收到消息。
  6. 将指令对象送到对应的逻辑处理器来处理。

1.5.2 服务端使用Netty的程序逻辑结构

​ 服务端使用Netty的程序逻辑结构与客户端非常类似,如下图所示,这里不再赘述。

服务端Netty的程序逻辑结构

1.6 实现的即时聊天形式

以讲授Netty基础知识为主,故不会涉及即时聊天相关的图形化界面,后续所有的聊天都基于控制台进行,通过与控制台交互可以实现单聊和群聊。

第 2 章 Netty 是什么

​ 在开始了解Netty是什么之前,我们先来回顾一下,如果需要实现一个客户端与服务端通信的程序,使用传统的IO编程,应该如何来实现?

2.1 IO编程

​ 我们简化一下场景:客户端每隔两秒发送一个带有时间戳的“hello world”给服务端,服务端收到之后打印它。
​ 为了方便演示,在下面的例子中,服务端和客户端各有一个类,把这两个类复制到你的IDE中,先后运行 IOServer.java 和 IOClient.java ,可以看到效果。
​ 下面是传统的IO编程中的服务端实现。

IOServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
// 接收新连接线程
new Thread(() -> {
while (true) {
try {
// (1)阻塞方法获取新连接
Socket socket = serverSocket.accept();
// (2)为每一个新连接都创建一个新线程,负责读取数据
new Thread(() -> {
try {
int len;
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
// (3)按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println(new String(data, 0, len));
}
} catch (IOException e) {
}
}).start();
} catch (IOException e) {
}
}
}).start();
}
}

​ 服务端首先创建一个serverSocket来监听8000端口,然后创建一个线程,线程里不断调用阻塞方法serverSocket.accept()获取新连接,见(1);当获得新连接之后,为每一个新连接都创建一个新线程,这个线程负责从该连接中读取数据,见(2);然后以字节流方式读取数据,见(3)。
​ 下面是传统的IO编程中的客户端实现。

IOClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class IOClient {
public static void main(String[] args) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
try {
socket.getOutputStream().write((new Date() + ": helloworld").getBytes());
Thread.sleep(2000);
} catch (Exception e) {
}
}
} catch (IOException e) {
}
}).start();
}
}

​ 客户端的代码相对简单,连接上服务端8000端口之后,每隔两秒,我们都向服务端写一个带有时间戳的“hello world”。
​ IO编程模型在客户端较少的情况下运行良好,但是对于客户端比较多的业务来说,单机服务端可能需要支撑成千上万个连接,IO模型可能就不太合适了,我们来分析一下原因。
​ 在上面的示例中,从服务端代码可以看到,在传统的IO模型中,每个连接创建成功之后都需要由一个线程来维护,每个线程都包含一个while死循环,那么1万个连接对应1万个线程,继而有1万个 while 死循环,这就带来如下几个问题。

  1. 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态,是非常严重的资源浪费,操作系统耗不起。
  2. 线程切换效率低下:单机CPU核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
  3. 除了以上两个问题,在IO编程中,我们看到数据读写是以字节流为单位的。

​ 为了解决这3个问题,JDK在1.4版本之后提出了NIO。

2.2 NIO编程

​ 网上有很多关于NIO的文章,这里不再深入分析。下面简单描述一下NIO是如何解决以上3个问题的。

2.2.1 线程资源受限

​ 在NIO编程模型中,新来一个连接不再创建一个新线程,而是可以把这个连接直接绑定到某个固定的线程,然后这个连接所有的读写都由这个线程来负责,那么它是怎么做到的?我们用下图来对比一下IO与NIO。

IO与NIO

​ 如上图所示,在IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这个连接上是否有数据可以读。在大多数情况下,1万个连接里面同一时刻只有少量的连接有数据可读,因此,很多while死循环都白白浪费掉了,因为读不出数据。
​ 而在NIO模型中,这么多while死循环转换为一个死循环,这个死循环由一个线程控制,那么NIO又是如何做到一个线程一个while死循环就能监测1万个连接是否有数据可读的呢?
​ 这就是NIO模型中Selector的作用,一个连接来了之后,不会创建一个while死循环去监听是否有数据可读,而是直接把这条连接注册到Selector上。然后,通过检查这个Selector,就可以批量监测出有数据可读的连接,进而读取数据。下面我们举一个生活中非常简单的例子来说明IO与NIO的区别。
​ 在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有100个小朋友,有两种方案可以解决小朋友上厕所的问题。

  1. 每个小朋友都配一个老师。每个老师都隔段时间询问小朋友是否要上厕所。如果要上,就领他去厕所,100个小朋友就需要100个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去,这就是IO模型,一个连接对应一个线程。
  2. 所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是NIO模型。所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到同一个线程,然后批量轮询。

​ 这就是NIO模型解决线程资源受限问题的方案。在实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于IO模型中一个线程管理一个连接,消耗的线程资源大幅减少。

2.2.2 线程切换效率低下

​ 由于NIO模型中线程数量大大降低,因此线程切换效率也大幅度提高。

2.2.3 IO读写面向流

​ IO读写是面向流的,一次性只能从流中读取一字节或者多字节,并且读完之后流无法再读取,需要自己缓存数据。而NIO的读写是面向Buffer的,可以随意读取里面任何字节数据,不需要自己缓存数据,只需要移动读写指针即可。
​ 简单讲完了JDK NIO的解决方案之后,接下来我们使用NIO方案替换掉IO方案。先来看看,如果用JDK原生的NIO来实现服务端,该怎么做。
前方高能预警:以下代码可能会让你感觉极度不适,如有不适,请跳过。

NIOServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
* @author 闪电侠
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
// 对应IO编程中的服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 监测是否有新连接,这里的1指阻塞的时间为 1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1)每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
new Thread(() -> {
try {
while (true) {
// (2)批量轮询哪些连接有数据可读,这里的1指阻塞的时间为 1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3)面向Buffer
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer));
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}

相信大部分没有接触过NIO的读者应该会直接跳过代码来到这一行:原来使用JDK原生NIO的API实现一个简单的服务端通信程序如此复杂!
我们还是先对照NIO来解释一下核心思路。

  1. NIO模型中通常会有两个线程,每个线程都绑定一个轮询器Selector。在这个例子中,serverSelector负责轮询是否有新连接,clientSelector负责轮询连接是否有数据可读。
  2. 服务端监测到新连接之后,不再创建一个新线程,而是直接将新连接绑定到clientSelector上,这样就不用IO模型中的1万个while循环死等,参见(1)。
  3. clientSelector被一个while死循环包裹着,如果在某一时刻有多个连接有数据可读,那么通过clientSelector.select(1)方法可以轮询出来,进而批量处理,参见(2)。
  4. 数据的读写面向Buffer,参见(3)。

​ 其他细节部分,因为实在是太复杂,所以笔者不再多讲,读者也不用对代码的细节深究到底。总之,强烈不建议直接基于JDK原生NIO来进行网络开发,下面是笔者总结的原因。

  1. JDK的NIO编程需要了解很多概念,编程复杂,对NIO入门非常不友好,编程模型不友好,ByteBuffer的API简直“反人类”。
  2. 对NIO编程来说,一个比较合适的线程模型能充分发挥它的优势,而JDK没有实现,需要自己实现,就连简单的自定义协议拆包都要自己实现。
  3. JDK的NIO底层由Epoll实现,该实现饱受诟病的空轮询Bug会导致CPU占用率飙升至100%。
  4. 项目庞大之后,自行实现的NIO很容易出现各类Bug,维护成本较高,上面这些代码笔者都不能保证没有Bug。

​ 正因为如此,客户端代码这里就省略了,读者可以直接使用IOClient.java与NIOServer.java通信。
​ JDK的NIO犹如带刺的玫瑰,虽然美好,让人向往,但是使用不当会让你抓耳挠腮,痛不欲生,正因为如此,Netty横空出世!

2.3 Netty编程

​ Netty到底是何方神圣?
​ 用一句简单的话来说就是:Netty封装了JDK的NIO,让你用得更方便,不用再写一大堆复杂的代码了。
​ 用官方正式的话来说就是:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务端和客户端。
​ 下面是笔者总结的使用Netty而不使用JDK原生NIO的原因。

  1. 使用JDK原生NIO需要了解太多概念,编程复杂,一不小心就Bug横飞。
  2. Netty底层IO模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从NIO模型变身为IO模型。
  3. Netty自带的拆包/粘包、异常检测等机制让你从NIO的繁重细节中脱离出来,只需要关心业务逻辑即可。
  4. Netty解决了JDK很多包括空轮询在内的Bug。
  5. Netty底层对线程、Selector做了很多细小的优化,精心设计的Reactor线程模型可以做到非常高效的并发处理。
  6. 自带各种协议栈,让你处理任何一种通用协议都几乎不用亲自动手。
  7. Netty社区活跃,遇到问题随时邮件列表或者Issue。
  8. Netty已经历各大RPC框架、消息中间件、分布式通信中间件线上的广泛验证,健壮性无比强大。

​ 这些原因看不懂没有关系,在后续的章节中我们都可以学到。接下来我们用Netty来重新实现一下本章开篇的功能吧!
首先引入Maven依赖,本书后续Netty都基于4.1.6.Final版本。

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>

​ 然后是服务端实现部分。

NettyServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServer {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
}).bind(8000);
}
}

​ 这么一小段代码就实现了我们前面NIO编程中的所有功能,包括服务端启动、接收新连接、打印客户端传来的数据,怎么样?是不是比JDK原生NIO编程简洁许多?
初学Netty的时候,由于大部分人对NIO编程缺乏经验,因此,将Netty里的概念与IO模型结合起来可能更好理解。

  1. boss对应IOServer.java中的负责接收新连接的线程,主要负责创建新连接。
  2. worker对应IOServer.java中的负责读取数据的线程,主要用于读取数据及业务逻辑处理。

​ 剩下的逻辑笔者在后面的内容中会详细分析,读者可以先把这段代码复制到自己的IDE里,然后运行main函数。
​ 下面是客户端NIO的实现部分。

NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true) {
channel.writeAndFlush(new Date() + ": hello world!");
Thread.sleep(2000);
}
}
}

​ 在客户端程序中,group对应了IOClient.java中main函数起的线程,剩下的逻辑在后面的内容中会详细分析,现在你要做的事情就是把这段代码复制到你的IDE里,然后运行main函数,最后回到NettyServer.java的控制台,你会看到效果。
​ 使用Netty之后是不是觉得整个世界都变美好了?一方面,Netty对NIO封装得如此完美,写出来的代码非常优雅;另一方面,使用Netty之后,网络通信的性能问题几乎不用操心,尽情地让Netty“榨干”你的CPU吧。

第 3 章 Netty 开发环境配置

​ 本章介绍Netty开发环境的搭建,笔者假设读者已经有了Java编程需要的环境。如果读者已经安装过Maven、Git、IntelliJ IDEA环境,建议直接看本章末尾的“如何使用本书的代码”。

本章介绍Netty开发环境的搭建,笔者假设读者已经有了Java编程需要的环境。如果读者已经安装过Maven、Git、IntelliJ IDEA环境,建议直接看本章末尾的“如何使用本书的代码”。

3.1 Maven

Maven是一个基于对象模型来管理项目构建的项目管理工具,通过配置文件pom.xml来配置jar包,相对于传统复制jar包的方式,管理依赖更为方便,如果你没有安装过Maven,下面的指导将带你一起安装。

3.1.1 下载

首先,到Apache官网下载Maven,由于Maven也是使用Java编写的,所以不同操作系统下载的Maven zip包是一样的,这里选择最新的版本:apache-maven-版本号-src.zip,下载到本地之后解压,接下来看不同的操作系统配置。

3.1.2 配置和验证

Windows

  1. 假定我们将文件夹解压到D:\maven,该目录下有bin、lib等目录。
  2. 通过“我的电脑”->“属性”->“高级系统设置”->“环境变量”->“系统变量”->“新建”新建一个环境变量,变量名为M2_HOME,值为D:\maven。
  3. 找到变量名字为Path的环境变量,在变量值的尾部加入“;%M2_HOME%\bin;”,这里需要注意前面的分号。
    最后,打开Dos窗口,输入mvn -v,如果出来版本号相关的信息,则说明我们的Maven已经安装成功了。

Linux && Mac
假定我们将文件夹解压到/usr/local/maven,该目录下有bin、lib等目录;接下来,和Windows系统一样,需要配置环境变量。我们打开/etc/profile文件,在尾部加入下面两行代码。

1
2
export MAVEN_HOME=/usr/local/maven
PATH=$JAVA_HOME/bin:$MAVEN_HOME/bin:$PATH

​ 然后,在命令行执行source /etc/profile,让配置生效;接下来,通过mvn -v命令来验证是否生效,如果出来的是版本号相关的信息,则说明Maven已经安装成功了。

3.2 Git

​ Git是一个版本管理工具,本书代码使用Git来做版本控制,每个章节的代码都是一个Git分支,方便读者循序渐进地学习。我们来看一下如何安装配置Git。

3.2.1 下载与安装

Windows

  • 在gitforwindows.org下载最新版本的Windows Git。

  • 下载完成之后,双击exe文件,只需要一直单击“下一步”按钮,安装即可。其中有一步需要注意一下,如下图所示。

    img

    该步骤为调整环境变量,我们选择中间的一项,继续单击“下一步”按钮,直到安装成功。

  • 安装成功之后,在任意目录上单击鼠标右键,选择“Git Bash Here”这一项,输入git,如果出来提示,则表明安装成功。

Linux && Mac

  • 如果你使用的是Debian或Ubuntu,那么直接使用一条命令sudo apt-get install git即可完成安装;如果是centOS版本,则在命令行执行yuminstall -y git即可完成安装。
  • Mac系统自带Git,不过默认没有安装,你需要运行xcode,然后选择菜单“xcode”->“Preferences”,选择“Downloads”这个Tab页面,再选择“Command Line Tools”,单击“Install”按钮即可完成安装。

3.2.2 配置

​ 最后,我们通过在命令行依次输入以下命令来配置你的名字和邮箱,这样在提交代码的时候就能知道作者的信息。

1
2
git config --global user.name "Your Name"
git config --global user.email "email@example.com"

3.3 IntelliJ IDEA

​ 本书使用IntelliJ IDEA作为集成开发环境。当然,如果你非常熟悉Eclipse,也可以使用Eclipse。对于想入门学习IntelliJ IDEA的读者,笔者之前录制的一个免费视频可以奉献给大家,请通过“读者服务”扫码获取,详细的安装过程和介绍,该视频里均有。
​ 接下来我们看一下如何使用本书的代码。
​ 首先,我们通过下图所示的步骤将代码仓库导入本地。

img

img

​ 代码复制到本地之后,在IntelliJ IDEA右下角切换相应的分支,即可找到每一节对应的完整代码,如下图所示。

img

img

img

​ 由于在代码里,笔者使用了lombok自动生成getter、setter及构造函数,需要在IntelliJ IDEA中安装插件,否则代码会报红,具体安装可以参考下图所示的步骤。
​ 首先调出配置。

img

​ 然后找到IDEA插件相关的配置。

img

​ 接着在弹出来的窗口中输入lombok。

img

​ 最后单击“Install”按钮安装,之后重启IntelliJ IDEA即可。

第 4 章 服务端启动流程

​ 这一章,我们学习如何使用Netty来启动一个服务端应用程序。

4.1 服务端启动最小化代码

​ 以下是服务端启动的一个非常精简的Demo。

NettyServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
}
});
serverBootstrap.bind(8000);
}
}
  • 上述代码首先创建了两个NioEventLoopGroup,这两个对象可以看作传统IO编程模型的两大线程组,bossGroup表示监听端口,接收新连接的线程组;workerGroup表示处理每一个连接的数据读写的线程组。用生活中的例子来讲就是,一个工厂要运作,必然要有一个老板负责从外面接活,然后有很多员工,负责具体干活。老板就是bossGroup,员工们就是workerGroup,bossGroup接收完连接,交给workerGroup去处理。
  • 其次创建了一个引导类ServerBootstrap,这个类将引导服务端的启动工作。
  • 通过.group(bossGroup,workerGroup)给引导类配置两大线程组,这个引导类的线程模型也就定型了。
  • 然后指定服务端的IO模型为NIO,上述代码通过.channel(NioServerSocketChannel.class)来指定IO模型,也可以有其他选择。如果你想指定IO模型为BIO,那么这里配置上OioServerSocketChannel.class类型即可。当然通常我们也不会这么做,因为Netty的优势就在于NIO。
  • 接着调用childHandler()方法,给这个引导类创建一个ChannelInitializer,主要是定义后续每个连接的数据读写,对于业务处理逻辑,不理解也没关系,后面我们会详细分析。在ChannelInitializer这个类中,有一个泛型参数NioSocketChannel,这个类就是Netty对NIO类型连接的抽象,而前面的NioServerSocketChannel也是对NIO类型连接的抽象,NioServerSocketChannel和NioSocketChannel的概念可以与BIO编程模型中的ServerSocket和Socket两个概念对应。

最小化参数配置到这里就完成了,总结一下就是,要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO模型、连接读写处理逻辑。有了这三者,之后再调用bind(8000),就可以在本地绑定一个8000端口启动服务端。以上这段代码,读者可以直接复制到自己的IDE中运行。

4.2 自动绑定递增端口

上面代码绑定了8000端口,接下来我们实现一个稍微复杂点的逻辑。我们指定一个起始端口号,比如1000;然后从1000端口往上找一个端口,直到这个端口能够绑定成功。比如1000端口不可用,我们就尝试绑定1001端口,然后1002端口,以此类推。

serverBootstrap.bind(8000)方法是一个异步方法,调用之后是立即返回的,它的返回值是一个ChannelFuture。我们可以给这个ChannelFuture添加一个监听器GenericFutureListener,然后在GenericFutureListener的operationComplete方法里,监听端口是否绑定成功。下面是监听端口是否绑定成功的代码片段。

1
2
3
4
5
6
7
8
9
10
serverBootstrap.bind(8000).addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {

System.err.println("端口绑定失败!");
}
}
});

接下来就可以从1000端口开始,往上找端口号,直到端口绑定成功。我们要做的就是在if (future.isSuccess())的else逻辑里重新绑定一个递增的端口。我们从这段绑定逻辑中抽取出一个bind方法。

1
2
3
4
5
6
7
8
9
10
11
12
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future<? super Void> future) {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
bind(serverBootstrap, port + 1);
}
}
});
}

上述代码中最关键的就是在端口绑定失败之后,重新调用自身方法,并且把端口号加一,这样,在我们的主流程里面就可以直接调用。

1
bind(serverBootstrap, 1000)

读者可以自行修改代码,运行之后看到效果,最终会发现,端口成功绑定在了1024。从1000开始到1023,端口均绑定失败,这是因为在笔者的Mac系统下,1023以下的端口号都被系统保留了,需要ROOT权限才能绑定。

以上就是自动绑定递增端口的逻辑。

问题:服务端启动引导类ServerBootstrap除了指定线程模型、IO模型、连接读写处理逻辑,还可以做哪些事情?

4.3 服务端启动的其他方法

4.3.1 handler()方法

1
2
3
4
5
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
protected void initChannel(NioServerSocketChannel ch) {
System.out.println("服务端启动中");
}
});

handler()方法可以和前面分析的childHandler()方法对应起来:childHandler()方法用于指定处理新连接数据的读写处理逻辑;handler()方法用于指定在服务端启动过程中的一些逻辑,通常情况下用不到这个方法。

4.3.2 attr()方法

1
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")

attr()方法可以给服务端Channel,也就是NioServerSocketChannel指定一些自定义属性,然后通过channel.attr()取出这个属性。比如,上面的代码可以指定服务端Channel的serverName属性,属性值为nettyServer,其实就是给NioServerSocketChannel维护一个Map而已,通常情况下也用不上这个方法。

4.3.3 childAttr()方法

除了可以给服务端Channel即NioServerSocketChannel指定一些自定义属性,我们还可以给每一个连接都指定自定义属性。

1
serverBootstrap.childAttr(AttributeKey.newInstance("clientKey"), "clientValue")

上面的childAttr()方法可以给每一个连接都指定自定义属性,后续我们可以通过channel.attr()方法取出该属性。

4.3.4 option()方法

option()方法可以给服务端Channel设置一些TCP参数,最常见的就是so_backlog,设置如下。

1
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

这个设置表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,则可以适当调大这个参数。

4.3.5 childOption()方法

childOption()方法可以给每个连接都设置一些TCP参数。

1
2
3
serverBootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)

上述代码中设置了两种TCP参数,其中:

  • ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true表示开启。
  • ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启。通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为关闭;如果需要减少发送次数,减少网络交互,就设置为开启。

其他参数这里就不一一讲解了,读者有兴趣可以自行研究。

4.4 总结

  • 在本章中,我们首先学习了Netty服务端启动的流程,一句话总结就是:首先创建一个引导类,然后给它指定线程模型、IO模型、连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
  • 然后我们学习到bind方法是异步的,可以通过这个异步机制来实现递增端口绑定。
  • 最后我们讨论了Netty服务端启动的其他方法,主要包括给服务端Channel或者客户端Channel设置属性值、设置底层TCP参数。

如果你觉得这个过程比较简单,想深入了解服务端启动的底层原理,可参考第21章。

第 5 章 客户端启动流程

上一章,我们已经学习了Netty服务端启动流程;这一章,我们来学习Netty客户端启动流程。

5.1 客户端启动Demo

对于客户端的启动来说,和服务端的启动类似,依然需要线程模型、IO模型,以及IO业务处理逻辑三大参数。下面我们来看一下客户端启动的标准流程。

NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
public
static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定线程模型
.group(workerGroup)
// 2.指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
// 3.IO 处理逻辑
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
}
});
// 4.建立连接
bootstrap.connect("localhost", 80).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else {
System.err.println("连接失败!");
}
});
}
}

从上面的代码可以看到,客户端启动的引导类是Bootstrap,负责启动客户端和连接服务端;而在服务端启动的时候,这个引导类是ServerBootstrap。引导类创建完成之后,客户端启动的流程如下。

  • 与服务端的启动一样,需要给它指定线程模型,驱动连接的数据读写,这个线程的概念可以和第1章中IOClient.java创建的线程联系起来。
  • 指定IO模型为NioSocketChannel,表示IO模型为NIO。当然,你可以设置IO模型为OioSocketChannel,但是通常不会这么做,因为Netty的优势在于NIO。
  • 给引导类指定一个Handler,主要定义连接的业务处理逻辑,不理解没关系,在后面会详细分析。
  • 配置完线程模型、IO模型、业务处理逻辑之后,调用connect方法进行连接,可以看到connect方法有两个参数,第一个参数可以填写IP或者域名,第二个参数填写端口号。由于connect方法返回的是一个Future,也就是说这个方法是异步的,通过addListener方法可以监听连接是否成功,进而打印连接信息。

到了这里,一个客户端启动的Demo就完成了,其实只要和客户端Socket编程模型对应起来,这里的三个概念就会显得非常简单。读者如果忘掉了,可以先回顾一下第1章的IOClient.java,再来看这里的启动流程。

5.2 失败重连

在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重连。重连的逻辑写在连接失败的逻辑块里。

1
2
3
4
5
6
7
8
9
10
bootstrap
.connect("meituan.com", 80)
.addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else {
System.err.println("连接失败!");
// 重连
}
});

在重连的时候,依然调用同样的逻辑。因此,我们把建立连接的逻辑先抽取出来,然后在重连的时候,递归调用自身。

1
2
3
4
5
6
7
8
9
10
private static void connect(Bootstrap bootstrap, String host, int port) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else {
System.err.println("连接失败,开始重连");
connect(bootstrap, host, port);
}
});
}

上面这一段便是带有自动重连功能的逻辑,可以看到在连接失败的时候,会调用自身进行重连。

但是,在通常情况下,连接失败不会立即重连,而是通过一个指数退避的方式,比如每隔1秒、2秒、4秒、8秒,以2的幂次来建立连接,到达一定次数之后就放弃连接。接下来我们实现这段逻辑,默认重试5次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
connect(bootstrap, "meituan.com", 80, MAX_RETRY);

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry--), delay, TimeUnit.SECONDS);
}
});
}

从上面的代码可以看到,通过判断连接是否成功及剩余的重试次数,分别执行不同的逻辑。

  • 如果连接成功,则打印连接成功的消息。
  • 如果连接失败但重试次数已经用完,则放弃连接。
  • 如果连接失败但重试次数仍然没有用完,则计算下一次重连间隔delay,然后定期重连。

在上面的代码中,我们看到,定时任务调用的是bootstrap.config().group().schedule(),其中bootstrap.config()这个方法返回的是BootstrapConfig,它是对Bootstrap配置参数的抽象,然后bootstrap.config().group()返回的就是我们在一开始配置的线程模型workerGroup,调用workerGroup的schedule方法即可实现定时任务逻辑。

在schedule方法块里,前四个参数原封不动地传递,最后一个重试次数参数减掉1,就是下一次建立连接时的上下文信息。读者可以自行修改代码,更改到一个连接不上的服务端Host或者Port,查看控制台日志就可以看到5次重连日志。

以上就是实现指数退避的客户端重连逻辑。

问题:客户端启动过程中的引导类Bootstrap除了指定线程模型、IO模型、连接读写处理逻辑,还可以做哪些事情?

5.3 客户端启动的其他方法

5.3.1 attr()方法

1
bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")

attr()方法可以为客户端Channel也就是NioSocketChannel绑定自定义属性,然后通过channel.attr()方法取出这个属性。比如,上面的代码可以指定客户端Channel的clientName属性,属性值为nettyClient,其实就是为NioSocketChannel维护一个Map而已。后续在NioSocketChannel通过参数传来传去的时候,就可以通过它来取出设置的属性,非常方便。

5.3.2 option()方法

1
2
3
4
Bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)

option()方法可以为连接设置一些TCP底层相关的属性,比如上面的代码中,我们设置了3种TCP属性,其中:

  • ChannelOption.CONNECT_TIMEOUT_MILLIS表示连接的超时时间,超过这个时间,如果仍未连接到服务端,则表示连接失败。
  • ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true表示开启。
  • ChannelOption.TCP_NODELAY表示是否开始Nagle算法,true表示关闭,false表示开启。通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为true;如果需要减少发送次数,减少网络交互,就设置为false。

其他参数这里就不一一讲解了,读者有兴趣可以去自行研究。

5.4 总结

  • 本章中我们首先学习了Netty客户端启动的流程,一句话总结就是:首先创建一个引导类,然后为它指定线程模型、IO模型、连接读写处理逻辑,连接上特定主机和端口后,客户端就启动起来了。
  • 然后我们学习到connect方法是异步的,可以通过异步回调机制来实现指数退避重连逻辑。
  • 最后我们讨论了Netty客户端启动的其他方法,主要包括给客户端Channel绑定自定义属性值、设置底层TCP参数。

5.5 思考

与服务端启动相比,客户端启动的引导类少了哪些方法,为什么不需要这些方法?

第 6 章 客户端与服务端双向通信

在前面两章中,我们学习了服务端启动与客户端启动的流程。熟悉了这两个流程之后,就可以建立服务端与客户端之间的连接了。本章我们用一个Demo来了解服务端和客户端是如何通信的。

本章要实现的功能是:在客户端连接成功之后,向服务端写一段数据;服务端收到数据之后打印,并向客户端返回一段数据。这里展示的是核心代码,完整代码请参考代码仓库对应的章节。

6.1 客户端发送数据到服务端

在客户端启动流程这一章,读者已经了解到客户端相关的数据读写逻辑是通过Bootstrap的handler()方法指定的。

1
2
3
4
5
6
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 指定连接数据读写逻辑
}
});

接下来在initChannel()方法里给客户端添加一个逻辑处理器,其作用是负责向服务端写数据。

1
2
3
4
5
6
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});
  • ch.pipeline()返回的是和这条连接相关的逻辑处理链,采用了责任链模式。
  • 调用addLast()方法添加一个逻辑处理器,逻辑处理器的作用就是,在客户端建立连接成功之后,向服务端写数据。下面是这个逻辑处理器相关的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端写出数据");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx);
// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准备数据,指定字符串的字符集为 UTF-8
byte[] bytes = "你好,闪电侠!".getBytes(StandardCharsets.UTF_8);
// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
  • 这个逻辑处理器继承自ChannelInboundHandlerAdapter,覆盖了channelActive()方法,这个方法会在客户端连接建立成功之后被调用。

  • 客户端连接建立成功之后,调用channelActive()方法。在这个方法里,我们编写向服务端写数据的逻辑。

  • 写数据的逻辑分为三步:首先需要获取一个Netty对二进制数据的抽象ByteBuf。在上面代码中,ctx.alloc()获取到一个ByteBuf的内存管理器,其作用就是分配一个ByteBuf。然后把字符串的二进制数据填充到ByteBuf,这样就获取到Netty需要的数据格式。最后调用ctx.channel().writeAndFlush()把数据写到服务端。

以上就是客户端启动之后,向服务端写数据的逻辑。可以看到,和传统的Socket编程不同的是,Netty里的数据是以ByteBuf为单位的,所有需要写出的数据都必须放到一个ByteBuf中。数据的写出如此,数据的读取亦如此。接下来我们看一下服务端是如何读取这段数据的。

6.2 服务端读取客户端数据

在服务端启动流程这一章,我们提到,服务端相关的数据处理逻辑是通过ServerBootstrap的childHandler()方法指定的。

1
2
3
4
5
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 指定连接数据读写逻辑
}
});

现在,我们在initChannel()方法里给服务端添加一个逻辑处理器,这个处理器的作用就是负责读取客户端发来的数据。

1
2
3
4
5
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new FirstServerHandler());
}
});

这个方法里的逻辑和客户端侧类似,获取服务端侧关于这个连接的逻辑处理链Pipeline,然后添加一个逻辑处理器,负责读取客户端发来的数据。

1
2
3
4
5
6
7
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));
}
}

服务端侧的逻辑处理器同样继承自ChannelInboundHandlerAdapter。与客户端不同的是,这里覆盖的方法是channelRead(),这个方法在接收到客户端发来的数据之后被回调。

这里的msg参数指的就是Netty里数据读写的载体,为什么这里不直接是ByteBuf,而需要强转一下呢?我们后面会分析到。这里我们强转之后,调用byteBuf.toString()就能够获得客户端发来的字符串数据。

我们先运行服务端,再运行客户端,下面两个图分别是服务端控制台和客户端控制台的输出。

服务端

img

客户端
img

到目前为止,我们已经实现了客户端发送数据和服务端打印,离本章开头提出的目标还差一半,接下来我们实现另外一半目标:服务端收到数据之后向客户端返回数据。

6.3 服务端返回数据到客户端

服务端向客户端写数据逻辑与客户端的写数据逻辑一样,首先创建一个ByteBuf,然后填充二进制数据,最后调用writeAndFlush()方法写出去。下面是服务端返回数据的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接收数据逻辑省略
// 返回数据到客户端
System.out.println(new Date() + ": 服务端写出数据");
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "你好,欢迎关注我的微信公众号,《闪电侠的博客》!".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}

现在,轮到客户端了。客户端读取数据的逻辑和服务端读取数据的逻辑一样,同样是覆盖channelRead()方法。

1
2
3
4
5
6
7
8
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
// 写数据相关的逻辑省略
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 客户端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));
}
}

将这段逻辑添加到客户端的逻辑处理器FirstClientHandler之后,客户端就能收到服务端发来的数据。

客户端与服务端读写数据的逻辑完成之后,先运行服务端,再运行客户端,控制台输出分别如下面两图所示。

服务端

img

客户端

img

到这里,本章要实现的客户端与服务端双向通信的功能就实现完毕了。

6.4 总结

  • 首先,我们了解到客户端和服务端的逻辑处理均在启动的时候,通过为逻辑处理链Pipeline添加逻辑处理器,来编写数据的读写逻辑。Pipeline的逻辑我们在后面会分析。
  • 然后,在客户端连接成功之后,会回调到逻辑处理器的channelActive()方法。不管服务端还是客户端,收到数据之后都会调用channelRead()方法。

写数据调用writeAndFlush()方法,客户端与服务端交互的二进制数据载体为ByteBuf,ByteBuf通过连接的内存管理器创建,字节数据填充到ByteBuf之后才能写到对端。接下来一章我们会重点分析ByteBuf。

6.5 思考

如何实现在新连接接入的时候,服务端主动向客户端推送消息,客户端回复服务端消息?

第 7 章 数据载体 ByteBuf 的介绍

在前面的章节中,我们已经了解到,Netty的数据读写是以ByteBuf为单位进行交互的。本章我们就来详细剖析一下ByteBuf。

7.1 ByteBuf的结构

首先,我们来了解一下ByteBuf的结构,如下图所示。

img

从ByteBuf的结构图可以看到:

  • ByteBuf是一个字节容器,容器里面的数据分为三部分,第一部分是已经丢弃的字节,这部分数据是无效的;第二部分是可读字节,这部分数据是ByteBuf的主体数据,从ByteBuf里读取的数据都来自这一部分;最后一部分的数据是可写字节,所有写到ByteBuf的数据都会写到这一段。后面的虚线部分表示该ByteBuf最多还能扩容多少容量。
  • 以上三部分内容是被两个指针划分出来的,从左到右依次是读指针(readerIndex)和写指针(writerIndex)。还有一个变量capacity,表示ByteBuf底层内存的总容量。
  • 从ByteBuf中每读取一字节,readerIndex自增1,ByteBuf里总共有writerIndex-readerIndex字节可读。由此可以知道,当readerIndex与writerIndex相等的时候,ByteBuf不可读。
  • 写数据是从writerIndex指向的部分开始写的,每写一字节,writerIndex自增1,直到增到capacity。这个时候,表示ByteBuf已经不可写。
  • ByteBuf里其实还有一个参数maxCapacity。当向ByteBuf写数据的时候,如果容量不足,则可以进行扩容,直到capacity扩容到maxCapacity,超过maxCapacity就会报错。

Netty使用ByteBuf这个数据结构可以有效地区分可读数据和可写数据,读写之间相互没有冲突。当然,ByteBuf只是对二进制数据的抽象,具体底层的实现我们后面会讲到。在这里,读者只需要知道Netty关于数据读写只认ByteBuf。下面我们来学习ByteBuf常用的API。

7.2 容量API

capacity()
表示ByteBuf底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节),不同的底层实现机制有不同的计算方式,后面我们介绍ByteBuf的分类时会讲到。
maxCapacity()
表示ByteBuf底层最大能够占用多少字节的内存,当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到maxCapacity,超过这个数,就抛出异常。
readableBytes()与isReadable()
readableBytes()表示ByteBuf当前可读的字节数,它的值等于writerIndex-readerIndex,如果两者相等,则不可读,isReadable()方法返回false。
writableBytes()、isWritable()与maxWritableBytes()
writableBytes()表示ByteBuf当前可写的字节数,它的值等于capacity-writerIndex,如果两者相等,则表示不可写,isWritable()返回false,但是这个时候,并不代表不能往ByteBuf写数据了。如果发现往ByteBuf写数据写不进去,Netty会自动扩容ByteBuf,直到底层的内存大小为maxCapacity,而maxWritableBytes()就表示可写的最大字节数,它的值等于maxCapacitywriterIndex。

7.3 读写指针相关的API

readerIndex()与readerIndex(int)
前者表示返回当前的读指针readerIndex,后者表示设置读指针。
writeIndex()与writeIndex(int)
前者表示返回当前的写指针writerIndex,后者表示设置写指针。
markReaderIndex()与resetReaderIndex()
前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值。下面两段代码是等价的。

1
2
3
4
5
6
7
8
9
// 代码片段一
int readerIndex = buffer.readerIndex();
// 其他操作
buffer.readerIndex(readerIndex);

// 代码片段二
buffer.markReaderIndex();
// 其他操作
buffer.resetReaderIndex();

希望大家多使用代码片段二这种方式,不需要自己定义变量。无论Buffer被当作参数传递到哪里,调用resetReaderIndex()都可以恢复到之前的状态,在解析自定义协议的数据包时非常常见,推荐大家使用这一对API。
markWriterIndex()与resetWriterIndex()
这一对API的作用与上一对API类似,这里不再赘述。

7.4 读写API

本质上,关于ByteBuf的读写都可以看作从指针开始的地方开始读写数据。

writeBytes(byte[] src)与buffer.readBytes(byte[] dst)
writeBytes()表示把字节数组src里的数据全部写到ByteBuf,而readBytes()表示把ByteBuf里的数据全部读取到dst。这里dst字节数组的大小通常等于readableBytes(),而src字节数组大小的长度通常小于等于writableBytes()。

writeByte(byte b)与buffer.readByte()
writeByte()表示往ByteBuf中写一字节,而buffer.readByte()表示从ByteBuf中读取一字节,类似的API还有writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble(),以及readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble(),这里不再赘述,相信读者应该很容易理解这些API。

与读写API类似的API还有getBytes()、getByte()与setBytes()、setByte()系列,唯一的区别就是get、set不会改变读写指针,而read、write会改变读写指针,这一点在解析数据的时候千万要注意。
release()与retain()
由于Netty使用了堆外内存,而堆外内存是不被JVM直接管理的。也就是说,申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。这有点类似于C语言里,申请到的内存必须手工释放,否则会造成内存泄漏。

Netty的ByteBuf是通过引用计数的方式管理的,如果一个ByteBuf没有地方被引用到,则需要回收底层内存。在默认情况下,当创建完一个ByteBuf时,它的引用为1,然后每次调用retain()方法,它的引用就加一,release()方法的原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收ByteBuf底层的内存。

slice()、duplicate()、copy()
在通常情况下,这三个方法会被放到一起比较,三者的返回值分别是一个新的ByteBuf对象。

  • slice()方法从原始ByteBuf中截取一段,这段数据是从readerIndex到writeIndex的,同时,返回的新的ByteBuf的最大容量maxCapacity为原始ByteBuf的readableBytes()。
  • duplicate()方法把整个ByteBuf都截取出来,包括所有的数据、指针信息。
  • slice()方法与duplicate()方法的相同点是:底层内存及引用计数与原始ByteBuf共享,也就是说,经过slice()方法或者duplicate()方法返回的ByteBuf调用write系列方法都会影响到原始ByteBuf,但是它们都维持着与原始ByteBuf相同的内存引用计数和不同的读写指针。
  • slice()方法与duplicate()方法的不同点就是:slice()方法只截取从readerIndex到writerIndex之间的数据,它返回的ByteBuf的最大容量被限制到原始ByteBuf的readableBytes(),而duplicate()方法是把整个ByteBuf都与原始ByteBuf共享。
  • slice()方法与duplicate()方法不会复制数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法copy()会直接从原始ByteBuf中复制所有的信息,包括读写指针及底层对应的数据,因此,往copy()方法返回的ByteBuf中写数据不会影响原始ByteBuf。
  • slice()方法和duplicate()方法不会改变ByteBuf的引用计数,所以原始ByteBuf调用release()方法之后发现引用计数为零,就开始释放内存,调用这两个方法返回的ByteBuf也会被释放。这时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为2。在释放内存的时候,需要调用两次release()方法,将引用计数降到零,才会释放内存。
  • 这三个方法均维护着自己的读写指针,与原始ByteBuf的读写指针无关,相互之间不受影响。

retainedSlice()与retainedDuplicate()
相信读者应该已经猜到这两个API的作用了,它们的作用是在截取内存片段的同时,增加内存的引用计数,分别与下面两段代码等价。

1
2
3
4
// retainedSlice 等价于
slice().retain();
// retainedDuplicate() 等价于
duplicate().retain();

使用slice()和duplicate()方法的时候,千万要理清内存共享、引用计数共享、读写指针不共享等概念。下面举两个常见的容易出错的例子。
例1:多次释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Buffer buffer = xxx;
doWith(buffer);
// 一次释放
buffer.release();

public void doWith (Bytebuf buffer){
// ...
// 没有增加引用计数
Buffer slice = buffer.slice();
foo(slice);
}
public void foo (ByteBuf buffer){
// 从缓冲区读取并处理
// 重复释放
buffer.release();
}

这里的doWith有时候是用户自定义的方法,有时候是Netty的回调方法,如channelRead()等。

例2:不释放造成内存泄漏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Buffer buffer = xxx;
doWith(buffer);
// 引用计数为2,调用release()方法之后,引用计数为1,无法释放内存
buffer.release();

public void doWith(Bytebuf buffer) {
// ...
// 增加引用计数
Buffer slice = buffer.retainedSlice();
foo(slice);
// 没有调用release()方法
}
public void foo(ByteBuf buffer) {
//从缓冲区读取并处理
}

想要避免以上两种情况的发生,大家只需要记住一点,在一个函数体里面,只要增加了引用计数(包括ByteBuf的创建和手动调用retain()方法),就必须调用release()方法。

7.5 实战

了解了以上API之后,我们使用上述API来写一个简单的Demo。

ByteBufTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class ByteBufTest {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
print("allocate ByteBuf(9, 100)", buffer);
// write方法改变写指针,写完之后写指针未到capacity的时候,buffer仍然可写
buffer.writeBytes(new byte[]{1, 2, 3, 4});
print("writeBytes(1,2,3,4)", buffer);
// write方法改变写指针,写完之后写指针未到capacity的时候,buffer仍然可写,写完int类型之后,写指针增加4
buffer.writeInt(12);
print("writeInt(12)", buffer);
// write方法改变写指针,写完之后写指针等于capacity的时候,buffer不可写
buffer.writeBytes(new byte[]{5});
print("writeBytes(5)", buffer);
// write方法改变写指针,写的时候发现buffer不可写则开始扩容,扩容之后capacity随即改变
buffer.writeBytes(new byte[]{6});
print("writeBytes(6)", buffer);
// get方法不改变读写指针
System.out.println("getByte(3) return: " + buffer.getByte(3));
System.out.println("getShort(3) return: " + buffer.getShort(3));
System.out.println("getInt(3) return: " + buffer.getInt(3));
print("getByte()", buffer);
// set方法不改变读写指针
buffer.setByte(buffer.readableBytes() + 1, 0);
print("setByte()", buffer);
// read方法改变读指针
byte[] dst = new byte[buffer.readableBytes()];
buffer.readBytes(dst);
print("readBytes(" + dst.length + ")", buffer);
}

private static void print(String action, ByteBuf buffer) {
System.out.println("after ===========" + action + "============");
System.out.println("capacity(): " + buffer.capacity());
System.out.println("maxCapacity(): " + buffer.maxCapacity());
System.out.println("readerIndex(): " + buffer.readerIndex());
System.out.println("readableBytes(): " + buffer.readableBytes());
System.out.println("isReadable(): " + buffer.isReadable());
System.out.println("writerIndex(): " + buffer.writerIndex());
System.out.println("writableBytes(): " + buffer.writableBytes());
System.out.println("isWritable(): " + buffer.isWritable());
System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
System.out.println();
}
}

控制台输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
after ===========allocate ByteBuf(9, 100)============
capacity()9
maxCapacity()100
readerIndex()0
readableBytes()0
isReadable()false
writerIndex()0
writableBytes()9
isWritable()true
maxWritableBytes()100

after ===========writeBytes(1,2,3,4)============
capacity()9
maxCapacity()100
readerIndex()0
readableBytes()4
isReadable()true
writerIndex()4
writableBytes()5
isWritable()true
maxWritableBytes()96

after ===========writeInt(12)============
capacity()9
maxCapacity()100
readerIndex()0
readableBytes()8
isReadable()true
writerIndex()8
writableBytes()1
isWritable()true
maxWritableBytes()92

after ===========writeBytes(5)============
capacity()9
maxCapacity()100
readerIndex()0
readableBytes()9
isReadable()true
writerIndex()9
writableBytes()0
isWritable()false
maxWritableBytes()91

after ===========writeBytes(6)============
capacity()64
maxCapacity()100
readerIndex()0
readableBytes()10
isReadable()true
writerIndex()10
writableBytes()54
isWritable()true
maxWritableBytes()90

getByte(3) return: 4
getShort(3) return: 1024
getInt(3) return: 67108864
after ===========getByte()============
capacity()64
maxCapacity()100
readerIndex()0
readableBytes()10
isReadable()true
writerIndex()10
writableBytes()54
isWritable()true
maxWritableBytes()90

after ===========setByte()============
capacity()64
maxCapacity()100
readerIndex()0
readableBytes()10
isReadable()true
writerIndex()10
writableBytes()54
isWritable()true
maxWritableBytes()90

after ===========readBytes(10)============
capacity()64
maxCapacity()100
readerIndex()10
readableBytes()0
isReadable()false
writerIndex()10
writableBytes()54
isWritable()true
maxWritableBytes()90


了解了ByteBuf的结构之后,不难理解控制台的输出。

7.6 总结

  • 本章我们分析了Netty对二进制数据的抽象ByteBuf的结构,本质上它的原理就是,引用了一段内存,这段内存可以是堆内的,也可以是堆外的,然后用引用计数来控制这段内存是否需要被释放。使用读写指针来控制ByteBuf的读写,可以理解为是外观模式的一种使用。
  • 基于读写指针和容量、最大可扩容容量,衍生出一系列读写方法,要注意read、write与get、set的区别。
  • 多个ByteBuf可以引用同一段内存,通过引用计数来控制内存的释放,遵循谁retain()谁release()的原则。
  • 最后,我们通过一个具体的例子说明了ByteBuf的实际使用。

7.7 思考

slice()方法可能用在什么场景?

第 8 章 客户端与服务端通信协议编解码

在学习了ByteBuf的API之后,本章我们来学习如何设计并实现客户端与服务端的通信协议。

8.1 什么是客户端与服务端的通信协议

无论使用Netty还是使用原始的Socket编程,基于TCP通信的数据包格式均为二进制,协议指的就是客户端与服务端事先商量好的,每一个二进制数据包中的每一段字节分别代表什么含义的规则。一个简单的登录指令如下图所示。

登录指令

在这个数据包中,第一个字节为1表示这是一个登录指令,接下来是用户名和密码,这两个值以\0分割,客户端发送这段二进制数据包到服务端,服务端就能根据这个协议来取出用户名和密码,执行登录逻辑。在实际的通信协议设计中,我们会考虑更多细节,比这个协议稍微复杂一些。

那么,协议设计好之后,客户端与服务端的通信过程又是怎样的呢?

客户端与服务端的通信如下图所示。

img

  • 客户端把一个Java对象按照通信协议转换成二进制数据包。
  • 通过网络,把这段二进制数据包发送到服务端。在数据的传输过程中,由TCP/IP协议负责数据的传输,与应用层无关。
  • 服务端接收到数据之后,按照协议取出二进制数据包中的相应字段,包装成Java对象,交给应用逻辑处理。
  • 服务端处理完之后,如果需要生成响应给客户端,则按照相同的流程进行。

第1章已经列出了实现一个支持单聊和群聊的IM指令集合,设计协议的目的就是客户端与服务端能够识别这些具体的指令。接下来,我们就看一下如何设计这个通信协议。

8.2 通信协议的设计

通信协议的设计如下图所示。

通信协议的设计

  • 第一个字段是魔数,通常情况下为固定的几字节(这里规定为4字节)。为什么需要这个字段,而且还是一个固定的数?假设我们在服务器上开了一个端口,比如80端口,如果没有这个魔数,任何数据包传递到服务器,服务器都会根据自定义协议来进行处理,包括不符合自定义协议规范的数据包。例如,直接通过http://服务器IP来访问服务器(默认为80端口),服务端收到的是一个标准的HTTP协议数据包,但是它仍然会按照事先约定好的协议来处理HTTP协议,显然,这是会解析出错的。而有了这个魔数之后,服务端首先取出前面4字节进行比对,能够在第一时间识别出这个数据包并非是遵循自定义协议的,也就是无效数据包,出于安全考虑,可以直接关闭连接以节省资源。在Java字节码的二进制文件中,开头的4字节为0xcafebabe,用来标识这是一个字节码文件,亦有异曲同工之妙。
  • 接下来的1字节为版本号,通常情况下是预留字段,在协议升级的时候用到,有点类似TCP协议中的一个字段标识是IPV4协议还是IPV6协议。在大多数情况下,这个字段是用不到的,但是为了协议能够支持升级,还是先留着。
  • 第三部分的序列化算法表示如何把Java对象转换为二进制数据及二进制数据如何转换回Java对象,比如Java自带的序列化、JSON、Hessian等序列化方式。
  • 第四部分的字段表示指令,关于指令相关的介绍,我们在前面已经讨论过。服务端或者客户端每收到一种指令,都会有相应的处理逻辑。这里我们用1字节来表示,最高支持256种指令,对于这个即时聊天系统来说已经完全足够了。
  • 第五部分的字段表示数据长度,占4字节。
  • 最后一部分为数据内容,每一种指令对应的数据都是不一样的,比如登录的时候需要用户名和密码,收消息的时候需要用户标识和具体消息内容等。

在通常情况下,这样一套标准的协议能够适配大多数情况下的客户端与服务端的通信场景,接下来我们就看一下如何使用Netty来实现这套协议。

8.3 通信协议的实现

我们把Java对象根据协议封装成二进制数据包的过程称为编码,把从二进制数据包中解析出Java对象的过程称为解码。在学习如何使用Netty进行通信协议的编解码之前,我们先来定义一下客户端与服务端通信的Java对象。

8.3.1 Java对象

如下代码定义通信过程中的Java对象。

1
2
3
4
5
6
7
8
9
10
11
@Data
public abstract class Packet {
/**
* 协议版本
*/
private Byte version = 1;
/**
* 指令
*/
public abstract Byte getCommand();
}
  • 以上是通信过程中Java对象的抽象类。可以看到,我们定义了一个版本号(默认值为1),以及一个获取指令的抽象方法。所有的指令数据包都必须实现这个方法,这样我们就可以知道某种指令的含义。
  • @Data注解由lombok提供,它会自动帮我们生产getter、setter方法,减少大量重复代码,推荐使用。

接下来,以客户端登录请求为例,定义登录请求数据包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Command {
Byte LOGIN_REQUEST = 1;
}

@Data
public class LoginRequestPacket extends Packet {

private Integer userId;
private String username;
private String password;

@Override
public Byte getCommand() {
return LOGIN_REQUEST;
}
}

登录请求数据包继承自Packet定义了3个字段,分别是用户ID、用户名和密码。其中最为重要的就是覆盖了父类的getCommand()方法,值为常量LOGIN_REQUEST。
Java对象定义完成之后,我们需要定义一种规则,如何把一个Java对象转换成二进制数据,这个规则叫作Java对象的序列化。

8.3.2 序列化

如下代码定义序列化接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Serializer {
/**
* 序列化算法
*/
byte getSerializerAlgorithm();
/**
* Java 对象转换成二进制数据
*/
byte[] serialize(Object object);
/**
* 二进制数据转换成Java对象
*/
<T> T deserialize(Class<T> clazz, byte[] bytes);
}

序列化接口有3个方法:getSerializerAlgorithm()方法获取具体的序列化算法标识,serialize()方法将Java对象转换成字节数组,deserialize()方法将字节数组转换成某种类型的Java对象。在此,使用最简单的JSON序列化方式,将阿里巴巴的Fastjson作为序列化框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface SerializerAlgorithm {
/**
* JSON序列化标识
*/
byte JSON = 1;
}

public class JSONSerializer implements Serializer {
@Override
public byte getSerializerAlgorithm() {
return SerializerAlgorithm.JSON;
}

@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return JSON.parseObject(bytes, clazz);
}
}

然后,我们定义一下序列化算法的类型,以及默认序列化算法。

1
2
3
4
5
6
7
8
9
public interface Serializer {
/**
* JSON序列化
*/
byte JSON_SERIALIZER = 1;

Serializer DEFAULT = new JSONSerializer();
// ...
}

这样,我们就实现了序列化相关的逻辑。如果想要实现其他序列化算法,则只需要继承Serializer,然后定义序列化算法的标识,再覆盖两个方法即可。
序列化定义了Java对象与二进制数据的互转过程。接下来,我们学习如何把这部分数据编码到通信协议的二进制数据包中去。

8.3.3 编码:封装成二进制数据的过程

PacketCodeC.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final int MAGIC_NUMBER = 0x12345678;
public ByteBuf encode(Packet packet) {
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
// 2. 序列化 Java 对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);
// 3. 实际编码过程
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
return byteBuf;
}

编码过程分为3个步骤。

  • 我们需要创建一个ByteBuf,这里我们调用Netty的ByteBuf分配器来创建,ioBuffer()方法会返回适配IO读写相关的内存,它会尽可能创建一个直接内存。直接内存可以理解为不受JVM堆管理的内存空间,写到IO缓冲区的效果更高。
  • 将Java对象序列化成二进制数据包。
  • 我们对照本章开头的协议设计和上一章ByteBuf的API,逐个往ByteBuf写入字段,即实现了编码过程。到此,编码过程结束。

一端实现编码之后,Netty会将此ByteBuf写到另一端。另一端获得的也是一个ByteBuf对象。基于这个ByteBuf对象,就可以反解出在对端创建的Java对象,这个过程被称作解码,下面我们就来分析这个过程。

8.3.4 解码:解析Java对象的过程

PacketCodeC.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public Packet decode(ByteBuf byteBuf) {
// 跳过魔数
byteBuf.skipBytes(4);
// 跳过版本号
byteBuf.skipBytes(1);
// 序列化算法标识
byte serializeAlgorithm = byteBuf.readByte();
// 指令
byte command = byteBuf.readByte();
// 数据包长度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if
(requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
}
return null;
}

private static Class<? extends Packet> getRequestType(byte command) {
return REQUEST_TYPE_MAP.get(command);
}

public static Serializer getSerializer(byte serializerAlgorithm) {
if (serializerAlgorithm == SerializerAlgorithm.JSON) {
return new JSONSerializer();
}
return null;
}

解码的流程如下。

  • 我们假定decode方法传递进来的ByteBuf已经是合法的(后面我们再来实现校验),即首个4字节是我们定义的魔数0x12345678,这里我们调用skipBytes跳过这个4字节。
  • 我们暂时不关注协议版本,通常在没有遇到协议升级的时候,暂时不处理这个字段。因为在绝大多数情况下,几乎用不着这个字段,但仍然需要暂时保留。
  • 我们调用ByteBuf的API分别获得序列化算法标识、指令、数据包的长度。
  • 我们根据获得的数据包的长度取出数据,通过指令获得该数据包对应的Java对象的类型,根据序列化算法标识获得序列化对象,将字节数组转换为Java对象。至此,解码过程结束。

由此可以看到,解码过程与编码过程正好是相反的过程。

8.4 总结

本章,我们学到了以下几个知识点。

  • 通信协议是为了客户端与服务端交互,双方协商出来的满足一定规则的二进制数据格式。
  • 介绍了一种通用的通信协议的设计,包括魔数、版本号、序列化算法标识、指令、数据长度、数据几个字段,该协议能够满足绝大多数通信场景。
  • Java对象及序列化的目的就是实现Java对象与二进制数据的互转。
  • 我们依照设计的协议和ByteBuf的API实现了通信协议,这个过程被称为编解码过程。

8.5 思考

  • 除了JSON序列化方式,还有哪些序列化方式?如何实现?
  • 序列化和编码都是把Java对象封装成二进制数据的过程,这两者有什么区别和联系?

第 9 章 实现客户端登录

本章我们来实现客户端登录到服务端的过程。

9.1 登录流程

登录流程如下图所示。

img

由上图可以看到,客户端连接上服务端之后:
1.客户端首先会构建一个登录请求对象,然后通过编码把请求对象编码为ByteBuf,写到服务端。
2.服务端接收到ByteBuf之后,首先通过解码把ByteBuf解码为登录请求响应,然后进行校验。
3.服务端校验通过之后,构造一个登录响应对象,依然经过编码,再写回客户端。
4.客户端接收服务端的响应数据之后,解码ByteBuf,获得登录响应对象,判断是否登录成功。

9.2 逻辑处理器
下面来分别实现上述4个过程。开始之前,我们先回顾一下客户端与服务端的启动流程,当客户端启动的时候,我们会在引导类Bootstrap中配置客户端的处理逻辑。本节中我们给客户端配置的逻辑处理器被叫作ClientHandler。

1
2
public class ClientHandler extends ChannelInboundHandlerAdapter {
}

然后,在客户端启动的时候,我们给Bootstrap配置上这个逻辑处理器。

1
2
3
4
5
6
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientHandler());
}
});

这样,在客户端Netty中IO事件相关的回调就能够回调到ClientHandler。
同样,我们给服务端引导类ServerBootstrap也配置一个逻辑处理器:ServerHandler。

1
2
3
4
5
6
7
8
9
public class ServerHandler extends ChannelInboundHandlerAdapter {}

serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ServerHandler());
}
}

这样,在服务端Netty中IO事件相关的回调就能够回调到ServerHandler。
接下来,我们就围绕这两个Handler编写处理逻辑。

9.3 客户端发送登录请求

9.3.1 客户端处理登录请求

我们实现客户端与服务端连接之后,立即登录。在连接上服务端之后,Netty会回调ClientHandler的channelActive()方法,我们在这个方法体里编写相应的逻辑。

ClientHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端开始登录");
// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("flash");
loginRequestPacket.setPassword("pwd");
// 编码
ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
// 写数据
ctx.channel().writeAndFlush(buffer);
}

在编码环节,我们把PacketCodeC变成单例模式,然后从ByteBuf分配器抽取出一个参数,这里第一个实参ctx.alloc()获取的就是与当前连接相关的ByteBuf分配器,建议这样使用。

写数据的时候,我们首先通过ctx.channel()获取当前连接(Netty对连接的抽象为Channel,后面章节会分析),然后调用writeAndFlush()把二进制数据写到服务端。这样,客户端发送登录请求的逻辑就完成了。接下来,我们介绍服务端接收到这个数据之后是如何处理的。

9.3.2 服务端处理登录请求

ServerHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;
// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
// 判断是否是登录请求数据包
if (packet instanceof LoginRequestPacket) {
LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
// 登录校验
if (valid(loginRequestPacket)) {
// 校验成功
} else {
// 校验失败
}
}
}
private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}

向服务端引导类ServerBootstrap中添加逻辑处理器ServerHandler,Netty在收到数据之后,会回调channelRead()方法。这里的第二个参数msg,在这个场景中,可以直接强转为ByteBuf。为什么Netty不直接把这个参数类型定义为ByteBuf?我们在后续的内容中会分析。

拿到ByteBuf之后,首先要做的事情就是解码,解码出Java数据包对象,然后判断如果是登录请求数据包LoginRequestPacket,就进行登录逻辑的处理。这里,我们假设所有的登录都是成功的,valid()方法返回true。

服务端校验通过之后,接下来就需要向客户端发送登录响应,我们继续编写服务端的逻辑。

9.4 服务端发送登录响应

9.4.1 服务端处理登录响应

ServerHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(packet.getVersion());
if (valid(loginRequestPacket)) {
loginResponsePacket.setSuccess(true);
} else {
loginResponsePacket.setReason("账号密码校验失败");
loginResponsePacket.setSuccess(false);
}
// 编码
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);

这段代码仍然是在服务端逻辑处理器ServerHandler的channelRead()方法里构造一个登录响应包LoginResponsePacket,然后在校验成功和失败的时候分别设置标志位,接下来调用编码器把Java对象编码成ByteBuf,调用writeAndFlush()写到客户端。至此,服务端的登录逻辑编写完成。还有最后一步,即客户端处理登录响应。

9.4.2 客户端处理登录响应

客户端接收服务端数据的处理逻辑也在ClientHandler的channelRead()方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
if (packet instanceof LoginResponsePacket) {
LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
if (loginResponsePacket.isSuccess()) {
System.out.println(new Date() + ": 客户端登录成功");
}
else {
System.out.println(new Date() + ": 客户端登录失败,原因:" +loginResponsePacket.getReason());
}
}
}

客户端拿到数据之后,调用PacketCodeC进行解码操作,如果类型是登录响应数据包,则逻辑比较简单,在控制台打印出一条消息。
至此,客户端整个登录流程就结束了。这里为了给大家演示,客户端和服务端的处理逻辑都较为简单,但是相信大家应该已经掌握了使用Netty来进行客户端与服务端交互的基本思路。基于这个思路,再运用到实际项目中,并不是难事。
最后,我们来看一下效果,下面两图分别是客户端与服务端的控制台输出。

客户端

img

服务端

img

9.5 总结

本章我们梳理了客户端登录的基本流程,然后结合上一章的编解码逻辑,使用Netty实现了完整的客户端登录流程。

9.6 思考

客户端登录成功或者失败之后,如何把成功或者失败的标识绑定在客户端的连接上?服务端又如何高效避免客户端重新登录?

第 10 章 实现客户端与服务端收发消息

这一章,我们来实现客户端与服务端收发消息,要实现的具体功能是:在控制台输入一条消息之后按回车键,校验完客户端的登录状态之后,把消息发送到服务端;服务端收到消息之后打印,并向客户端发送一条消息,客户端收到消息之后打印。

10.1 收发消息对象

首先,我们来定义一下客户端与服务端的收发消息对象。我们把客户端发送至服务端的消息对象定义为MessageRequestPacket。

1
2
3
4
5
6
7
8
@Data
public class MessageRequestPacket extends Packet {
private String message;
@Override
public Byte getCommand() {
return MESSAGE_REQUEST;
}
}

指令为MESSAGE_REQUEST=3。
我们把服务端发送至客户端的消息对象定义为MessageResponsePacket。

1
2
3
4
5
6
7
8
@Data
public class MessageResponsePacket extends Packet {
private String message;
@Override
public Byte getCommand() {
return MESSAGE_RESPONSE;
}
}

指令为MESSAGE_RESPONSE=4。
至此,我们的指令已经有如下4种。

1
2
3
4
5
6
public interface Command {
Byte LOGIN_REQUEST = 1;
Byte LOGIN_RESPONSE = 2;
Byte MESSAGE_REQUEST = 3;
Byte MESSAGE_RESPONSE = 4;
}

10.2 判断客户端是否登录成功

在第9章中,我们出了一道思考题:如何判断客户端是否已经登录?
在客户端启动流程这一章,我们提到可以给客户端连接也就是Channel绑定属性,那么通过channel.attr(xxx).set(xx)方式,是否可以在登录成功之后,给Channel绑定一个登录成功的标志位,然后在判断是否登录成功的时候取出这个标志位呢?答案是肯定的。
首先定义登录成功的标志位。

1
2
3
public interface Attributes {
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
}

然后在客户端登录成功之后,给客户端绑定登录成功的标志位。
ClientHandler.java

1
2
3
4
5
6
7
8
9
10
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ...
if (loginResponsePacket.isSuccess()) {
LoginUtil.markAsLogin(ctx.channel());
System.out.println(new Date() + ": 客户端登录成功");
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
}
// ...
}

这里,我们省去了非关键代码部分。

1
2
3
4
5
6
7
8
9
public class LoginUtil {
public static void markAsLogin(Channel channel) {
channel.attr(Attributes.LOGIN).set(true);
}
public static boolean hasLogin(Channel channel) {
Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);
return loginAttr.get() != null;
}
}

如上代码所示,我们抽取出LoginUtil用于设置登录标志位并判断是否有标志位。如果有标志位,不管标志位的值是什么,都表示已经成功登录。接下来,我们实现控制台输入消息并发送至服务端。

10.3 在控制台输入消息并发送

现在,我们在客户端连接上服务端之后启动控制台线程,从控制台获取消息,然后发送至服务端。

NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
Channel channel = ((ChannelFuture) future).channel();
// 连接成功之后,启动控制台线程
startConsoleThread(channel);
}
// ...
});
}

private static void startConsoleThread(Channel channel) {
new Thread(() -> {
while (!Thread.interrupted()) {
if (LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();
MessageRequestPacket packet = new MessageRequestPacket();
packet.setMessage(line);
ByteBuf byteBuf = PacketCodeC.INSTANCE.encode(channel.alloc(), packet);
channel.writeAndFlush(byteBuf);
}
}
}).start();
}

这里,我们省略了非关键代码。连接成功之后,调用startConsoleThread()开始启动控制台线程。在控制台线程中,判断只要当前Channel是登录状态,就允许控制台输入消息。
从控制台获取消息之后,将消息封装成消息对象,然后将消息编码成ByteBuf,最后通过writeAndFlush()将消息写到服务端,这个过程相信大家在学习了上节内容后,应该不会太陌生。接下来,我们介绍服务端收到消息之后是如何处理的。

10.4 服务端收发消息处理

ServerHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
if (packet instanceof LoginRequestPacket) {
// 处理登录
} else if (packet instanceof MessageRequestPacket) {
// 处理消息
MessageRequestPacket messageRequestPacket = ((MessageRequestPacket) packet);
System.out.println(new Date() + ": 收到客户端消息: " + messageRequestPacket.getMessage());
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】");
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}

服务端在收到消息之后,仍然回调到channelRead()方法,解码之后用一个else分支进入消息处理的流程。

首先服务端将收到的消息打印到控制台,然后封装一个消息响应对象MessageResponsePacket,接下来编码成ByteBuf,再调用writeAndFlush()将数据写到客户端。我们再来看一下客户端收到消息的逻辑。

10.5 客户端收消息处理

ClientHandler.java

1
2
3
4
5
6
7
8
9
10
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
if (packet instanceof LoginResponsePacket) {
// 登录逻辑
} else if (packet instanceof MessageResponsePacket) {
MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
}
}

客户端在收到消息之后,回调到channelRead()方法,仍然用一个else逻辑进入消息处理的逻辑,这里我们仅仅简单地打印出消息。
最后,我们来看一下客户端和服务端的运行效果图。

客户端

img

服务端

img

10.6 总结

1.定义了负责收发消息的Java对象中进行消息的收发。
2.学习了Channel的attr()方法的实际用法:可以通过给Channel绑定属性来设置某些状态,获取某些状态,不需要额外的Map来维持。
3.学习了如何在控制台获取消息并发送至服务端。
4.实现了服务端回消息、客户端响应的逻辑。可以看到,这部分实际和前一章的登录流程有点类似。

10.7 思考

随着我们实现的指令越来越多,如何避免channelRead()中对指令处理的if else泛滥?

第 11 章 Pipeline 与 ChannelHandler

这一章,我们将学习Netty的两大核心组件:Pipeline与ChannelHandler。
我们在上一章的最后提出一个问题:如何避免if else泛滥?我们注意到,不管服务端还是客户端,处理流程大致都分为下图所示的几个步骤。

img

我们把这三类逻辑都写在一个类里,客户端写在ClientHandler,服务端写在ServerHandler,如果要做功能的扩展(比如我们要校验魔数,或者其他特殊逻辑),只能在一个类里修改,这个类就会变得越来越臃肿。

另外,每次发指令数据包都要手动调用编码器编码成ByteBuf,对于这类场景的编码优化,我们能想到的办法自然是模块化处理,不同的逻辑放置到单独的类中来处理,最后将这些逻辑串联起来,形成一个完整的逻辑处理链。

Netty中的Pipeline和ChannelHandler正是用来解决这个问题的。它通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除,Netty能够支持各类协议的扩展,比如HTTP、Websocket和Redis,靠的就是Pipeline和ChannelHandler。下面我们来学习这部分内容。

11.1 Pipeline与ChannelHandler的构成

无论从服务端来看,还是从客户端来看,在Netty的整个框架里面,一个连接对应着一个Channel。这个Channel的所有处理逻辑都在一个叫作ChannelPipeline的对象里,ChannelPipeline是双向链表结构,它和Channel之间是一对一的关系。

如下图所示,ChannelPipeline里的每个节点都是一个ChannelHandlerContext对象,这个对象能够获得和Channel相关的所有上下文信息。这个对象同时包含一个重要的对象,那就是逻辑处理器ChannelHandler,每个ChannelHandler都处理一块独立的逻辑。

img

我们再来看ChannelHandler有哪些分类。

11.2 ChannelHandler的分类

由下图可以看到,ChannelHandler有两大子接口。

img

第一个子接口是ChannelInboundHandler,从字面意思可以猜到,它是处理读数据的逻辑。比如在一端读到一段数据,首先要解析这段数据,然后对这段数据做一系列逻辑处理,最终把响应写到对端。在组装响应之前的所有处理逻辑,都可以放置在一系列ChannelInboundHandler里处理,它的一个最重要的方法就是channelRead()。读者可以将ChannelInboundHandler的逻辑处理过程与TCP的七层协议解析过程联系起来,把收到的数据一层层地从物理层上升到应用层。

第二个子接口ChannelOutboundHandler是处理写数据的逻辑,它是定义一端在组装完响应之后把数据写到对端的逻辑。比如,我们封装好一个response对象后,有可能对这个response做一些其他特殊逻辑处理,然后编码成ByteBuf,最终写到对端。它最核心的方法就是write(),读者可以将ChannelOutboundHandler的逻辑处理过程与TCP的七层协议封装过程联系起来。我们在应用层组装响应之后,通过层层协议的封装,直到底层的物理层。

这两个子接口分别有对应的默认实现:ChannelInboundHandlerAdapter和ChannelOutbound- HandlerAdapter,它们分别实现了两大子接口的所有功能,在默认情况下会把读写事件传播到下一个Handler。

下面我们就用一个具体的Demo来学习这两大Handler的事件传播机制。

11.3 ChannelInboundHandler的事件传播

关于ChannelInboundHandler,我们用channelRead()作例子,来体验一下Inbound事件的传播。
我们在服务端的Pipeline添加3个ChannelInboundHandler。

NettyServer.java

1
2
3
4
5
6
7
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.Pipeline().addLast(new InBoundHandlerA());
ch.Pipeline().addLast(new InBoundHandlerB());
ch.Pipeline().addLast(new InBoundHandlerC());
}
});

每个inboundHandler都继承自ChannelInboundHandlerAdapter,实现了channelRead()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
super.channelRead(ctx, msg);
}
}

在channelRead()方法里,我们打印当前Handler的信息,调用父类的channelRead()方法。而父类的channelRead()方法会自动调用下一个inboundHandler的channelRead()方法,并且会把当前inboundHandler里处理完毕的对象传递到下一个inboundHandler,上述例子中传递的对象都是同一个msg。

我们通过addLast()方法为Pipeline添加inboundHandler,当然,除了这个方法,还有其他方法,感兴趣的读者可以自行浏览Pipeline的API,这里我们添加的顺序为A ->B->C。最后来看一下控制台的输出,如下图所示。

img

由上图可以看到,inboundHandler的执行顺序与通过addLast()方法添加的顺序保持一致。我们再来看outboundHandler的事件传播。

11.4 ChannelOutboundHandler的事件传播

关于ChannelOutboundHandler,我们用write()作例子,来体验一下Outbound事件的传播。
我们继续在服务端的Pipeline添加3个ChannelOutboundHandler。

1
2
3
4
5
6
7
8
9
10
11
12
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// inbound,处理读数据的逻辑链
ch.Pipeline().addLast(new InboundHandlerA());
ch.Pipeline().addLast(new InboundHandlerB());
ch.Pipeline().addLast(new InboundHandlerC());
// outbound,处理写数据的逻辑链
ch.Pipeline().addLast(new OutboundHandlerA());
ch.Pipeline().addLast(new OutboundHandlerB());
ch.Pipeline().addLast(new OutboundHandlerC());
}
});

每个outboundHandler都继承自ChannelOutboundHandlerAdapter,实现了write()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerA: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerB: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerC: " + msg);
super.write(ctx, msg, promise);
}
}

在write()方法里,我们打印当前Handler的信息,调用父类的write()方法。而父类的write()方法会自动调用下一个outboundHandler的write()方法,并且会把当前outboundHandler里处理完毕的对象传递到下一个outboundHandler。
我们通过addLast()方法添加outboundHandler的顺序为A->B->C。最后来看控制台的输出,如下图所示。

img

由上图可以看到,outboundHandler的执行顺序与添加的顺序相反,这是为什么呢?这就要说到Pipeline的结构和执行顺序了。
Pipeline的结构
如下图所示,不管我们定义的是哪种类型的Handler,最终它们都以双向链表的方式连接。实际链表的节点是ChannelHandlerContext,为了让结构清晰突出,可以直接把节点看作ChannelHandler。

img

Pipeline的执行顺序
虽然两种类型的Handler在一个双向链表里,但是这两类Handler的分工是不一样的,inboundHandler的事件通常只会传播到下一个inboundHandler,outboundHandler的事件通常只会传播到下一个outboundHandler,两者相互不受干扰,如下图所示。

img

关于Pipeline与ChannelHandler相关的事件传播就讲完了。下一章,我们会了解几种特殊的ChannelHandler,并且使用这几种特殊的ChannelHandler来改造客户端和服务端逻辑,解决if else泛滥的问题。

11.5 总结

  • 通过前面编写客户端与服务端处理逻辑,引出了Pipeline和ChannelHandler的概念。
  • ChannelHandler分为Inbound和Outbound两种类型的接口,分别是处理数据读与数据写的逻辑,可与TCP协议栈处理数据的两个方向联系起来。
  • 两种类型的Handler均有相应的默认实现,默认会把事件传递到下一个Handler,这里的传递事件其实就是把本Handler的处理结果传递到下一个Handler继续处理。
  • inboundHandler的执行顺序与实际的添加顺序相同,而outboundHandler则相反。

11.6 思考

  • 参考本章的例子,如果我们往Pipeline里添加Handler的顺序不变,要在控制台打印出

    inboundA->inboundC->outboundB->outboundA,该如何实现?

  • 如何在每个Handler里都打印上一个Handler处理结束的时间点?

第 12 章 构建客户端与服务端的 Pipeline

通过上一章的学习,我们已经了解了Pipeline和ChannelHandler的基本概念。本章使用上一章的理论知识来重新构建客户端和服务端的Pipeline,把复杂的逻辑从单独的一个ChannelHandler中抽取出来。
Netty内置了很多开箱即用的ChannelHandler,我们通过学习Netty内置的ChannelHandler来逐步构建Pipeline。

12.1 ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter

首先是ChannelInboundHandlerAdapter,这个适配器主要用于实现其接口ChannelInboundHandler的所有方法,这样我们在编写自己的Handler时就不需要实现Handler里的每一种方法,而只需要实现我们所关心的方法即可。在默认情况下,对于ChannelInboundHandlerAdapter,我们比较关心的是它的如下方法。

ChannelInboundHandlerAdapter.java

1
2
3
4
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

它的作用就是接收上一个Handler的输出,这里的msg就是上一个Handler的输出。大家也可以看到,默认情况下的Adapter会通过fireChannelRead()方法直接把上一个Handler的输出结果传递到下一个Handler。
与ChannelInboundHandlerAdapter类似的类是ChannelOutboundHandlerAdapter,它的核心方法如下。
ChannelOutboundHandlerAdapter.java

1
2
3
4
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

在默认情况下,这个Adapter也会把对象传递到下一个Outbound节点,它的传播顺序与inboundHandler相反,这里不再展开介绍。
我们往Pipeline添加的第一个Handler的channelRead()方法中,msg对象其实就是ByteBuf。服务端在接收到数据之后,应该要做的第一步逻辑就是把这个ByteBuf进行解码,然后把解码后的结果传递到下一个Handler,如下所示。

1
2
3
4
5
6
7
8
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf requestByteBuf = (ByteBuf) msg;
// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
// 解码后的对象传递到下一个Handler
ctx.fireChannelRead(packet);
}

在开始解码之前,我们先来了解一下另外一个特殊的Handler。

12.2 ByteToMessageDecoder

通常情况下,无论在客户端还是在服务端,当我们收到数据后,首先要做的就是把二进制数据转换到Java对象,所以Netty很贴心地提供了一个父类,来专门做这个事情。我们看一下如何使用这个类来实现服务端的解码。

1
2
3
4
5
6
public class PacketDecoder extends ByteToMessageDecoder {   
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
out.add(PacketCodeC.INSTANCE.decode(in));
}
}

当继承了ByteToMessageDecoder这个类之后,只需要实现decode()方法即可。大家可以看到,这里的in传递进来的时候就已经是ByteBuf类型,所以不再需要强转。第三个参数是List类型,我们通过向这个List里面添加解码后的结果对象,就可以自动实现结果向下一个Handler传递,这样就实现了解码的逻辑Handler。
另外,值得注意的是,对于Netty里的ByteBuf,我们使用4.1.6.Final版本,默认情况下用的是堆外内存。在ByteBuf一章中提到,堆外内存需要我们自行释放,在解码的例子中,其实我们已经漏掉了这个操作,这一点是非常致命的。随着程序运行得越来越久,内存泄漏的问题就慢慢暴露出来了,而这里我们使用ByteToMessageDecoder,Netty会自动进行内存的释放,我们不用操心太多的内存管理方面的逻辑。
通过解码器把二进制数据转换到Java对象即指令数据包之后,就可以针对每一种指令数据包编写逻辑了。

12.3 SimpleChannelInboundHandler

回顾一下前面处理Java对象的逻辑。

1
2
3
4
5
6
if (packet instanceof LoginRequestPacket) {
// ...
} else if (packet instanceof MessageRequestPacket) {
// ...
} else if
...

通过if else分支进行逻辑的处理,当要处理的指令越来越多的时候,代码会显得越来越臃肿,我们可以通过给Pipeline添加多个Handler(ChannelInboundHandlerAdapter的子类)来解决过多的if else问题。
XXXHandler.java

1
2
3
4
5
if (packet instanceof XXXPacket) {
// ...处理
} else {
ctx.fireChannelRead(packet);
}

这样的一个好处就是,每次添加一个指令处理器,其逻辑处理的框架都是一致的。
但是,大家应该也注意到了,我们编写指令处理Handler的时候,依然编写了一段其实可以不用关心的if else判断,然后手动传递无法处理的对象(XXXPacket)至下一个指令处理器,这也是一段重复度极高的代码。因此,基于这种考虑,Netty抽象出了一个SimpleChannelInboundHandler对象,自动实现了类型判断和对象传递,这样我们的应用代码就可以专注于业务逻辑。
下面来看如何使用SimpleChannelInboundHandler简化指令处理逻辑。

LoginRequestHandler.java

1
2
3
4
5
6
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
// 登录逻辑
}
}

从字面意思可以看到,SimpleChannelInboundHandler的使用非常简单。我们在继承这个类的时候,给它传递一个泛型参数,然后在channelRead0()方法里,不用再通过if逻辑来判断当前对象是否是本Handler可以处理的对象,也不用强转,不用往下传递本Handler处理不了的对象,这一切都已经交给父类SimpleChannelInboundHandler来实现,我们只需要专注于我们要处理的业务逻辑即可。

上面的LoginRequestHandler是用来处理登录的逻辑,同理,我们可以很轻松地编写一个消息处理逻辑处理器。

MessageRequestHandler.java

1
2
3
4
5
6
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {

}
}

12.4 MessageToByteEncoder

在前面的章节中,我们已经实现了登录和消息处理逻辑。处理完登录和消息这两类指令之后,我们都会给客户端返回一个响应。在写响应之前,需要把响应对象编码成ByteBuf,结合本节内容,最后的逻辑框架如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacketloginRequestPacket) {
LoginResponsePacket loginResponsePacket = login(loginRequestPacket);
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}

public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {
MessageResponsePacket messageResponsePacket = receiveMessage(messageRequestPacket);
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageRequestPacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}

读者应该注意到了,在上述代码中,处理每一种指令完成之后的逻辑都是类似的,都需要先进行编码,然后调用writeAndFlush()将数据写到客户端。这个编码的过程其实也是重复的逻辑,而且在编码过程中,我们还需要手动创建一个ByteBuf,过程如下。
PacketCodeC.java

1
2
3
4
5
6
7
public ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = byteBufAllocator.ioBuffer();
// 2. 序列化 Java 对象
// 3. 实际编码过程
return byteBuf;
}

Netty提供了一个特殊的ChannelHandler来专门处理编码逻辑,不需要每一次将响应写到对端的时候都调用一次编码逻辑进行编码,也不需要自行创建ByteBuf。这个类被叫作MessageToByteEncoder,从字面意思可以看出,它的功能就是将对象转换到二进制数据。

使用MessageToByteEncoder来实现编码逻辑的过程如下。

1
2
3
4
5
6
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
PacketCodeC.INSTANCE.encode(out, packet);
}
}

PacketEncoder继承自MessageToByteEncoder,泛型参数Packet表示这个类的作用是实现Packet类型对象到二进制数据的转换。

这里我们只需要实现encode()方法。在这个方法里,第二个参数是Java对象,而第三个参数是ByteBuf对象,我们要做的事情就是,把Java对象的字段写到ByteBuf对象,而不再需要自行去分配ByteBuf对象。因此,大家注意到,PacketCodeC的encode()方法的定义也改了,下面是更改前后的对比。
PacketCodeC.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 更改前的定义
public ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = byteBufAllocator.ioBuffer();
// 2. 序列化 Java 对象
// 3. 实际编码过程
return byteBuf;
}

// 更改后的定义
public void encode(ByteBuf byteBuf, Packet packet) {
// 1. 序列化 Java 对象
// 2. 实际编码过程
}

PacketCodeC不再需要手动创建ByteBuf对象,不再需要把创建完的ByteBuf对象进行返回。当我们向Pipeline中添加这个编码器后,在指令处理完毕之后就只需要调用writeAndFlush()把Java对象写出去即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
ctx.channel().writeAndFlush(login(loginRequestPacket));
}
}

public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageRequestPacket) {
ctx.channel().writeAndFlush(receiveMessage(messageRequestPacket));
}
}

通过前面的分析,可以看到,Netty为了让我们的逻辑更为清晰简洁,做了很多工作,能直接用Netty自带的Handler来解决的问题,不再需要重复造轮子。在接下来的章节中,我们会继续探讨Netty还有哪些开箱即用的Handler。

12.5 构建客户端与服务端的Pipeline

分析完服务端的Pipeline的Handler组成结构,相信读者也不难自行分析出客户端的Pipeline的Handler结构。最后我们来看一下客户端和服务端完整的Pipeline的Handler结构,如下图所示。

img

对应代码如下。
客户端

1
2
3
4
5
6
7
8
9
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.Pipeline().addLast(new PacketDecoder());
ch.Pipeline().addLast(new LoginResponseHandler());
ch.Pipeline().addLast(new MessageResponseHandler());
ch.Pipeline().addLast(new PacketEncoder());
}
});

服务端

1
2
3
4
5
6
7
8
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.Pipeline().addLast(new PacketDecoder());
ch.Pipeline().addLast(new LoginRequestHandler());
ch.Pipeline().addLast(new MessageRequestHandler());
ch.Pipeline().addLast(new PacketEncoder());
}
});

12.6 总结

本章我们学习了用Netty内置的ChannelHandler来逐步构建服务端的Pipeline,通过内置的ChannelHandler可以减少很多重复的逻辑。

  • 基于ByteToMessageDecoder,可以实现自定义解码,而不用关心ByteBuf的强转和解码结果的传递。
  • 基于SimpleChannelInboundHandler,可以实现每一种指令的处理,不再需要强转,不再有冗长乏味的if else逻辑,不再需要手动传递对象。
  • 基于MessageToByteEncoder,可以实现自定义编码,不用关心ByteBuf的创建,不用每次向对端写Java对象都进行一次编码。

12.7 思考

在LoginRequestHandler和MessageRequestHandler的channelRead0()方法中,第二个参数对象(XXXRequestPacket)是从哪里传递过来的?

第 13 章 拆包/粘包理论与解决方案

本章我们来学习一下Netty里拆包和粘包的概念,并且学习如何选择适合我们的应用程序的拆包器。

13.1 拆包/粘包例子

我们先来看一个例子,选择客户端与服务端双向通信这一章节的代码,然后做适当修改。

客户端FirstClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 1000; i++) {
ByteBuf buffer = getByteBuf(ctx);
ctx.channel().writeAndFlush(buffer);
}
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "你好,我是客户端,这是客户端发的消息".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}

客户端在连接建立成功之后,使用一个for循环,不断地向服务端写数据。
服务端FirstServerHandler

1
2
3
4
5
6
7
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(StandardCharsets.UTF_8));
}
}

服务端在收到数据之后,仅仅把数据打印出来。读者可以花几分钟时间思考一下,服务端的输出会是什么样子的?
可能很多读者觉得服务端会输出1000次“你好,欢迎关注我的微信公众号,《闪电侠的博客》!”,然而实际上服务端却是这样输出的,如下图所示。

image-20220810131019279

从服务端的控制台输出可以看出,存在3种类型的输出。

  • 一种是正常的字符串输出。
  • 一种是多个字符串“粘”在了一起,我们定义这种ByteBuf为粘包。
  • 一种是一个字符串被“拆”开,形成一个破碎的包,我们定义这种ByteBuf为半包。

13.2 为什么会有粘包、半包现象

尽管我们在应用层面使用了Netty,但是操作系统只认TCP协议;尽管我们的应用层按照ByteBuf为单位来发送数据,但是到了底层操作系统,仍然是按照字节流发送数据的,因此,数据到了服务端,也按照字节流的方式读入,然后到了Netty应用层面,重新拼装成ByteBuf。

这里的ByteBuf与客户端按照顺序发送的ByteBuf可能是不对等的。因此,我们需要在客户端根据自定义协议来组装应用层的数据包,然后在服务端根据应用层的协议来组装数据包,这个过程通常在服务端被称为拆包,而在客户端被称为粘包。

拆包和粘包是相对的,一端粘了包,另外一端就需要将粘过的包拆开。举个例子,发送端将三个数据包粘成两个TCP数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新拆分成三个数据包。

13.3 拆包的原理

在没有Netty的情况下,用户如果自己需要拆包,基本原理就是不断地从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包。

  • 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从TCP缓冲区中读取,直到得到一个完整的数据包。
  • 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

如果我们自己实现拆包,那么这个过程将会非常麻烦。每一种自定义协议都需要自己实现,还需要考虑各种异常,而Netty自带的一些开箱即用的拆包器已经完全满足我们的需求了。下面介绍Netty有哪些自带的拆包器。

13.4 Netty自带的拆包器

13.4.1 固定长度的拆包器FixedLengthFrameDecoder

如果应用层协议非常简单,每个数据包的长度都是固定的,比如100,那么只需要把这个拆包器加到Pipeline中,Netty就会把一个个长度为100的数据包(ByteBuf)传递到下一个ChannelHandler。

13.4.2 行拆包器LineBasedFrameDecoder

从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过LineBasedFrameDecoder将粘过的ByteBuf拆分成一个个完整的应用层数据包。

13.4.3 分隔符拆包器DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder是行拆包器的通用版本,只不过我们可以自定义分隔符。

13.4.4 基于长度域的拆包器LengthFieldBasedFrameDecoder

最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。由于上面3种拆包器比较简单,读者可以自行写出Demo。接下来,我们就结合自定义协议,来学习如何使用基于长度域的拆包器来拆解数据包。

13.5 如何使用LengthFieldBasedFrameDecoder

首先,我们来回顾一下自定义协议。

mark:this 1c10aee91d9bbdaaf6aa15053827d7f4.jpg)

详细的协议分析参考客户端与服务端通信协议编解码一节,这里不再赘述。
关于拆包,我们只需要关注以下3点。

  • 在我们的自定义协议中,长度域在整个数据包的哪个地方。用专业术语来说,就是长度域相对整个数据包的偏移量是多少,这里显然是4+1+1+1=7。
  • 另外需要关注的就是,长度域的长度是多少,这里显然是4。
  • 有了长度域偏移量和长度域的长度,我们就可以构造一个拆包器。
1
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);

其中,第一个参数指的是数据包的最大长度,第二个参数指的是长度域的偏移量,第三个参数指的是长度域的长度。写好这样一个拆包器之后,只需要在Pipeline的最前面加上这个拆包器即可。
由于这类拆包器使用最为广泛,想深入学习的读者可以参考笔者在简书上的文章。
下面我们重新组织一下服务端和客户端的Pipeline。

服务端

1
2
3
4
5
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

客户端

1
2
3
4
5
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());

这样,在后续PacketDecoder进行decode操作的时候,ByteBuf就是一个个完整的自定义协议数据包了。
LengthFieldBasedFrameDecoder有很多重载的构造参数,由于篇幅原因,这里不再展开介绍。

13.6 拒绝非本协议连接

不知道大家还记不记得,我们在设计协议的时候为什么在数据包的开头加上一个魔数。我们设计魔数的原因是尽早屏蔽非本协议的客户端,通常在第一个Handler处理这段逻辑。接下来的做法是每个客户端发过来的数据包都做一次快速判断,判断当前发来的数据包是否满足我们的自定义协议。
我们只需要继承自LengthFieldBasedFrameDecoder的decode()方法,然后在decode之前判断前4字节是否等于我们定义的魔数0x12345678即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Spliter extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_OFFSET = 7;
private static final int LENGTH_FIELD_LENGTH = 4;
public Spliter() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 屏蔽非本协议的客户端
if (in.getInt(in.readerIndex()) != PacketCodeC.MAGIC_NUMBER) {
ctx.channel().close();
return null;
}
return super.decode(ctx, in);
}
}

为什么可以在decode()方法中写这段逻辑?是因为在decode()方法中,每次第二个参数in传递进来的时候,均是一个数据包的开头。
我们只需要替换如下代码即可。

1
2
3
4
5
//
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
// 替换为
ch.pipeline().addLast(new Spliter());

我们再来实验一下,如下图所示。

mark:this fd8b1abb9ceb535a7e37de75bff975bf.jpg)

由上图可以看到,使用telnet连接上服务端之后(与服务端建立了连接),向服务端发送一段字符串,由于这段字符串不符合我们的自定义协议,于是在第一时间,服务端就关闭了这个连接。

13.7 客户端和服务端的Pipeline结构

至此,客户端和服务端的Pipeline结构如下图所示。

mark:this 客户端的Pipeline结构与服务端的Pipeline结构.jpg)

13.8 总结

  • 我们通过一个例子来理解为什么要有拆包器。其实拆包器的作用就是,根据我们的自定义协议,把数据拼装成一个个符合自定义数据包大小的ByteBuf,然后发送到自定义协议的解码器中去解码。
  • Netty自带的拆包器包括基于固定长度的拆包器、基于换行符和自定义分隔符的拆包器,还有最重要的一种是基于长度域的拆包器。通常Netty自带的拆包器已完全满足我们的需求,无须重复造轮子。
  • 基于Netty自带的拆包器,我们可以在拆包之前判断当前连上的客户端是否支持自定义协议。如果不支持,可尽早关闭,节省资源。

13.9 思考

在IM完整的Pipeline中,如果我们不添加拆包器,客户端连续向服务端发送数据,会有什么现象发生?为什么会发生这种现象?

第 14 章 ChannelHandler 的生命周期

在前面的章节中,对于ChannelHandler,我们的重点落在了读取数据相关的逻辑。本章,我们来学习ChannelHandler的其他方法,这些方法的执行是有顺序的,而这个执行顺序可以被称为ChannelHandler的生命周期。

14.1 ChannelHandler的生命周期详解

基于前面的代码,我们添加一个自定义ChannelHandler来测试一下各个回调方法的执行顺序。

对于服务端应用程序来说,我们这里讨论的ChannelHandler更多的是ChannelInboundHandler,我们基于ChannelInboundHandlerAdapter,自定义了一个Handler:LifeCyCleTestHandler。
LifeCyCleTestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class LifeCyCleTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelhandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被添加:handlerAdded()");
super.handlerAdded(ctx);
}

@Override
public void channelRegistered(ChannelhandlerContext ctx) throws Exception {
System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()");
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelhandlerContext ctx) throws Exception {
System.out.println("channel 准备就绪:channelActive()");
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelhandlerContext ctx, Object msg) throws Exception {
System.out.println("channel 有数据可读:channelRead()");
super.channelRead(ctx, msg);
}

@Override
public void channelReadComplete(ChannelhandlerContext ctx) throws Exception {
System.out.println("channel 某次数据读完:channelReadComplete()");
super.channelReadComplete(ctx);
}

@Override
public void channelInactive(ChannelhandlerContext ctx) throws Exception {
System.out.println("channel 被关闭:channelInactive()");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelhandlerContext ctx) throws Exception {
System.out.println("channel取消线程(NioEventLoop) 的绑定: channelUnregistered()");
super.channelUnregistered(ctx);
}

@Override
public void handlerRemoved(ChannelhandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被移除:handlerRemoved()");
super.handlerRemoved(ctx);
}
}

从上面的代码可以看到,我们在每个方法被调用的时候都会打印一段文字,然后把这个事件继续往下传播。最后把这个Handler添加到我们上章构建的Pipeline中。

1
2
3
4
5
6
7
8
9
10
11
// 前面代码省略
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 添加到第一个
ch.pipeline().addLast(new LifeCyCleTestHandler());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

我们先运行NettyServer.java,然后运行NettyClient.java。这个时候,服务端控制台的输出如下图所示。

mark:this 63217d664e4c7cc7eb847b501dc3bb0a.jpg)

由上图可以看到,ChannelHandler回调方法的执行顺序为:

1
handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()

下面我们来逐个解释每个回调方法的含义。

  • handlerAdded():指当检测到新连接之后,调用ch.pipeline().addLast(new LifeCyCleTestHandler());之后的回调,表示在当前Channel中,已经成功添加了一个Handler处理器。
  • channelRegistered():这个回调方法表示当前Channel的所有逻辑处理已经和某个NIO线程建立了绑定关系,接收新的连接,然后创建一个线程来处理这个连接的读写,只不过在Netty里使用了线程池的方式,只需要从线程池里去抓一个线程绑定在这个Channel上即可。这里的NIO线程通常指NioEventLoop。
  • channelActive():当Channel的所有业务逻辑链准备完毕(即Channel的Pipeline中已经添加完所有的Handler),以及绑定好一个NIO线程之后,这个连接才真正被激活,接下来就会回调到此方法。
  • channelRead():客户端向服务端发送数据,每次都会回调此方法,表示有数据可读。
  • channelReadComplete():服务端每读完一次完整的数据,都回调该方法,表示数据读取完毕。

我们再把客户端关闭,这个时候对于服务端来说,其实就是Channel被关闭,如下图所示。

mark:this 8245daa236a6e5ebd221a7c16830b388.jpg)

ChannelHandler回调方法的执行顺序为:

1
channelInactive() -> channelUnregistered() -> handlerRemoved()

到这里,相信大家应该已经能够看到,这里回调方法的执行顺序是新连接建立时候的逆操作。下面我们来解释一下每个方法的含义。

  • channelInactive():表面上这个连接已经被关闭了,这个连接在TCP层面已经不再是ESTABLISH状态了。
  • channelUnregistered():既然连接已经被关闭,那么与这个连接绑定的线程就不需要对这个连接负责了。这个回调表明与这个连接对应的NIO线程移除了对这个连接的处理。
  • handlerRemoved():我们给这个连接添加的所有业务逻辑处理器都被移除。

最后,我们用下图来标识Channelhandler的生命周期。

mark:this e30b38a571e4ff3b439fc9c1f8d9c118.jpg)

光了解这些生命周期的回调方法其实是比较枯燥乏味的,接下来我们就看一下这些回调方法的使用场景。

14.2 ChannelHandler生命周期各回调方法的用法举例

Netty对于一个连接在各个不同状态下回调方法的定义还是比较细致的,好处就在于我们能够基于这个机制写出扩展性较好的应用程序。

14.2.1 ChannelInitializer的实现原理

仔细翻看一下服务端的启动代码,我们在给新连接定义Handler的时候,其实只是通过childHandler()方法给新连接设置了一个Handler。这个Handler就是ChannelInitializer,而在ChannelInitializer的initChannel()方法里,我们通过获得Channel对应的Pipeline,调用addLast()方法添加Handler。

NettyServer.java

1
2
3
4
5
6
7
8
9
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LifeCyCleTestHandler());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

这里的ChannelInitializer其实就利用了Netty的Handler生命周期中channelRegistered()与handlerAdded()两个特性,我们简单看下ChannelInitializer类的源码。

ChannelInitializer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected abstract void initChannel(C ch) throws Exception;

public final void channelRegistered(ChannelhandlerContext ctx) throws Exception {
// ...
initChannel(ctx);
// ...
}

public void handlerAdded(ChannelhandlerContext ctx) throws Exception {
// ...
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
// ...
}

private boolean initChannel(ChannelhandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
initChannel((C) ctx.channel());
// ...
return true;
}
return false;
}

这里,我们把非重点代码略去,逻辑会更加清晰一些。

  • ChannelInitializer定义了一个抽象的initChannel()方法,这个抽象方法由我们自行实现。我们在服务端启动流程里的实现逻辑就是往Pipeline里组织我们的Handler链。
  • handlerAdded()方法和channelRegistered()方法都会尝试调用initChannel()方法,initChannel()方法使用putIfAbsent()方法来防止initChannel()方法被调用多次。
  • 如果读者Debug了ChannelInitializer的上述两个方法,就会发现,在handlerAdded()方法被调用的时候,Channel其实已经和某个线程绑定,所以就我们的应用程序来说,这里的channelRegistered()方法其实是多余的,那么为什么还要尝试调用一次呢?应该是担心我们自己写了一个类继承自ChannelInitializer,然后覆盖掉了handlerAdded()方法。这样即使覆盖掉,在channelRegistered()方法里还有机会再调一次initChannel()方法,把自定义的Handler都添加到Pipeline中去。

14.2.2 handlerAdded()方法与handlerRemoved()方法

这两个方法通常可以用于一些资源的申请和释放。

14.2.3 channelActive()方法与channelInActive()方法

  • 对应用程序来说,这两个方法的含义是TCP连接的建立与释放。通常我们在这两个回调里统计单机的连接数,channelActive()方法被调用,连接数加一;channelInActive()方法被调用,连接数减一。
  • 我们也可以在channelActive()方法中,实现对客户端连接IP黑白名单的过滤,具体就不展开介绍了。

14.2.4 channelRead()方法

我们在前面讲到拆包/粘包原理,服务端根据自定义协议来进行拆包,其实就是在这个方法里,每次读到一定数据,都会累加到一个容器里,然后判断是否能够拆出来一个完整的数据包。如果够就拆了之后往下进行传递。详细原理这里不过多展开。
在前面章节中,每次向客户端写数据的时候,都通过writeAndFlush()方法写数据并刷新到底层,其实这种方式并不是特别高效。我们可以把调用writeAndFlush()方法的地方都调用write()方法,然后在这个方法里调用ctx.channel().flush()方法,相当于批量刷新的机制。当然,如果你对性能要求没那么高,使用writeAndFlush()方法足矣。

14.3 总结

  • 本章详细剖析了ChannelHandler(主要是ChannelInboundHandler)的各个回调方法、连接的建立和关闭,执行回调方法有一个逆向过程。
  • 每一种回调方法都有其各自的用法,但是有的时候某些回调方法的使用边界有些模糊,恰当地使用回调方法来处理不同的逻辑,可以使你的应用程序更为简洁。

14.4 思考

  • 如何在服务端每隔一秒输出当前客户端的连接数?
  • 统计客户端的入口流量,以字节为单位。

第 15 章 使用 ChannelHandler 的热插拔实现客户端身份校验

在前面的章节中,细心的读者可能会注意到,客户端连上服务端之后,即使没有进行登录校验,服务端在收到消息之后仍然会进行消息的处理,这个逻辑其实是有问题的。本章我们学习一下如何使用Pipeline及Handler强大的热插拔机制来实现客户端身份校验。

15.1 身份检验

首先,我们在客户端登录成功之后,标记当前Channel的状态为已登录。
LoginRequestHandler.java

1
2
3
4
5
6
7
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacketloginRequestPacket) {
if (valid(loginRequestPacket)) {
// ...
// 基于前面一节的代码,添加如下一行代码
LoginUtil.markAsLogin(ctx.channel());
} // ...
}

LoginUtil.java

1
2
3
public static void markAsLogin(Channel channel) {
channel.attr(Attributes.LOGIN).set(true);
}

在登录成功之后,我们通过给Channel打属性标记的方式,标记这个Channel已成功登录。接下来,我们是不是需要在处理后续的每一种指令前,都判断一下用户是否登录?
LoginUtil.java

1
2
3
4
public static boolean hasLogin(Channel channel) {
Attribute<Boolean> loginAttr = channel.attr(Attributes.LOGIN);
return loginAttr.get() != null;
}

判断一个用户是否登录很简单,只需要调用LoginUtil.hasLogin(channel)即可。但是,Netty的Pipeline机制帮我们省去了重复添加同一段逻辑的烦恼,我们只需要在后续所有的指令处理Handler之前插入一个用户认证Handler即可。
NettyServer.java

1
2
3
4
5
6
7
8
9
10
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
// 新增加用户认证handler
ch.pipeline().addLast(new AuthHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

从上面的代码可以看出,我们在MessageRequestHandler之前插入了一个AuthHandler,因此MessageRequestHandler以及后续所有与指令相关的Handler(后面小节会逐个添加)的处理都会经过AuthHandler的一层过滤,只要在AuthHandler里处理完与身份认证相关的逻辑,后续所有的Handler都不用再操心身份认证这个逻辑,我们来看AuthHandler的具体实现。
AuthHandler.java

1
2
3
4
5
6
7
8
9
10
public class AuthHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!LoginUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
super.channelRead(ctx, msg);
}
}
}
  • AuthHandler继承自ChannelInboundHandlerAdapter,覆盖了channelRead()方法,表明它可以处理所有类型的数据。
  • 在channelRead()方法里,在决定是否把读到的数据传递到后续指令处理器之前,首先会判断是否登录成功。如果未登录,则直接强制关闭连接,否则,就把读到的数据向下传递,传递给后续指令处理器。

AuthHandler的处理逻辑其实就这么简单。但是,有的读者可能要问,如果客户端已经登录成功,那么在每次处理客户端数据之前,都要经历这么一段逻辑。比如,平均每次用户登录之后发送100次消息,其实剩余的99次身份校验逻辑都是没有必要的,因为只要连接未断开,只要客户端成功登录过,后续就不需要再进行客户端的身份校验。

这里我们为了演示,身份认证逻辑比较简单,在实际生产环境中,身份认证逻辑可能会更复杂。我们需要寻找一种途径来避免资源与性能的浪费,使用ChannelHandler的热插拔机制完全可以做到这一点。

15.2 移除校验逻辑

对于Netty的设计来说,Handler其实可以看作一段功能相对聚合的逻辑,然后通过Pipeline把一个个小的逻辑聚合起来,串成一个功能完整的逻辑链。既然可以把逻辑串起来,就可以做到动态删除一个或多个逻辑。
在客户端校验通过之后,我们不再需要AuthHandler这段逻辑,而删除这段逻辑只需要一行代码即可实现。
AuthHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class AuthHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!LoginUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
// 一行代码实现逻辑的删除
ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (LoginUtil.hasLogin(ctx.channel())) {
System.out.println("当前连接登录验证完毕,无须再次验证, AuthHandler 被移除");
} else {
System.out.println("无登录验证,强制关闭连接!");
}
}
}

在上面的代码中,判断如果已经经过权限认证,那么就直接调用Pipeline的remove()方法删除自身,这里的this指的其实就是AuthHandler这个对象。删除之后,这条客户端连接的逻辑链中就不再有这段逻辑了。
另外,我们覆盖了handlerRemoved()方法,主要用于后续演示部分的内容。接下来,我们进行实际演示。

15.3 身份校验演示

在演示之前,对于客户端侧的代码,在客户端向服务端发送消息的逻辑中,我们先把每次都判断是否登录的逻辑去掉,这样就可以在客户端未登录的情况下向服务端发送消息。
NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void startConsoleThread(Channel channel) {
new Thread(() -> {
while (!Thread.interrupted()) {
// 这里注释掉
//if(LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();
channel.writeAndFlush(new MessageRequestPacket(line));
// }
}
}).start();
}

15.3.1 有身份认证的演示

我们先启动服务端,再启动客户端。在客户端的控制台,我们输入消息发送至服务端,此时客户端与服务端控制台的输出分别如下面两图所示。
客户端

mark:this f13ae54453b04370d9643c6e87f4859b.jpg)

服务端

mark:this 74090953fde9da43be2626df7ca47427.jpg)

观察服务端侧的控制台,我们可以看到,在客户端第一次发来消息的时候,AuthHandler判断当前用户已通过身份认证,直接移除自身。移除之后,回调handlerRemoved()方法,这块内容也是上章ChannelHandler生命周期的一部分。

15.3.2 无身份认证的演示

接下来,我们演示一下客户端在未登录的情况下如何发送消息到服务端。我们在LoginResponse- Handler中删除发送登录指令的逻辑。
LoginResponseHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("flash");
loginRequestPacket.setPassword("pwd");
// 删除登录的逻辑
// ctx.channel().writeAndFlush(loginRequestPacket);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("客户端连接被关闭!");
}
}

我们把客户端向服务端写登录指令的逻辑删除,然后覆盖channelInactive()方法,用于验证客户端连接是否会被关闭。
接下来,我们先运行服务端,再运行客户端,并且在客户端的控制台输入文本之后发送给服务端。此时客户端与服务端控制台的输出分别如下面两图所示。

客户端
mark:this 531f3bb1d4f4f12d79b12f33b7712a76.jpg)

服务端
mark:this bcffae654f8b8fd8f14f8980454892cd.jpg)

此看到,如果客户端第一个指令为非登录指令,则AuthHandler直接将客户端连接关闭,并且从有关ChannelHandler生命周期的内容中也可以看到,服务端侧的handlerRemoved()方法和客户端侧代码的channelInActive()会被回调到。

15.4 总结

  • 如果有很多业务逻辑的Handler都要进行某些相同的操作,则我们完全可以抽取出一个Handler来单独处理。
  • 如果某一个独立的逻辑在执行几次之后(这里是一次)不需要再执行,则可以通过ChannelHandler的热插拔机制来实现动态删除逻辑,使应用程序的性能处理更为高效。

15.5 思考

在最后一部分的演示中,对于客户端在登录情况下发送消息以及在未登录情况下发送消息,AuthHandler的其他回调方法分别是如何执行的,为什么?

第 16 章 客户端互聊的原理与实现

本章我们来实现客户端互聊的逻辑,我们先来看一下本章学完之后,单聊的效果是什么样的。

16.1 最终效果

下面我们来看看单聊的最终效果。
服务端
mark:this 152bdd983368c1876c8476f27fdf84ef.jpg)

服务端启动之后,两个客户端陆续登录。
客户端1

mark:this 96d776fd8f239bef6c75b5808964e753.jpg)

客户端2

mark:this da8e83d9d7e343743cdfce27db9d822a.jpg)

  1. 客户端启动之后,我们在控制台输入用户名,服务端随机分配一个userId给客户端,这里我们省去了通过账号、密码注册的过程,userId就在服务端随机生成了,生产环境中可能会持久化在数据库,然后每次通过账号、密码去“捞”。
  2. 当有两个客户端登录成功之后,在控制台输入userId+空格+消息,这里的userId是消息接收方的标识,消息接收方的控制台接着就会显示另外一个客户端发来的消息。

一对一单聊的核心逻辑其实就这么简单,稍加改动就可以用在生产环境中。下面我们就来一起学习如何实现一对一单聊。

16.2 一对一单聊的原理

一对一单聊的原理在前面的章节已经介绍过,我们再来重温一下,如下图所示。

mark:this ae344fbb2090ff9b5715f7c59b6cd671.jpg)

  1. A要和B聊天,首先A和B需要与服务器建立连接,然后进行一次登录流程,服务端保存用户标识和TCP连接的映射关系。
  2. A发消息给B,首先需要将带有B标识的消息数据包发送到服务器,然后服务器从消息数据包中获得B的标识,找到对应B的连接,将消息发送给B。

掌握原理之后,我们就来逐个实现其中的逻辑。

16.3 一对一单聊的实现

16.3.1 用户登录状态与Channel的绑定

我们来看服务端在单聊中是如何处理登录消息的。
LoginRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 这里略去了非关键部分的代码
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacketloginRequestPacket) {
LoginResponsePacket loginResponsePacket = xxx;
String userId = randomUserId();
loginResponsePacket.setUserId(userId);
SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
// 登录响应
ctx.channel().writeAndFlush(loginResponsePacket);
}

// 用户断线之后取消绑定
public void channelInactive(ChannelHandlerContext ctx) {
SessionUtil.unBindSession(ctx.channel());
}

登录成功之后,服务端首先创建一个Session对象,表示用户当前的会话信息。在这个应用程序里面,Session只有下面两个字段。
Session.java

1
2
3
4
5
public class Session {
// 用户唯一性标识
private String userId;
private String userName;
}

在实际生产环境中,Session中的字段可能较多,比如头像URL、年龄、性别等。
然后我们调用SessionUtil.bindSession()保存用户的会话信息,具体实现如下。
SessionUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SessionUtil {
// userId -> channel 的映射
private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();

public static void bindSession(Session session, Channel channel) {
userIdChannelMap.put(session.getUserId(), channel);
channel.attr(Attributes.SESSION).set(session);
}

public static void unBindSession(Channel channel) {
if (hasLogin(channel)) {
userIdChannelMap.remove(getSession(channel).getUserId());
channel.attr(Attributes.SESSION).set(null
);
}
}

public static boolean hasLogin(Channel channel) {
return channel.hasAttr(Attributes.SESSION);
}

public static Session getSession(Channel channel) {
return channel.attr(Attributes.SESSION).get();
}

public static Channel getChannel(String userId) {
return userIdChannelMap.get(userId);
}
}
  1. SessionUtil里维持了一个userId->Channel的映射Map,调用bindSession()方法的时候,在Map里保存这个映射关系。SessionUtil还提供了getChannel()方法,这样就可以通过userId获得对应的Channel。
  2. 除了在Map里维持映射关系,在bindSession()方法中,我们还给Channel附上了一个属性,这个属性就是当前用户的Session。我们也提供了getSession()方法,非常方便地获得对应Channel的会话信息。
  3. 这里的SessionUtil其实就是第15.1节的LoginUtil,这里进行了重构,其中hasLogin()方法,只需要判断当前是否有用户的会话信息即可。
  4. 在LoginRequestHandler中,我们还重写了channelInactive()方法。用户下线之后,我们需要在内存里自动删除userId到Channel的映射关系,这是通过调用SessionUtil.unBindSession()来实现的。

关于保存用户会话信息的逻辑其实就这么多,总结一下就是:登录的时候保存会话信息,登出的时候删除会话信息。下面我们就来实现服务端接收消息并转发的逻辑。

16.3.2 服务端接收消息并转发的实现

我们重新定义一下客户端发送给服务端的消息的数据包格式。
MessageRequestPacket.java

1
2
3
4
public class MessageRequestPacket extends Packet {
private String toUserId;
private String message;
}

数据包格式很简单,toUserId表示要发送给哪个用户,message表示具体内容。我们来看一下服务端的消息处理Handler是如何处理消息的。
MessageRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacketmessageRequestPacket) {
// 1.获得消息发送方的会话信息
Session session = SessionUtil.getSession(ctx.channel());
// 2.通过消息发送方的会话信息构造要发送的消息
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setFromUserId(session.getUserId());
messageResponsePacket.setFromUserName(session.getUserName());
messageResponsePacket.setMessage(messageRequestPacket.getMessage());
// 3.获得消息接收方的Channel
Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());
// 4.将消息发送给消息接收方
if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {
toUserChannel.writeAndFlush(messageResponsePacket);
} else {
System.err.println("[" + messageRequestPacket.getToUserId() + "] 不在线,发送失败 !");
}
}
}
  1. 服务端在收到客户端发来的消息之后,首先获得当前用户也就是消息发送方的会话信息。
  2. 获得消息发送方的会话信息之后,构造一个发送给客户端的消息对象MessageResponsePacket,填上消息发送方的用户标识、昵称、消息内容。
  3. 通过消息接收方的标识获得对应的Channel。
  4. 如果消息接收方当前是登录状态,则直接发送;如果不在线,则控制台打印一条警告消息。

这里,服务端的功能相当于消息转发:收到一个客户端的消息之后,构建一条发送给另一个客户端的消息,接着获得另一个客户端的Channel,然后通过writeAndFlush()写出来。我们再来看一下客户端收到消息之后的处理逻辑。

16.3.3 客户端接收消息的逻辑处理

MessageResponseHandler.java

1
2
3
4
5
6
7
8
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {
String fromUserId = messageResponsePacket.getFromUserId();
String fromUserName = messageResponsePacket.getFromUserName();
System.out.println(fromUserId + ":" + fromUserName + " -> " + messageResponsePacket.getMessage());
}
}

客户端收到消息之后,只是把当前消息打印出来。这里把发送方的用户标识打印出来是为了方便我们在控制台回消息的时候,可以直接复制。到这里,所有的核心逻辑其实已经完成了,我们还差最后一环:在客户端控制台进行登录和发送消息的逻辑。

16.3.4 客户端控制台登录和发送消息

我们回到客户端的启动类,改造一下控制台的逻辑。
NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static void startConsoleThread(Channel channel) {
Scanner sc = new Scanner(System.in);
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
new Thread(() -> {
while (!Thread.interrupted()) {
if (!SessionUtil.hasLogin(channel)) {
System.out.print("输入用户名登录: ");
String username = sc.nextLine();
loginRequestPacket.setUserName(username);
// 使用默认的密码
loginRequestPacket.setPassword("pwd");
// 发送登录数据包
channel.writeAndFlush(loginRequestPacket);
waitForLoginResponse();
} else {
String toUserId = sc.next();
String message = sc.next();
channel.writeAndFlush(new MessageRequestPacket(toUserId, message));
}
}
}).start();
}

private static void waitForLoginResponse() {
try {
Thread.sleep(1000);
} catch
(InterruptedException ignored) {
}
}

在客户端启动的时候,起一个线程:

  1. 如果当前用户还未登录,我们在控制台输入一个用户名,然后构造一个登录数据包发送给服务器。发完之后,等待一个超时时间,可以当作登录逻辑的最大处理时间。
  2. 如果当前用户已经是登录状态,我们可以在控制台输入消息接收方的userId,然后输入一个空格,再输入消息的具体内容,这样我们就可以构建一个消息数据包,发送到服务端。

关于单聊的原理和实现到这里就讲解完成了,最后我们对本章内容做一下总结。

16.4 总结

  1. 我们定义一个会话类Session来维持用户的登录信息,用户登录的时候绑定Session与Channel,用户登出或者断线的时候解绑Session与Channel。
  2. 服务端处理消息的时候,通过消息接收方的标识,获得消息接收方的Channel,调用writeAndFlush()方法将消息发送给消息接收方。

16.5 思考

本章其实还少了用户登出请求和响应的指令处理,你能否说出,对于登出指令来说,服务端和客户端分别要做哪些事情?能否自行实现?

第 17 章 群聊的发起与通知

本章我们学习如何创建一个群聊,并通知群聊中的各位成员。我们依然是先来看下最终效果是什么样的。

17.1 最终效果

群聊的最终效果如下。
服务端
mark:this 3f1d8e03b96a4a7663619ea050783b6b.jpg)

创建群聊的客户端

mark:this c1bddcfc8730bf6b458bf9eb5911b84b.jpg)

其他客户端
mark:this 67e50efad7bb1c7d479f88cc7bce028a.jpg)

mark:this 0ba5fba1dc4cdedd78e8f7a4a7708c4f.jpg)

  1. 依然是三位用户依次登录服务器,分别是闪电侠、极速、萨维塔。
  2. 我们在闪电侠的控制台输入createGroup指令,提示创建群聊需要输入userId列表,然后我们输入以英文逗号分隔的userId。
  3. 群聊创建成功之后,分别在服务端和三个客户端弹出提示消息,包括群ID及群里各位用户的昵称。

17.2 群聊的原理

关于群聊的原理,我们在即时聊天系统简介中已经学习过,现在再来重温一下。
群聊指的是一个组内多位用户之间的聊天,一位用户发到群组的消息会被组内任何一个成员接收。下面来看群聊的基本流程,如下图所示。

mark:this 567ee4a4d6aa74329e66be3236cdafdb.jpg)

群聊的基本流程其实和单聊类似。

  1. A、B、C依然会经历登录流程,服务端保存用户标识对应的TCP连接。
  2. A发起群聊的时候,将A、B、C的标识发送至服务端,服务端拿到之后建立一个群ID,然后把这个ID与A、B、C的标识绑定。
  3. 群聊里的任意一方在群里聊天的时候,将群ID发送至服务端,服务端获得群ID之后,取出对应的用户标识,遍历用户标识对应的TCP连接,就可以将消息发送至每一个群聊成员。

这一章,我们把重点放在创建一个群聊上,由于控制台输入的指令越来越多,因此在正式开始之前,我们先对控制台程序稍作重构。

17.3 控制台程序重构

17.3.1 创建控制台命令执行器

首先,把在控制台要执行的操作抽象出来,抽象出一个接口。
ConsoleCommand.java

1
2
3
public interface ConsoleCommand {
void exec(Scanner scanner, Channel channel);
}

17.3.2 管理控制台命令执行器

接着,创建一个管理类来对这些操作进行管理。
ConsoleCommandManager.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConsoleCommandManager implements ConsoleCommand {
private Map<String, ConsoleCommand> consoleCommandMap;

public ConsoleCommandManager() {
consoleCommandMap = new HashMap<>();
consoleCommandMap.put("sendToUser", new SendToUserConsoleCommand());
consoleCommandMap.put("logout", new LogoutConsoleCommand());
consoleCommandMap.put("createGroup", new CreateGroupConsoleCommand());
}

@Override
public void exec(Scanner scanner, Channel channel) {
// 获取第一个指令
String command = scanner.next();
ConsoleCommand consoleCommand = consoleCommandMap.get(command);
if (consoleCommand != null) {
consoleCommand.exec(scanner, channel);
} else {
System.err.println("无法识别[" + command + "]指令,请重新输入!");
}
}
}
  1. 在这个管理类中,把所有要管理的控制台指令都放到一个Map中。
  2. 执行具体操作的时候,先获取控制台第一个输入的指令,这里以字符串代替比较清晰(这里我们已经实现了第16章思考题中的登出操作),然后通过这个指令拿到对应的控制台命令执行器执行。

这里我们就以创建群聊为例:首先在控制台输入createGroup,然后按下回车键,就会进入CreateGroupConsoleCommand这个类进行处理。
CreateGroupConsoleCommand.java

1
2
3
4
5
6
7
8
9
10
11
12
public class CreateGroupConsoleCommand implements ConsoleCommand {
private static final String USER_ID_SPLITER = ",";

@Override
public void exec(Scanner scanner, Channel channel) {
CreateGroupRequestPacket createGroupRequestPacket = new CreateGroupRequestPacket();
System.out.print("【拉 人 群聊】输入userId列表,userId之间英文逗号隔开:");
String userIds = scanner.next();
createGroupRequestPacket.setUserIdList(Arrays.asList(userIds.split(USER_ID_SPLITER)));
channel.writeAndFlush(createGroupRequestPacket);
}
}

进入CreateGroupConsoleCommand的逻辑之后,我们创建了一个群聊创建请求的数据包,然后提示输入以英文逗号分隔的userId的列表。填充完这个数据包之后,调用writeAndFlush()方法就可以发送创建群聊的指令到服务端。
最后来看经过改造的与客户端控制台线程相关的代码。
NettyClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void startConsoleThread(Channel channel) {
ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (!Thread.interrupted()) {
if (!SessionUtil.hasLogin(channel)) {
loginConsoleCommand.exec(scanner, channel);
} else {
consoleCommandManager.exec(scanner, channel);
}
}
}).start();
}

抽取出控制台指令执行器之后,客户端控制台的逻辑已经相比之前清晰很多了,可以非常方便地在控制台模拟各种在IM聊天窗口的操作。接下来,我们看一下如何创建群聊。

17.4 创建群聊的实现

17.4.1 客户端发送创建群聊请求

通过前面讲述控制台逻辑的重构,我们已经了解到,我们发送了一个CreateGroupRequestPacket数据包到服务端,这个数据包的格式如下。
CreateGroupRequestPacket.java

1
2
3
public class CreateGroupRequestPacket extends Packet {
private List<String> userIdList;
}

它只包含了一个列表,这个列表就是需要拉取群聊的用户列表。我们来看下服务端是如何处理的。

17.4.2 服务端处理创建群聊请求

我们依然创建一个Handler来处理新的指令。
NettyServer.java

1
2
3
4
5
6
7
8
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// ...
// 添加一个 handler
ch.pipeline().addLast(new CreateGroupRequestHandler());
// ...
}
});

我们来看一下这个Handler具体做哪些事情。
CreateGroupRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CreateGroupRequestHandler extends SimpleChannelInboundHandler<CreateGroupRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, CreateGroupRequestPacketcreateGroupRequestPacket) {
List<String> userIdList = createGroupRequestPacket.getUserIdList();
List<String> userNameList = new ArrayList<>();
// 1. 创建一个channel分组
ChannelGroup channelGroup = new DefaultChannelGroup(ctx.executor());
// 2. 筛选出待加入群聊的用户的channel和userName
for (String userId :userIdList){
Channel channel = SessionUtil.getChannel(userId);
if (channel != null) {
channelGroup.add(channel);
userNameList.add(SessionUtil.getSession(channel).getUserName());
}
}
// 3. 创建群聊创建结果的响应
CreateGroupResponsePacket createGroupResponsePacket = new CreateGroupResponsePacket();
createGroupResponsePacket.setSuccess(true);

createGroupResponsePacket.setGroupId(IDUtil.randomId());
createGroupResponsePacket.setUserNameList(userNameList);
// 4. 给每个客户端都发送拉群通知
channelGroup.writeAndFlush(createGroupResponsePacket);
System.out.print("群创建成功,id 为[" + createGroupResponsePacket.getGroupId() + "], ");
System.out.println("群里面有:" + createGroupResponsePacket.getUserNameList());
}
}

整个过程可以分为以下4个步骤。

  1. 创建一个ChannelGroup。这里简单介绍一下ChannelGroup:它可以把多个Channel的操作聚合在一起,可以往它里面添加、删除Channel,也可以进行Channel的批量读写、关闭等操作,详细的功能读者可以自行查阅这个接口的方法。这里的一个群组其实就是一个Channel的分组集合,使用ChannelGroup非常方便。
  2. 遍历待加入群聊的userId,如果存在该用户,就把对应的Channel添加到ChannelGroup中,用户昵称也被添加到昵称列表中。
  3. 创建一个创建群聊响应的对象,其中groupId是随机生成的,群聊创建结果共有三个字段,这里就不展开对这个类进行说明了。
  4. 调用ChannelGroup的聚合发送功能,将拉群的通知批量地发送到客户端,接着在服务端控制台打印创建群聊成功的信息。至此,服务端处理创建群聊请求的逻辑结束。

我们再来看客户端处理创建群聊响应。

17.4.3 客户端处理创建群聊响应

首先,客户端依然创建一个Handler来处理新的指令。
NettyClient.java

1
2
3
4
5
6
7
8
9
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// ...
// 添加一个新的Handler来处理创建群聊成功响应的指令
ch.pipeline().addLast(new CreateGroupResponseHandler());
// ...
}
});

然后,在应用程序里,我们仅仅把创建群聊成功之后的具体信息打印出来。
CreateGroupResponseHandler.java

1
2
3
4
5
6
7
public class CreateGroupResponseHandler extends SimpleChannelInboundHandler<CreateGroupResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, CreateGroupResponsePacket createGroupResponsePacket) {
System.out.print("群创建成功,id 为[" + createGroupResponsePacket.getGroupId() + "], ");
System.out.println("群里面有:" + createGroupResponsePacket.getUserNameList());
}
}

在实际生产环境中,CreateGroupResponsePacket对象里可能有更多信息,以上逻辑的处理也会更加复杂,不过这里已经能说明问题了。

17.5 总结

  1. 群聊的原理和单聊类似,都是通过标识拿到Channel。
  2. 重构了控制台的程序结构,在实际带有UI的IM应用中,我们输入的第一个指令其实就是对应我们点击UI的某些按钮或菜单的操作。
  3. 通过ChannelGroup,可以很方便地对一组Channel进行批量操作。

17.6 思考

如何实现在某个客户端拉取群聊成员的时候,不需要输入自己的用户ID,并且展示创建群聊消息的时候,不显示自己的昵称?

第 18 章 群聊的成员管理

在上一章中,我们已经学习了如何创建群聊并通知群聊的各位成员。本章中我们来实现群成员管理,包括群的加入、退出和获取群成员列表等功能。有了前面两章的基础,相信本章的内容对读者来说会比较简单。在开始之前,我们依然先来看一下最终效果。

18.1 最终效果

群成员管理的最终效果如下图所示。
服务端

mark:this 7bb40964b01a9e7c6fb48be03219191a.jpg)

从服务端可以看到,闪电侠、逆闪、极速先后登录服务器。随后,闪电侠创建一个群聊。接下来,萨维塔也登录了。这里,我们只展示闪电侠和萨维塔的客户端控制台界面。
客户端(闪电侠)

mark:this 12131e97dbb8d7c9627b427741a75c8a.jpg)

客户端(萨维塔)

mark:this 4642d6e18751fbf99b885fe2db05a900.jpg)

我们可以看到四位用户登录成功之后的最终效果。

  1. 闪电侠先拉逆闪和极速加入了群聊,控制台输出群创建成功的消息。
  2. 随后在萨维塔的控制台输入joinGroup之后再输入群ID,加入群聊,控制台显示加入群成功。
  3. 在闪电侠的控制台输入listGroupMembers之后再输入群ID,展示了当前群聊成员包括极速、萨维塔、闪电侠、逆闪。
  4. 在萨维塔的控制台输入quitGroup之后再输入群ID,退出群聊,控制台显示退群成功。
  5. 最后在闪电侠的控制台输入listGroupMembers之后再输入群ID,展示了当前群聊成员中已无萨维塔。

接下来,我们就来实现加入群聊、退出群聊、获取群成员列表三大功能。

18.2 群的加入

18.2.1 在控制台添加群加入命令处理器

JoinGroupConsoleCommand.java

1
2
3
4
5
6
7
8
9
10
public class JoinGroupConsoleCommand implements ConsoleCommand {
@Override
public void exec(Scanner scanner, Channel channel) {
JoinGroupRequestPacket joinGroupRequestPacket = new JoinGroupRequestPacket();
System.out.print(" 输入 groupId, 加入群聊:");
String groupId = scanner.next();
joinGroupRequestPacket.setGroupId(groupId);
channel.writeAndFlush(joinGroupRequestPacket);
}
}

按照前面两章的套路,我们在控制台先添加群加入命令处理器JoinGroupConsoleCommand。在这个处理器中,我们创建一个指令对象JoinGroupRequestPacket,填上群ID之后,将数据包发送至服务端。之后,我们将该控制台指令添加到ConsoleCommandManager。
ConsoleCommandManager.java

1
2
3
4
5
6
7
public class ConsoleCommandManager implements ConsoleCommand {
public ConsoleCommandManager() {
// ...
consoleCommandMap.put("joinGroup", new JoinGroupConsoleCommand());
// ...
}
}

接下来,就轮到服务端来处理加群请求了。

18.2.2 服务端处理加群请求

在服务端的Pipeline中添加对应的Handler—JoinGroupRequestHandler。
NettyServer.java

1
2
3
4
5
6
7
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 添加加群请求处理器
ch.pipeline().addLast(new JoinGroupRequestHandler());
// ...
}
});

JoinGroupRequestHandler的具体逻辑如下。
JoinGroupRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class JoinGroupRequestHandler extends SimpleChannelInboundHandler<JoinGroupRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, JoinGroupRequestPacketrequestPacket) {
// 1. 获取群对应的ChannelGroup,然后将当前用户的Channel添加进去
String groupId = requestPacket.getGroupId();
ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);
channelGroup.add(ctx.channel());
// 2. 构造加群响应发送给客户端
JoinGroupResponsePacket responsePacket = new JoinGroupResponsePacket();
responsePacket.setSuccess(true);
responsePacket.setGroupId(groupId);
ctx.channel().writeAndFlush(responsePacket);
}
}
  1. 在通过groupId拿到对应的ChannelGroup之后,只需要调用ChannelGroup.add()方法,将加入群聊的用户的Channel添加进去,服务端即完成了加入群聊的逻辑。
  2. 构造一个加群响应,填入groupId之后,调用writeAndFlush()方法把加群响应发送给加入群聊的客户端。

18.2.3 客户端处理加群响应

我们在客户端的Pipeline中添加对应的Handler—JoinGroupResponseHandler来处理加群之后的响应。
NettyClient.java

1
2
3
4
5
6
7
8
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 添加加群响应处理器
ch.pipeline().addLast(new JoinGroupResponseHandler());
// ...
}
});

JoinGroupResponseHandler对应的逻辑如下。
JoinGroupResponseHandler.java

1
2
3
4
5
6
7
8
9
public class JoinGroupResponseHandler extends SimpleChannelInboundHandler<JoinGroupResponsePacket> {
protected void channelRead0(ChannelHandlerContext ctx, JoinGroupResponsePacketresponsePacket) {
if (responsePacket.isSuccess()) {
System.out.println("加入群[" + responsePacket.getGroupId() + "]成功!");
} else {
System.err.println("加入群[" + responsePacket.getGroupId() + "]失败,原因为:" + responsePacket.getReason());
}
}
}

该处理器的逻辑很简单,只是简单地将加群的结果输出到控制台,实际生产环境的IM可能比这要复杂,但是修改起来也非常容易。至此,与加群相关的逻辑就讲解完成了。

18.3 群的退出

关于群的退出逻辑与群的加入逻辑非常类似,这里展示一下关键代码。
服务端退群的核心逻辑为QuitGroupRequestHandler。
QuitGroupRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class QuitGroupRequestHandler extends SimpleChannelInboundHandler<QuitGroupRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, QuitGroupRequestPacketrequestPacket) {
// 1. 获取群对应的 ChannelGroup,然后将当前用户的Channel移除
String groupId = requestPacket.getGroupId();
ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);
channelGroup.remove(ctx.channel());
// 2. 构造退群响应发送给客户端
QuitGroupResponsePacket responsePacket = new QuitGroupResponsePacket();
responsePacket.setGroupId(requestPacket.getGroupId());
responsePacket.setSuccess(true);
ctx.channel().writeAndFlush(responsePacket);
}
}

从上面的代码可以看到,QuitGroupRequestHandler和JoinGroupRequestHandler其实是一个逆向的过程。

  1. 通过groupId拿到对应的ChannelGroup之后,只需要调用ChannelGroup.remove()方法,将当前用户的Channel删除,服务端即完成了退群的逻辑。
  2. 构造一个退群响应,填入groupId之后,调用writeAndFlush()方法把退群响应发送给退群的客户端。

至此,加群和退群的逻辑就讲解完成了。最后,我们来看一下获取群成员列表的逻辑。

18.4 获取群成员列表

18.4.1 在控制台添加获取群成员列表命令处理器

ListGroupMembersConsoleCommand.java

1
2
3
4
5
6
7
8
9
10
public class ListGroupMembersConsoleCommand implements ConsoleCommand {
@Override
public void exec(Scanner scanner, Channel channel) {
ListGroupMembersRequestPacket listGroupMembersRequestPacket = new ListGroupMembersRequestPacket();
System.out.print("输入 groupId,获取群成员列表:");
String groupId = scanner.next();
listGroupMembersRequestPacket.setGroupId(groupId);
channel.writeAndFlush(listGroupMembersRequestPacket);
}
}

依旧按照前面的套路,我们在控制台先添加获取群成员列表命令处理器ListGroupMembers- ConsoleCommand。在这个处理器中,我们创建一个指令对象ListGroupMembersRequestPacket,填上群ID之后,将数据包发送至服务端。之后,将该控制台指令添加到ConsoleCommandManager。
ConsoleCommandManager.java

1
2
3
4
5
6
7
public class ConsoleCommandManager implements ConsoleCommand {
public ConsoleCommandManager() {
// ...
consoleCommandMap.put("listGroupMembers", new ListGroupMembersConsoleCommand());
// ...
}
}

接着,轮到服务端来处理获取群成员列表请求。

18.4.2 服务端处理获取群成员列表请求

在服务端的Pipeline中添加对应的Handler—ListGroupMembersRequestHandler。
NettyServer.java

1
2
3
4
5
6
7
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 添加获取群成员列表请求处理器
ch.pipeline().addLast(new ListGroupMembersRequestHandler());
// ...
}
});

ListGroupMembersRequestHandler的具体逻辑如下。
ListGroupMembersRequestHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ListGroupMembersRequestHandler extends SimpleChannelInboundHandler<ListGroupMembersRequestPacket> {
protected void channelRead0(ChannelHandlerContext ctx, JoinGroupRequestPacketrequestPacket) {
// 1. 获取群的ChannelGroup
String groupId = requestPacket.getGroupId();
ChannelGroup channelGroup = SessionUtil.getChannelGroup(groupId);
// 2. 遍历群成员的Channel对应的Session,构造群成员的信息
List<Session> sessionList = new
ArrayList<>();
for (Channel channel : channelGroup) {
Session session = SessionUtil.getSession(channel);
sessionList.add(session);
}
// 3. 构建获取群成员列表响应,写回客户端
ListGroupMembersResponsePacket responsePacket = new ListGroupMembersResponsePacket();
responsePacket.setGroupId(groupId);
responsePacket.setSessionList(sessionList);
ctx.channel().writeAndFlush(responsePacket);
}
}
  1. 通过groupId拿到对应的ChannelGroup。
  2. 创建一个sessionList用来装载群成员信息,遍历Channel的每个Session,把对应的用户信息都装到sessionList中。在实际生产环境中,这里可能会构造另外一个对象来装载用户信息而非Session。
  3. 构造一个获取群成员列表的响应指令数据包,填入groupId和群成员信息之后,调用writeAndFlush()方法把响应发送给发起获取群成员列表的客户端。

最后,就剩下客户端来处理获取群成员列表的响应了。

18.4.3 客户端处理获取群成员列表响应

和前面一样,我们在客户端的Pipeline中添加一个Handler—ListGroupMembersResponseHandler。
NettyClient.java

1
2
3
4
5
6
7
8
.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
// ...
// 添加获取群成员响应处理器
ch.pipeline().addLast(new ListGroupMembersResponseHandler());
// ...
}
});

而ListGroupMembersResponseHandler的逻辑也只是在控制台展示一下群成员的信息。
ListGroupMembersResponseHandler.java

1
2
3
4
5
public class ListGroupMembersResponseHandler extends SimpleChannelInboundHandler<ListGroupMembersResponsePacket> {
protected void channelRead0(ChannelHandlerContext ctx, ListGroupMembersResponsePacket responsePacket) {
System.out.println("群[" + responsePacket.getGroupId() + "]中的人包括:" + responsePacket.getSessionList());
}
}

至此,群成员加入、退出,以及获取群成员列表对应的逻辑就全部实现了,其实从本章和前面两章大家可以看到,添加一个新功能是有一定套路的,我们在最后的总结中给出这个套路。

18.5 总结

添加一个服务端和客户端交互的新功能只需要遵循以下步骤。

  1. 创建控制台指令对应的ConsoleCommand,并将其添加到ConsoleCommandManager。
  2. 在控制台输入指令和数据之后,填入协议对应的指令数据包—xxxRequestPacket,将请求写到服务端。
  3. 服务端创建对应的xxxRequestPacketHandler,并将其添加到服务端的Pipeline中,在xxxRequestPacketHandler处理完后构造对应的xxxResponsePacket发送给客户端。
  4. 客户端创建对应的xxxResponsePacketHandler,并将其添加到客户端的Pipeline中,最后在xxxResponsePacketHandler中完成响应的处理。
  5. 最容易忽略的是,新添加xxxPacket时别忘了完善编解码器PacketCodec中的packetTypeMap。

18.6 思考

  1. 实现以下功能:客户端加入或者退出群聊,将加入群聊的消息也通知给群聊中的其他客户端,这个消息需要和发起群聊的客户端区分开,类似“xxx加入群聊yyy”的格式。
  2. 实现:当一个群的人数为0的时候,清理内存中与该群相关的信息。

第 19 章 群聊消息的收发及Netty性能优化

通过第16~18章的学习,相信读者看到本章的标题就已经知道该如何实现本章的功能了。本章在实现了群聊消息收发之后,还会介绍一些与性能优化相关的内容。
开始之前,我们先来看一下群聊消息的最终效果。

19.1 群聊消息的最终效果

群聊消息的最终效果如下图所示。
服务端

mark:this 08a19bcd524c05168130bdbebee7d9fc.jpg)

闪电侠、逆闪、极速先后登录,然后闪电侠拉逆闪、极速和自己加入群聊。下面我们来看一下各个客户端的控制台界面。
客户端(闪电侠)

mark:this a503e2c534f0688be12bf1a565100e46.jpg)

闪电侠第一个输入sendToGroup发送群聊消息。
客户端(逆闪)

mark:this 7088ce432b96726d6d43035ad3fcc2fe.jpg)

逆闪第二个输入sendToGroup发送群聊消息,他已经收到了闪电侠的消息。
客户端(极速)

mark:this 6487b97b5d74c2a11754beac20343fd8.jpg)

逆闪最后一个输入sendToGroup发送群聊消息,他已经收到了闪电侠和逆闪的消息。

  1. 在闪电侠的控制台,输入sendToGroup指令之后,再输入groupId+空格+消息内容,发送消息给群里的各位用户。随后,群里所有用户的控制台都显示了群聊消息。
  2. 陆续在逆闪和极速的控制台做相同的操作,群里所有用户的控制台陆续都显示了群聊消息。

这个实现过程和前面一样,下面我们仅关注核心实现部分。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!