概述
当前服务第一次适用于聊天项目。
目的
- 方便后期切换
socket
或其它连接技术。
- 更好的扩展使用场景。
整体思路
- 消息收发与业务分离;
- 连接方式动态扩展(位置共享、聊天多设备等);
socket
技术切换不影响实现;
配置
ServerEndpointExporter
1 2 3 4 5 6 7 8
| @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
主要依赖
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency>
|
SocketServer
socket
服务
socket
服务一般使用
onOpen
打开连接
onClose
关闭连接
onError
错误
onMessage
接收消息
1 2 3 4 5 6 7
| @Component @ServerEndpoint("/test/{userId}/{userKey}") @Slf4j public class WebSocketServer { }
|
test
用于标记路径,与 Api
一样(不一样在于 socket
长连接,不能像 Api
一样可以方便的切换路径)
/{userId}/{userKey}
动态参数
- 这里使用
userId
与 userKey
用来标记连接,目的是:
- 一个
userId
标识连接的客户身份;
- 一个
userKey
标识连接的客户设备;
- 一个用户可以使用多个设备连接(具体参考 连接存储);
onOpen
打开连接
1 2 3 4 5 6 7 8 9 10 11
| public class WebSocketServer { @OnOpen public void onOpen(Session session, @PathParam(value = "userId") String userId, @PathParam(value = "userKey") String userKey) { log.info("client login => {}", session.getId()); log.info("client login userId => {}", userId); log.info("client login userKey => {}", userKey); } }
|
@PathParam(value = "userId") String userId
用户获取连接路径中的动态参数
onClose
关闭连接
1 2 3 4 5 6 7 8 9
| public class WebSocketServer { @OnClose public void onClose(Session session, @PathParam(value = "userId") String userId, @PathParam(value = "userKey") String userKey) { log.info("client logout => {}", session.getId()); socketSessionStorage.outLine(userId); } }
|
onError
错误
1 2 3 4 5
| public class WebSocketServer { @OnError public void onError(Throwable throwable) { } }
|
onMessage
接收消息
1 2 3 4 5 6 7
| public class WebSocketServer { @OnMessage public void onMessage(Session session, String msg, @PathParam(value = "userId") String currentUserId, @PathParam(value = "userKey") String userKey) { } }
|
连接存储
连接存储使用多种类型的存储。
统一接口
下面是存储的接口(省略无用注释);
这里还使用了动态参数,主要是因为上面考虑客户可以使用多个设备的情况,这样实现就可以兼容 同时在线一人、多设备在线、**不同类型设备同时在线 ** 等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import javax.websocket.Session;
public interface ISocketSessionStorage {
default void onLine(Session session, String... keys) { }
default void outLine(String... keys) { }
default boolean isLine(String... keys) { return false; }
default Session getSession(String... keys) { return null; } }
|
举例:同时在线一人
这里对于同时在线一人的处理,并不是像 QQ
、微信
一样让之前的人下线,而是让之前的人收不到消息。想让之前的人下线,需要执行 outLine(keys);
或者有自己的处理。
代码参考
消息收发
接收消息
消息接收后,直接交给消息分发接口(代码参考)
在消息处理后,根据需要发送消息给前端(代码参考)
WebSocketServer
服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class WebSocketServer { @Resource(name = "socketRouter") private ISocketRouter socketRouter;
@OnMessage public void onMessage(Session session, String msg, @PathParam(value = "userId") String currentUserId, @PathParam(value = "userKey") String userKey) { SocketResult socketResult = chatRouter.goRoute(currentUserId, msg); sendMsg(socketResult); } }
|
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class WebSocketServer {
private void sendMsg(SocketResult socketResult) { List<SocketResultMsg> msgList = socketResult.getMsgs(); if (msgList == null || msgList.isEmpty()) { return; } Session sessionTemp; for (SocketResultMsg socketResultMsg : msgList) { if (!socketSessionStorage.isLine(socketResultMsg.getSocketKey())) { continue; } sessionTemp = socketSessionStorage.getSession(socketResultMsg.getSocketKey()); try { sessionTemp.getBasicRemote().sendText(socketResultMsg.getMessage()); } catch (IOException e) { log.error("消息发送失败! socketKey={} e:{} \nStackTrace={}", socketResultMsg.getSocketKey(), e.getMessage(), e.getStackTrace()); } } } }
|
消息处理
在消息服务中 (WebSocketServer
) 这是一种比较简单的消息处理方式。
这种方式不用关心业务,业务处理只需要继承 ISocketRouter
接口,并实现 goRoute
方法.
SocketResult
(代码参考) 是对响应结果的封装
ISocketRouter
消息分发
1 2 3 4 5 6 7 8 9 10 11
| public interface ISocketRouter {
default SocketResult goRoute(String currentUserId, String message) { return null; } }
|
其它代码
SocketSignSessionStorage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service;
import javax.websocket.Session; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Service("socketSignSessionStorage") public class SocketSignSessionStorage implements ISocketSessionStorage { public static final String SOCKET_KEYS_IS_NULL = "Socket keys is null "; private static final AtomicInteger ON_LINE_NUM = new AtomicInteger(); private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();
@Override public void onLine(Session session, String[] keys) { checkKeys(keys); CLIENTS.put(keys[0], session); ON_LINE_NUM.addAndGet(1); }
@Override public void outLine(String[] keys) { checkKeys(keys); Session session = CLIENTS.get(keys[0]); if (session != null && session.isOpen()) { try { session.close(); } catch (IOException e) { log.warn("Socket close fail; keys : {} ", keys[0]); } } CLIENTS.remove(keys[0]); ON_LINE_NUM.addAndGet(-1); }
@Override public boolean isLine(String[] keys) { checkKeys(keys); Session session = CLIENTS.get(keys[0]); if (session == null || !session.isOpen()) { CLIENTS.remove(keys[0]); ON_LINE_NUM.addAndGet(-1); return false; } return true; }
@Override public Session getSession(String[] keys) { checkKeys(keys); return CLIENTS.get(keys[0]); }
private static void checkKeys(String[] keys) { if (keys == null || keys.length == 0) { throw new IllegalArgumentException(SOCKET_KEYS_IS_NULL); } } }
|
SocketResult
1 2 3 4 5 6 7 8
|
@Data public class SocketResult { List<SocketResultMsg> msgs; }
|
SocketResultMsg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Data public class SocketResultMsg { private String socketKey; private String message;
public boolean isEmpty() { return this.socketKey == null || this.message == null; } }
|
本文地址: https://github.com/maxzhao-it/blog/post/61051/