Netty网络编码的应用

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.10.0-rc-1</version>
</dependency>

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 服务器入口类
*/
public class ServerMain {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class);

/**
* 服务器端口号
*/
static private final int SERVER_PORT = 12345;

/**
* 应用主函数
*
* @param argvArray 命令行参数数组
*/
static public void main(String[] argvArray) {
// 设置 log4j 属性文件
PropertyConfigurator.configure(ServerMain.class.getClassLoader().getResourceAsStream("log4j.properties"));

// 初始化命令处理器工厂
CmdHandlerFactory.init();
// 初始化消息识别器
GameMsgRecognizer.init();

EventLoopGroup bossGroup = new NioEventLoopGroup(); // 拉客的, 也就是故事中的美女
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 干活的, 也就是故事中的服务生

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class); // 服务器信道的处理方式
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(), // Http 服务器编解码器
new HttpObjectAggregator(65535), // 内容长度限制
new WebSocketServerProtocolHandler("/websocket"), // WebSocket 协议处理器, 在这里处理握手、ping、pong 等消息
new GameMsgDecoder(), // 自定义的消息解码器
new GameMsgEncoder(), // 自定义的消息编码器
new GameMsgHandler() // 自定义的消息处理器
);
}
});

try {
// 绑定 12345 端口,
// 注意: 实际项目中会使用 argvArray 中的参数来指定端口号
ChannelFuture f = b.bind(SERVER_PORT).sync();

if (f.isSuccess()) {
LOGGER.info("服务器启动成功!");
}

// 等待服务器信道关闭,
// 也就是不要立即退出应用程序, 让应用程序可以一直提供服务
f.channel().closeFuture().sync();
} catch (Exception ex) {
// 如果遇到异常, 打印详细信息...
LOGGER.error(ex.getMessage(), ex);
} finally {
// 关闭服务器, 大家都歇了吧
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

ProtoBuf协议

netty

1
2
3
4
5
6
7
8
Protobuf 协议文档:

syntax = “proto3”;
消息命名规则是 XxxCmd 和 XxxResult;
XxxCmd 代表客户端发往服务器的消息;
XxxResult 代表服务器返回给客户端的消息;
UserEntryCmd;
UserEntryResult;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package msg;
option java_package = "org.tinygame.herostory.msg";

// 消息代号
enum MsgCode {
USER_ENTRY_CMD = 0;
USER_ENTRY_RESULT = 1;
WHO_ELSE_IS_HERE_CMD = 2;
WHO_ELSE_IS_HERE_RESULT = 3;
USER_MOVE_TO_CMD = 4;
USER_MOVE_TO_RESULT = 5;
USER_QUIT_RESULT = 6;
USER_STOP_CMD = 7;
USER_STOP_RESULT = 8;
USER_ATTK_CMD = 9;
USER_ATTK_RESULT = 10;
USER_SUBTRACT_HP_RESULT = 11;
USER_DIE_RESULT = 12;
};

//
// 用户入场
///////////////////////////////////////////////////////////////////////
// 指令
message UserEntryCmd {
// 用户 Id
uint32 userId = 1;
// 英雄形象
string heroAvatar = 2;
}

// 结果
message UserEntryResult {
// 用户 Id
uint32 userId = 1;
// 英雄形象
string heroAvatar = 2;
}
1
2
3
4
5
6
7
8
9
Protobuf 命令行工具:

https://github.com/protocolbuffers/protobuf
到 releases 页面中找到下载链接;
解压缩后设置 path 环境变量;
执行命令 protoc;
protoc.exe --java_out=${目标目录} .\GameMsgProtocol.proto

就会把协议转换成java代码,提供一些协议转换器供调用

消息解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 自定义的消息解码器
*/
public class GameMsgDecoder extends ChannelInboundHandlerAdapter {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(GameMsgDecoder.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (null == ctx ||
null == msg) {
return;
}

if (!(msg instanceof BinaryWebSocketFrame)) {
return;
}

try {
BinaryWebSocketFrame inputFrame = (BinaryWebSocketFrame) msg;
ByteBuf byteBuf = inputFrame.content();

byteBuf.readShort(); // 读取消息的长度
int msgCode = byteBuf.readShort(); // 读取消息编号

// 拿到消息体
byte[] msgBody = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(msgBody);

// 获取消息构建器
Message.Builder msgBuilder = GameMsgRecognizer.getBuilderByMsgCode(msgCode);
msgBuilder.clear();
msgBuilder.mergeFrom(msgBody);

// 构建消息实体
Message cmd = msgBuilder.build();

if (null != cmd) {
ctx.fireChannelRead(cmd);
}
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}

消息编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 游戏消息编码器
*/
public class GameMsgEncoder extends ChannelOutboundHandlerAdapter {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(GameMsgEncoder.class);

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (null == ctx ||
null == msg) {
return;
}

try {
if (!(msg instanceof GeneratedMessageV3)) {
super.write(ctx, msg, promise);
return;
}

// 消息编码
int msgCode = GameMsgRecognizer.getMsgCodeByClazz(msg.getClass());

if (-1 == msgCode) {
LOGGER.error(
"无法识别的消息类型, msgClazz = {}",
msg.getClass().getSimpleName()
);
super.write(ctx, msg, promise);
return;
}

// 消息体
byte[] msgBody = ((GeneratedMessageV3) msg).toByteArray();

ByteBuf byteBuf = ctx.alloc().buffer();
byteBuf.writeShort((short) msgBody.length); // 消息的长度
byteBuf.writeShort((short) msgCode); // 消息编号
byteBuf.writeBytes(msgBody); // 消息体

// 写出 ByteBuf,把数据封装成protoBuf的格式,再以二进制流的方式发送
BinaryWebSocketFrame outputFrame = new BinaryWebSocketFrame(byteBuf);
super.write(ctx, outputFrame, promise);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}

消息处理器

通过定义一个Netty提供的类,静态的ChannelGroup存储当前登录的所有用户,有其他用户登录时,把登录的用户信息同步给给已经在线的用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 自定义的消息处理器
*/
public class GameMsgHandler extends SimpleChannelInboundHandler<Object> {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(GameMsgHandler.class);

@Override
public void channelActive(ChannelHandlerContext ctx) {
if (null == ctx) {
return;
}

try {
super.channelActive(ctx);
Broadcaster.addChannel(ctx.channel());
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}

//如果用户退出登录,发送群发消息,并且清除内部缓存
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (null == ctx) {
return;
}

try {
super.handlerRemoved(ctx);


// 将用户 Id 保存至 Session
//ctx.channel().attr(AttributeKey.valueOf("userId")).set(userId);

//从channel里获取用户的id 类似于session
Integer userId = (Integer) ctx.channel().attr(AttributeKey.valueOf("userId")).get();

if (null == userId) {
return;
}

//内存移除登录的用户ID
UserManager.removeByUserId(userId);
//清除用户登录缓存的channel
Broadcaster.removeChannel(ctx.channel());

//构建退出登录的消息
GameMsgProtocol.UserQuitResult.Builder resultBuilder = GameMsgProtocol.UserQuitResult.newBuilder();
resultBuilder.setQuitUserId(userId);

GameMsgProtocol.UserQuitResult newResult = resultBuilder.build();
//群发用户退出登录信息
Broadcaster.broadcast(newResult);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}

//当有信息进来时,输出当前信息
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (null == ctx ||
null == msg) {
return;
}

LOGGER.info(
"收到客户端消息, msgClazz = {}, msgBody = {}",
msg.getClass().getSimpleName(),
msg
);

try {
//使用工厂模式,创建不同的命令对象,调用不同的方法
//在并发下是多线程运行,业务上有可能会产生问题,下文会继续优化。
ICmdHandler<? extends GeneratedMessageV3> cmdHandler = CmdHandlerFactory.create(msg.getClass());

if (null != cmdHandler) {
cmdHandler.handle(ctx, cast(msg));
}
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}

/**
* 转型为命令对象
*
* @param msg
* @param <TCmd>
* @return
*/
static private <TCmd extends GeneratedMessageV3> TCmd cast(Object msg) {
if (null == msg) {
return null;
} else {
return (TCmd) msg;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 广播员
*/
public final class Broadcaster {
/**
* 信道组, 注意这里一定要用 static,
* 否则无法实现群发
*/
static private final ChannelGroup _channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
* 私有化类默认构造器
*/
private Broadcaster() {
}

/**
* 添加信道
*
* @param ch
*/
static public void addChannel(Channel ch) {
if (null != ch) {
_channelGroup.add(ch);
}
}

/**
* 移除信道
*
* @param ch
*/
static public void removeChannel(Channel ch) {
if (null != ch) {
_channelGroup.remove(ch);
}
}

/**
* 广播消息
*
* @param msg
*/
static public void broadcast(Object msg) {
if (null != msg) {
_channelGroup.writeAndFlush(msg);
}
}
}

处理器工厂

使用工厂模式创建不同命令不同逻辑的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 命令处理器接口
*
* @param <TCmd>
*/
public interface ICmdHandler<TCmd extends GeneratedMessageV3> {
/**
* 处理命令
*
* @param ctx
* @param cmd
*/
void handle(ChannelHandlerContext ctx, TCmd cmd);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//抽取出来的的用户登录命令需要处理的逻辑
public class UserEntryCmdHandler implements ICmdHandler<GameMsgProtocol.UserEntryCmd> {
@Override
public void handle(ChannelHandlerContext ctx, GameMsgProtocol.UserEntryCmd cmd) {
if (null == ctx ||
null == cmd) {
return;
}

// 获取用户 Id 和英雄形象
int userId = cmd.getUserId();
String heroAvatar = cmd.getHeroAvatar();

User newUser = new User();
newUser.userId = userId;
newUser.heroAvatar = heroAvatar;
UserManager.addUser(newUser);

// 将用户 Id 保存至 Session
ctx.channel().attr(AttributeKey.valueOf("userId")).set(userId);

GameMsgProtocol.UserEntryResult.Builder resultBuilder = GameMsgProtocol.UserEntryResult.newBuilder();
resultBuilder.setUserId(userId);
resultBuilder.setHeroAvatar(heroAvatar);

// 构建结果并广播
GameMsgProtocol.UserEntryResult newResult = resultBuilder.build();
Broadcaster.broadcast(newResult);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 命令处理器工厂类
*/
public final class CmdHandlerFactory {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(CmdHandlerFactory.class);

/**
* 命令处理器字典
*/
static private final Map<Class<?>, ICmdHandler<? extends GeneratedMessageV3>> _handlerMap = new HashMap<>();

/**
* 私有化类默认构造器
*/
private CmdHandlerFactory() {
}

/**
* 初始化
*/
static public void init() {
LOGGER.info("==== 完成命令与处理器的关联 ====");

// 获取包名称
final String packageName = CmdHandlerFactory.class.getPackage().getName();
// 获取 ICmdHandler 所有的实现类
Set<Class<?>> clazzSet = PackageUtil.listSubClazz(
packageName,
true,
ICmdHandler.class
);

for (Class<?> handlerClazz : clazzSet) {
if (null == handlerClazz ||
0 != (handlerClazz.getModifiers() & Modifier.ABSTRACT)) {
continue;
}

// 获取方法数组
Method[] methodArray = handlerClazz.getDeclaredMethods();
// 消息类型
Class<?> cmdClazz = null;

for (Method currMethod : methodArray) {
if (null == currMethod ||
!currMethod.getName().equals("handle")) {
continue;
}

// 获取函数参数类型数组
Class<?>[] paramTypeArray = currMethod.getParameterTypes();

if (paramTypeArray.length < 2 ||
paramTypeArray[1] == GeneratedMessageV3.class ||
!GeneratedMessageV3.class.isAssignableFrom(paramTypeArray[1])) {
continue;
}

cmdClazz = paramTypeArray[1];
break;
}

if (null == cmdClazz) {
continue;
}

try {
// 创建命令处理器实例
ICmdHandler<?> newHandler = (ICmdHandler<?>) handlerClazz.newInstance();

LOGGER.info(
"{} <==> {}",
cmdClazz.getName(),
handlerClazz.getName()
);

_handlerMap.put(cmdClazz, newHandler);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}

/**
* 创建命令处理器
*
* @param msgClazz
* @return
*/
static public ICmdHandler<? extends GeneratedMessageV3> create(Class<?> msgClazz) {
if (null == msgClazz) {
return null;
}
return _handlerMap.get(msgClazz);
}
}

加解密工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 消息识别器
*/
public final class GameMsgRecognizer {
/**
* 消息编号 -> 消息对象字典
*/
static private final Map<Integer, GeneratedMessageV3> _msgCodeAndMsgObjMap = new HashMap<>();

/**
* 消息类 -> 消息编号字典
*/
static private final Map<Class<?>, Integer> _clazzAndMsgCodeMap = new HashMap<>();

/**
* 私有化类默认构造器
*/
private GameMsgRecognizer() {
}

/**
* 初始化
*/
static public void init() {
_msgCodeAndMsgObjMap.put(GameMsgProtocol.MsgCode.USER_ENTRY_CMD_VALUE, GameMsgProtocol.UserEntryCmd.getDefaultInstance());
_msgCodeAndMsgObjMap.put(GameMsgProtocol.MsgCode.WHO_ELSE_IS_HERE_CMD_VALUE, GameMsgProtocol.WhoElseIsHereCmd.getDefaultInstance());
_msgCodeAndMsgObjMap.put(GameMsgProtocol.MsgCode.USER_MOVE_TO_CMD_VALUE, GameMsgProtocol.UserMoveToCmd.getDefaultInstance());

_clazzAndMsgCodeMap.put(GameMsgProtocol.UserEntryResult.class, GameMsgProtocol.MsgCode.USER_ENTRY_RESULT_VALUE);
_clazzAndMsgCodeMap.put(GameMsgProtocol.WhoElseIsHereResult.class, GameMsgProtocol.MsgCode.WHO_ELSE_IS_HERE_RESULT_VALUE);
_clazzAndMsgCodeMap.put(GameMsgProtocol.UserMoveToResult.class, GameMsgProtocol.MsgCode.USER_MOVE_TO_RESULT_VALUE);
_clazzAndMsgCodeMap.put(GameMsgProtocol.UserQuitResult.class, GameMsgProtocol.MsgCode.USER_QUIT_RESULT_VALUE);
}

/**
* 根据消息编号获取消息构建器
*
* @param msgCode
* @return
*/
static public Message.Builder getBuilderByMsgCode(int msgCode) {
if (msgCode < 0) {
return null;
}

GeneratedMessageV3 defaultMsg = _msgCodeAndMsgObjMap.get(msgCode);

if (null == defaultMsg) {
return null;
} else {
return defaultMsg.newBuilderForType();
}
}

/**
* 根据消息类获取消息编号
*
* @param msgClazz
* @return
*/
static public int getMsgCodeByClazz(Class<?> msgClazz) {
if (null == msgClazz) {
return -1;
}

Integer msgCode = _clazzAndMsgCodeMap.get(msgClazz);

if (null == msgCode) {
return -1;
} else {
return msgCode.intValue();
}
}
}

可以通过反射来改进上文的加解密工厂的init方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/**
* 消息识别器
*/
public final class GameMsgRecognizer {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(GameMsgRecognizer.class);

/**
* 消息编号 -> 消息对象字典
*/
static private final Map<Integer, GeneratedMessageV3> _msgCodeAndMsgObjMap = new HashMap<>();

/**
* 消息类 -> 消息编号字典
*/
static private final Map<Class<?>, Integer> _msgClazzAndMsgCodeMap = new HashMap<>();

/**
* 私有化类默认构造器
*/
private GameMsgRecognizer() {
}

/**
* 初始化
*/
static public void init() {
LOGGER.info("==== 完成消息类与消息编号的映射 ====");

// 获取GameMsgProtocol中的内部类
Class<?>[] innerClazzArray = GameMsgProtocol.class.getDeclaredClasses();

for (Class<?> innerClazz : innerClazzArray) {
// 如果不是GeneratedMessageV3消息类,则跳过
if (null == innerClazz || !GeneratedMessageV3.class.isAssignableFrom(innerClazz)) {
continue;
}

// 获取类名称并小写
String clazzName = innerClazz.getSimpleName();
clazzName = clazzName.toLowerCase(); //UserEntryResult -> userentryresult

for (GameMsgProtocol.MsgCode msgCode : GameMsgProtocol.MsgCode.values()) {
if (null == msgCode) {
continue;
}

// 获取消息编码
String strMsgCode = msgCode.name();
strMsgCode = strMsgCode.replaceAll("_", "");
strMsgCode = strMsgCode.toLowerCase(); //USER_ENTRY_RESULT_VALUE -> userentryresultvalue

//前缀完全包含
if (!strMsgCode.startsWith(clazzName)) {
continue;
}

try {
// 反射,相当于调用 UserEntryCmd.getDefaultInstance();
Object returnObj = innerClazz.getDeclaredMethod("getDefaultInstance").invoke(innerClazz);

LOGGER.info(
"{} <==> {}",
innerClazz.getName(),
msgCode.getNumber()
);

_msgCodeAndMsgObjMap.put(
msgCode.getNumber(),
(GeneratedMessageV3) returnObj
);

_msgClazzAndMsgCodeMap.put(
innerClazz,
msgCode.getNumber()
);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}
}

/**
* 根据消息编号获取消息构建器
*
* @param msgCode
* @return
*/
static public Message.Builder getBuilderByMsgCode(int msgCode) {
if (msgCode < 0) {
return null;
}

GeneratedMessageV3 defaultMsg = _msgCodeAndMsgObjMap.get(msgCode);

if (null == defaultMsg) {
return null;
} else {
return defaultMsg.newBuilderForType();
}
}

/**
* 根据消息类获取消息编号
*
* @param msgClazz
* @return
*/
static public int getMsgCodeByClazz(Class<?> msgClazz) {
if (null == msgClazz) {
return -1;
}

Integer msgCode = _msgClazzAndMsgCodeMap.get(msgClazz);

if (null == msgCode) {
return -1;
} else {
return msgCode.intValue();
}
}
}

扫描包的工具类

扫描某个路径下的所有类对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
import java.io.File;
import java.io.FileInputStream;
import java.net.URL;
import java.util.*;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

/**
* 名称空间实用工具
*/
public final class PackageUtil {
/**
* 类默认构造器
*/
private PackageUtil() {
}

/**
* 列表指定包中的所有子类
*
* @param packageName 包名称
* @param recursive 是否递归查找
* @param superClazz 父类的类型
* @return 子类集合
*/
static public Set<Class<?>> listSubClazz(
String packageName,
boolean recursive,
Class<?> superClazz) {
if (superClazz == null) {
return Collections.emptySet();
} else {
return listClazz(packageName, recursive, superClazz::isAssignableFrom);
}
}

/**
* 列表指定包中的所有类
*
* @param packageName 包名称
* @param recursive 是否递归查找?
* @param filter 过滤器
* @return 符合条件的类集合
*/
static public Set<Class<?>> listClazz(
String packageName, boolean recursive, IClazzFilter filter) {

if (packageName == null ||
packageName.isEmpty()) {
return null;
}

// 将点转换成斜杠
final String packagePath = packageName.replace('.', '/');
// 获取类加载器
ClassLoader cl = Thread.currentThread().getContextClassLoader();

// 结果集合
Set<Class<?>> resultSet = new HashSet<>();

try {
// 获取 URL 枚举
Enumeration<URL> urlEnum = cl.getResources(packagePath);

while (urlEnum.hasMoreElements()) {
// 获取当前 URL
URL currUrl = urlEnum.nextElement();
// 获取协议文本
final String protocol = currUrl.getProtocol();
// 定义临时集合
Set<Class<?>> tmpSet = null;

if ("FILE".equalsIgnoreCase(protocol)) {
// 从文件系统中加载类
tmpSet = listClazzFromDir(
new File(currUrl.getFile()), packageName, recursive, filter
);
} else if ("JAR".equalsIgnoreCase(protocol)) {
// 获取文件字符串
String fileStr = currUrl.getFile();

if (fileStr.startsWith("file:")) {
// 如果是以 "file:" 开头的,
// 则去除这个开头
fileStr = fileStr.substring(5);
}

if (fileStr.lastIndexOf('!') > 0) {
// 如果有 '!' 字符,
// 则截断 '!' 字符之后的所有字符
fileStr = fileStr.substring(0, fileStr.lastIndexOf('!'));
}

// 从 JAR 文件中加载类
tmpSet = listClazzFromJar(
new File(fileStr), packageName, recursive, filter
);
}

if (tmpSet != null) {
// 如果类集合不为空,
// 则添加到结果中
resultSet.addAll(tmpSet);
}
}
} catch (Exception ex) {
// 抛出异常!
throw new RuntimeException(ex);
}

return resultSet;
}

/**
* 从目录中获取类列表
*
* @param dirFile 目录
* @param packageName 包名称
* @param recursive 是否递归查询子包
* @param filter 类过滤器
* @return 符合条件的类集合
*/
static private Set<Class<?>> listClazzFromDir(
final File dirFile, final String packageName, final boolean recursive, IClazzFilter filter) {

if (!dirFile.exists() ||
!dirFile.isDirectory()) {
// 如果参数对象为空,
// 则直接退出!
return null;
}

// 获取子文件列表
File[] subFileArr = dirFile.listFiles();

if (subFileArr == null ||
subFileArr.length <= 0) {
return null;
}

// 文件队列, 将子文件列表添加到队列
Queue<File> fileQ = new LinkedList<>(Arrays.asList(subFileArr));

// 结果对象
Set<Class<?>> resultSet = new HashSet<>();

while (!fileQ.isEmpty()) {
// 从队列中获取文件
File currFile = fileQ.poll();

if (currFile.isDirectory() &&
recursive) {
// 如果当前文件是目录,
// 并且是执行递归操作时,
// 获取子文件列表
subFileArr = currFile.listFiles();

if (subFileArr != null &&
subFileArr.length > 0) {
// 添加文件到队列
fileQ.addAll(Arrays.asList(subFileArr));
}
continue;
}

if (!currFile.isFile() ||
!currFile.getName().endsWith(".class")) {
// 如果当前文件不是文件,
// 或者文件名不是以 .class 结尾,
// 则直接跳过
continue;
}

// 类名称
String clazzName;

// 设置类名称
clazzName = currFile.getAbsolutePath();
// 清除最后的 .class 结尾
clazzName = clazzName.substring(dirFile.getAbsolutePath().length(), clazzName.lastIndexOf('.'));
// 转换目录斜杠
clazzName = clazzName.replace('\\', '/');
// 清除开头的 /
clazzName = trimLeft(clazzName, "/");
// 将所有的 / 修改为 .
clazzName = join(clazzName.split("/"), ".");
// 包名 + 类名
clazzName = packageName + "." + clazzName;

try {
// 加载类定义
Class<?> clazzObj = Class.forName(clazzName);

if (null != filter &&
!filter.accept(clazzObj)) {
// 如果过滤器不为空,
// 且过滤器不接受当前类,
// 则直接跳过!
continue;
}

// 添加类定义到集合
resultSet.add(clazzObj);
} catch (Exception ex) {
// 抛出异常
throw new RuntimeException(ex);
}
}

return resultSet;
}

/**
* 从 .jar 文件中获取类列表
*
* @param jarFilePath .jar 文件路径
* @param packageName 包名称
* @param recursive 是否递归查询子包
* @param filter 类过滤器
* @return 符合条件的类集合
*/
static private Set<Class<?>> listClazzFromJar(
final File jarFilePath, final String packageName, final boolean recursive, IClazzFilter filter) {

if (jarFilePath == null ||
jarFilePath.isDirectory()) {
// 如果参数对象为空,
// 则直接退出!
return null;
}

// 结果对象
Set<Class<?>> resultSet = new HashSet<>();

try {
// 创建 .jar 文件读入流
JarInputStream jarIn = new JarInputStream(new FileInputStream(jarFilePath));
// 进入点
JarEntry entry;

while ((entry = jarIn.getNextJarEntry()) != null) {
if (entry.isDirectory()) {
continue;
}

// 获取进入点名称
String entryName = entry.getName();

if (!entryName.endsWith(".class")) {
// 如果不是以 .class 结尾,
// 则说明不是 JAVA 类文件, 直接跳过!
continue;
}


//
// 如果没有开启递归模式,
// 那么就需要判断当前 .class 文件是否在指定目录下?
//
// 获取目录名称
String tmpStr = entryName.substring(0, entryName.lastIndexOf('/'));
// 将目录中的 "/" 全部替换成 "."
tmpStr = join(tmpStr.split("/"), ".");

if (!recursive) {
if (!packageName.equals(tmpStr)) {
// 如果不是我们要找的包,
continue;
}
} else {
if (!tmpStr.startsWith(packageName)) {
// 如果不是子包,
continue;
}
}

String clazzName;

// 清除最后的 .class 结尾
clazzName = entryName.substring(0, entryName.lastIndexOf('.'));
// 将所有的 / 修改为 .
clazzName = join(clazzName.split("/"), ".");

// 加载类定义
Class<?> clazzObj = Class.forName(clazzName);

if (null != filter &&
!filter.accept(clazzObj)) {
// 如果过滤器不为空,
// 且过滤器不接受当前类,
// 则直接跳过!
continue;
}

// 添加类定义到集合
resultSet.add(clazzObj);
}

// 关闭 jar 输入流
jarIn.close();
} catch (Exception ex) {
// 抛出异常
throw new RuntimeException(ex);
}

return resultSet;
}

/**
* 类名称过滤器
*
* @author hjj2019
*/
@FunctionalInterface
static public interface IClazzFilter {
/**
* 是否接受当前类?
*
* @param clazz 被筛选的类
* @return 是否符合条件
*/
boolean accept(Class<?> clazz);
}

/**
* 使用连接符连接字符串数组
*
* @param strArr 字符串数组
* @param conn 连接符
* @return 连接后的字符串
*/
static private String join(String[] strArr, String conn) {
if (null == strArr ||
strArr.length <= 0) {
return "";
}

StringBuilder sb = new StringBuilder();

for (int i = 0; i < strArr.length; i++) {
if (i > 0) {
// 添加连接符
sb.append(conn);
}

// 添加字符串
sb.append(strArr[i]);
}

return sb.toString();
}

/**
* 清除源字符串左边的字符串
*
* @param src 原字符串
* @param trimStr 需要被清除的字符串
* @return 清除后的字符串
*/
static private String trimLeft(String src, String trimStr) {
if (null == src ||
src.isEmpty()) {
return "";
}

if (null == trimStr ||
trimStr.isEmpty()) {
return src;
}

if (src.equals(trimStr)) {
return "";
}

while (src.startsWith(trimStr)) {
src = src.substring(trimStr.length());
}

return src;
}
}

命令处理器工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* 命令处理器工厂类
*/
public final class CmdHandlerFactory {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(CmdHandlerFactory.class);

/**
* 命令处理器字典
*/
static private final Map<Class<?>, ICmdHandler<? extends GeneratedMessageV3>> _handlerMap = new HashMap<>();

/**
* 私有化类默认构造器
*/
private CmdHandlerFactory() {
}

/**
* 初始化
*/
static public void init() {
LOGGER.info("==== 完成命令与处理器的关联 ====");

// 获取包名称
final String packageName = CmdHandlerFactory.class.getPackage().getName();
// 获取 ICmdHandler 所有的实现类 (包名,是否递归遍历,定位的Class的子类)
Set<Class<?>> clazzSet = PackageUtil.listSubClazz(
packageName,
true,
ICmdHandler.class
);

for (Class<?> handlerClazz : clazzSet) {
if (null == handlerClazz ||
0 != (handlerClazz.getModifiers() & Modifier.ABSTRACT)) {
continue;
}

// 获取方法数组
Method[] methodArray = handlerClazz.getDeclaredMethods();
// 消息类型
Class<?> cmdClazz = null;

for (Method currMethod : methodArray) {
if (null == currMethod ||
!currMethod.getName().equals("handle")) {
continue;
}

// 获取函数参数类型数组
Class<?>[] paramTypeArray = currMethod.getParameterTypes();

if (paramTypeArray.length < 2 ||
paramTypeArray[1] == GeneratedMessageV3.class ||
!GeneratedMessageV3.class.isAssignableFrom(paramTypeArray[1])) {
continue;
}

cmdClazz = paramTypeArray[1];
break;
}

if (null == cmdClazz) {
continue;
}

try {
// 创建命令处理器实例
ICmdHandler<?> newHandler = (ICmdHandler<?>) handlerClazz.newInstance();

LOGGER.info(
"{} <==> {}",
cmdClazz.getName(),
handlerClazz.getName()
);

_handlerMap.put(cmdClazz, newHandler);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}

/**
* 创建命令处理器
*
* @param msgClazz
* @return
*/
static public ICmdHandler<? extends GeneratedMessageV3> create(Class<?> msgClazz) {
if (null == msgClazz) {
return null;
}

return _handlerMap.get(msgClazz);
}
}

单线程设计

由于消息处理器是多线程的,在并发的情况下,如果每个用户都有血量,进行并发扣除的话,会产生问题,为了简化业务逻辑,通常都是采用单线程处理。

通过加锁、CAS、原子类对业务入侵性太强。

通过把消息处理器GameMsgHandler的消息处理方法抽取出来,使用单线程业务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class GameMsgHandler extends SimpleChannelInboundHandler<Object> {
/*
省略部分代码
*/

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (null == ctx ||
null == msg) {
return;
}
//迁移到MainMsgProcessor类中,单线程运行
MainMsgProcessor.getInstance().process(ctx, msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    /*
* 主消息处理器
*/
public final class MainMsgProcessor {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(MainMsgProcessor.class);

/**
* 单例对象
*/
static private final MainMsgProcessor _instance = new MainMsgProcessor();

/**
* 创建一个单线程的线程池
*/
private final ExecutorService _es = Executors.newSingleThreadExecutor((newRunnable) -> {
Thread newThread = new Thread(newRunnable);
newThread.setName("MainMsgProcessor");
return newThread;
});

/**
* 私有化类默认构造器
*/
private MainMsgProcessor() {
}

/**
* 获取单例对象
*
* @return 单例对象
*/
static public MainMsgProcessor getInstance() {
return _instance;
}

/**
* 处理消息
*
* @param ctx
* @param msg
*/
public void process(ChannelHandlerContext ctx, Object msg) {
if (null == ctx ||
null == msg) {
return;
}

final Class<?> msgClazz = msg.getClass();

LOGGER.info(
"收到客户端消息, msgClazz = {}, msgObj = {}",
msgClazz.getSimpleName(),
msg
);

//单线程运行
_es.submit(() -> {
try {
// 获取命令处理器
ICmdHandler<? extends GeneratedMessageV3> cmdHandler = CmdHandlerFactory.create(msgClazz);

if (null != cmdHandler) {
cmdHandler.handle(ctx, cast(msg));
}
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
});
}

/**
* 转型为命令对象
*
* @param msg 消息对象
* @param <TCmd> 消息类
* @return 命令对象
*/
@SuppressWarnings("unchecked")
static private <TCmd extends GeneratedMessageV3> TCmd cast(Object msg) {
if (!(msg instanceof GeneratedMessageV3)) {
return null;
} else {
return (TCmd) msg;
}
}
}

多线程设计

若在单线程设计中,有业务逻辑需要查询数据库,将会产生IO阻塞,有可能会影响整个业务,需要把IO行为做成异步分离出去,并且IO查询完成后,接着回到单线程的MainMsgProcessor类中运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* 主消息处理器
*/
public final class MainThreadProcessor {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(MainThreadProcessor.class);

/**
* 单例对象
*/
static private final MainThreadProcessor _instance = new MainThreadProcessor();

/**
* 创建一个单线程的线程池
*/
private final ExecutorService _es = Executors.newSingleThreadExecutor((r) -> {
Thread newThread = new Thread(r);
newThread.setName("MainMsgProcessor");
return newThread;
});

/**
* 私有化类默认构造器
*/
private MainThreadProcessor() {
}

/**
* 获取单例对象
*
* @return 单例对象
*/
static public MainThreadProcessor getInstance() {
return _instance;
}

/**
* 处理消息
*
* @param ctx 客户端信道上下文
* @param msg 消息对象
*/
public void process(ChannelHandlerContext ctx, Object msg) {
if (null == ctx ||
null == msg) {
return;
}

// 获取消息类
final Class<?> msgClazz = msg.getClass();

LOGGER.info(
"收到客户端消息, msgClazz = {}",
msgClazz.getName()
);

// 获取命令处理器
final ICmdHandler<? extends GeneratedMessageV3> handlerImpl = CmdHandlerFactory.create(msgClazz);

if (null == handlerImpl) {
LOGGER.error(
"未找到相对应的命令处理器, msgClazz = {}",
msgClazz.getName()
);
return;
}

this.process(() -> handlerImpl.handle(
ctx, cast(msg)
));
}

/**
* 处理 Runnable 实例
*
* @param r Runnable 实例
*/
public void process(Runnable r) {
if (null != r) {
_es.submit(new SafeRun(r));
}
}

/**
* 转型为命令对象
*
* @param msg 消息对象
* @param <TCmd> 消息类
* @return 命令对象
*/
@SuppressWarnings("unchecked")
static private <TCmd extends GeneratedMessageV3> TCmd cast(Object msg) {
if (!(msg instanceof GeneratedMessageV3)) {
return null;
} else {
return (TCmd) msg;
}
}

/**
* 安全运行
*/
static private class SafeRun implements Runnable {
/**
* 内置运行实例
*/
private final Runnable _innerR;

/**
* 类参数构造器
*
* @param innerR 内置运行实例
*/
SafeRun(Runnable innerR) {
_innerR = innerR;
}

@Override
public void run() {
if (null == _innerR) {
return;
}

try {
// 运行
_innerR.run();
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 异步操作接口
*/
public interface IAsyncOperation {
/**
* 获取绑定 Id
*
* @return 绑定 Id
*/
default int getBindId() {
return 0;
}

/**
* 执行异步操作
*/
void doAsync();

/**
* 执行完成逻辑
*/
default void doFinish() {
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 异步操作处理器
*/
public final class AsyncOperationProcessor {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(AsyncOperationProcessor.class);

/**
* 单例对象
*/
static private final AsyncOperationProcessor _instance = new AsyncOperationProcessor();

/**
* 单线程数组
*/
private final ExecutorService[] _esArray = new ExecutorService[8];

/**
* 私有化类默认构造器
*/
private AsyncOperationProcessor() {
for (int i = 0; i < _esArray.length; i++) {
// 线程名称
final String threadName = MessageFormat.format("AsyncOperationProcessor[ {0} ]", i);
// 创建单线程
_esArray[i] = Executors.newSingleThreadExecutor((r) -> {
Thread t = new Thread(r);
t.setName(threadName);
return t;
});
}
}

/**
* 获取单例对象
*
* @return 单例对象
*/
static public AsyncOperationProcessor getInstance() {
return _instance;
}

/**
* 执行异步操作
*
* @param op 操作对象
*/
public void process(IAsyncOperation op) {
if (null == op) {
return;
}

int bindId = Math.abs(op.getBindId());
int esIndex = bindId % _esArray.length;

_esArray[esIndex].submit(() -> {
try {
// 执行异步操作
op.doAsync();
// 回到主线程执行完成逻辑
MainThreadProcessor.getInstance().process(op::doFinish);
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 用户登陆
*/
public class UserLoginCmdHandler implements ICmdHandler<GameMsgProtocol.UserLoginCmd> {
/**
* 日志对象
*/
static private final Logger LOGGER = LoggerFactory.getLogger(UserLoginCmdHandler.class);

@Override
public void handle(ChannelHandlerContext ctx, GameMsgProtocol.UserLoginCmd cmd) {
if (null == ctx ||
null == cmd) {
return;
}

String userName = cmd.getUserName();
String password = cmd.getPassword();

if (null == userName ||
null == password) {
return;
}

LOGGER.info("当前线程 = {}", Thread.currentThread().getName());

// 获取用户实体
LoginService.getInstance().userLogin(userName, password, (userEntity) -> {
GameMsgProtocol.UserLoginResult.Builder resultBuilder = GameMsgProtocol.UserLoginResult.newBuilder();

LOGGER.info("当前线程 = {}", Thread.currentThread().getName());

if (null == userEntity) {
resultBuilder.setUserId(-1);
resultBuilder.setUserName("");
resultBuilder.setHeroAvatar("");
} else {
User newUser = new User();
newUser.userId = userEntity.userId;
newUser.userName = userEntity.userName;
newUser.heroAvatar = userEntity.heroAvatar;
newUser.currHp = 100;
UserManager.addUser(newUser);

// 将用户 Id 保存至 Session
ctx.channel().attr(AttributeKey.valueOf("userId")).set(newUser.userId);

resultBuilder.setUserId(userEntity.userId);
resultBuilder.setUserName(userEntity.userName);
resultBuilder.setHeroAvatar(userEntity.heroAvatar);
}

GameMsgProtocol.UserLoginResult newResult = resultBuilder.build();
ctx.writeAndFlush(newResult);

return null;
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
* 登陆服务
*/
public final class LoginService {
/**
* 日志服务
*/
static private final Logger LOGGER = LoggerFactory.getLogger(LoginService.class);

/**
* 单例对象
*/
static private final LoginService _instance = new LoginService();

/**
* 私有化类默认构造器
*/
private LoginService() {
}

/**
* 获取单例对象
*
* @return 单例对象
*/
static public LoginService getInstance() {
return _instance;
}

/**
* 用户登陆
*
* @param userName 用户名称
* @param password 密码
* @param callback 回调函数
*/
public void userLogin(String userName, String password, Function<UserEntity, Void> callback) {
if (null == userName ||
null == password) {
return;
}

AsyncOperationProcessor.getInstance().process(new AsyncGetUserEntity(userName, password) {
@Override
public void doFinish() {
if (null != callback) {
callback.apply(this.getUserEntity());
}
}
});
}

/**
* 更新 Redis 中的用户基本信息
*
* @param userEntity 用户实体
*/
private void updateBasicInfoInRedis(UserEntity userEntity) {
if (null == userEntity) {
return;
}

try (Jedis redis = RedisUtil.getJedis()) {
JSONObject jsonObj = new JSONObject();
jsonObj.put("userName", userEntity.userName);
jsonObj.put("heroAvatar", userEntity.heroAvatar);

redis.hset("User_" + userEntity.userId, "BasicInfo", jsonObj.toJSONString());
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}

/**
* 异步方式获取用户实体
*/
static private class AsyncGetUserEntity implements IAsyncOperation {
/**
* 用户名称
*/
private final String _userName;

/**
* 密码
*/
private final String _password;

/**
* 用户实体
*/
private UserEntity _userEntity;

/**
* 类参数构造器
*
* @param userName 用户名称
* @param password 密码
*/
AsyncGetUserEntity(String userName, String password) {
_userName = userName;
_password = password;
}

/**
* 获取用户实体
*
* @return 用户实体
*/
UserEntity getUserEntity() {
return _userEntity;
}

@Override
public int getBindId() {
if (null == _userName) {
return 0;
} else {
return _userName.charAt(_userName.length() - 1);
}
}

@Override
public void doAsync() {
try (SqlSession mySqlSession = MySqlSessionFactory.openSession()) {
// 获取 DAO
IUserDao dao = mySqlSession.getMapper(IUserDao.class);
// 获取用户实体
UserEntity userEntity = dao.getByUserName(_userName);

if (null != userEntity) {
if (!_password.equals(userEntity.password)) {
throw new RuntimeException("密码错误");
}
} else {
userEntity = new UserEntity();
userEntity.userName = _userName;
userEntity.password = _password;
userEntity.heroAvatar = "Hero_Shaman";

dao.insertInto(userEntity);
}

LoginService.getInstance().updateBasicInfoInRedis(userEntity);
_userEntity = userEntity;
} catch (Exception ex) {
// 记录错误日志
LOGGER.error(ex.getMessage(), ex);
}
}
}
}

最后更新: 2021年04月01日 22:21

原始链接: https://midkuro.gitee.io/2020/10/30/io-netty/

× 请我吃糖~
打赏二维码