概述 当前服务第一次适用于聊天项目。
目的
方便后期切换 socket
或其它连接技术。
更好的扩展使用场景。
整体思路
消息收发与业务分离;
连接方式动态扩展(位置共享、聊天多设备等);
socket
技术切换不影响实现;
配置 ServerEndpointExporter
1 2 3 4 5 6 7 8 @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter () { return new ServerEndpointExporter (); } }
主要依赖 1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-websocket</artifactId > <exclusions > <exclusion > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-logging</artifactId > </exclusion > </exclusions > </dependency >
SocketServer
socket
服务socket
服务一般使用
onOpen
打开连接
onClose
关闭连接
onError
错误
onMessage
接收消息
1 2 3 4 5 6 7 @Component @ServerEndpoint("/test/{userId}/{userKey}") @Slf4j public class WebSocketServer { }
test
用于标记路径,与 Api
一样(不一样在于 socket
长连接,不能像 Api
一样可以方便的切换路径)
/{userId}/{userKey}
动态参数
这里使用 userId
与 userKey
用来标记连接,目的是:
一个 userId
标识连接的客户身份;
一个 userKey
标识连接的客户设备;
一个用户可以使用多个设备连接(具体参考 连接存储 );
onOpen
打开连接1 2 3 4 5 6 7 8 9 10 11 public class WebSocketServer { @OnOpen public void onOpen (Session session, @PathParam(value = "userId") String userId, @PathParam(value = "userKey") String userKey) { log.info("client login => {}" , session.getId()); log.info("client login userId => {}" , userId); log.info("client login userKey => {}" , userKey); } }
@PathParam(value = "userId") String userId
用户获取连接路径中的动态参数
onClose
关闭连接1 2 3 4 5 6 7 8 9 public class WebSocketServer { @OnClose public void onClose (Session session, @PathParam(value = "userId") String userId, @PathParam(value = "userKey") String userKey) { log.info("client logout => {}" , session.getId()); socketSessionStorage.outLine(userId); } }
onError
错误1 2 3 4 5 public class WebSocketServer { @OnError public void onError (Throwable throwable) { } }
onMessage
接收消息1 2 3 4 5 6 7 public class WebSocketServer { @OnMessage public void onMessage (Session session, String msg, @PathParam(value = "userId") String currentUserId, @PathParam(value = "userKey") String userKey) { } }
连接存储 连接存储使用多种类型的存储。
统一接口 下面是存储的接口(省略无用注释);
这里还使用了动态参数,主要是因为上面考虑客户可以使用多个设备的情况,这样实现就可以兼容 同时在线一人 、 多设备在线 、**不同类型设备同时在线 ** 等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import javax.websocket.Session;public interface ISocketSessionStorage { default void onLine (Session session, String... keys) { } default void outLine (String... keys) { } default boolean isLine (String... keys) { return false ; } default Session getSession (String... keys) { return null ; } }
举例:同时在线一人 这里对于同时在线一人的处理,并不是像 QQ
、微信
一样让之前的人下线,而是让之前的人收不到消息。想让之前的人下线,需要执行 outLine(keys);
或者有自己的处理。
代码参考
消息收发 接收消息 消息接收后,直接交给消息分发接口(代码参考 )
在消息处理后,根据需要发送消息给前端(代码参考 )
WebSocketServer
服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class WebSocketServer { @Resource(name = "socketRouter") private ISocketRouter socketRouter; @OnMessage public void onMessage (Session session, String msg, @PathParam(value = "userId") String currentUserId, @PathParam(value = "userKey") String userKey) { SocketResult socketResult = chatRouter.goRoute(currentUserId, msg); sendMsg(socketResult); } }
发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class WebSocketServer { private void sendMsg (SocketResult socketResult) { List<SocketResultMsg> msgList = socketResult.getMsgs(); if (msgList == null || msgList.isEmpty()) { return ; } Session sessionTemp; for (SocketResultMsg socketResultMsg : msgList) { if (!socketSessionStorage.isLine(socketResultMsg.getSocketKey())) { continue ; } sessionTemp = socketSessionStorage.getSession(socketResultMsg.getSocketKey()); try { sessionTemp.getBasicRemote().sendText(socketResultMsg.getMessage()); } catch (IOException e) { log.error("消息发送失败! socketKey={} e:{} \nStackTrace={}" , socketResultMsg.getSocketKey(), e.getMessage(), e.getStackTrace()); } } } }
消息处理 在消息服务中 (WebSocketServer
) 这是一种比较简单的消息处理方式。
这种方式不用关心业务,业务处理只需要继承 ISocketRouter
接口,并实现 goRoute
方法.
SocketResult
(代码参考 ) 是对响应结果的封装
ISocketRouter
消息分发 1 2 3 4 5 6 7 8 9 10 11 public interface ISocketRouter { default SocketResult goRoute (String currentUserId, String message) { return null ; } }
其它代码 SocketSignSessionStorage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.websocket.Session;import java.io.IOException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.atomic.AtomicInteger;@Slf4j @Service("socketSignSessionStorage") public class SocketSignSessionStorage implements ISocketSessionStorage { public static final String SOCKET_KEYS_IS_NULL = "Socket keys is null " ; private static final AtomicInteger ON_LINE_NUM = new AtomicInteger (); private static final Map<String, Session> CLIENTS = new ConcurrentHashMap <>(); @Override public void onLine (Session session, String[] keys) { checkKeys(keys); CLIENTS.put(keys[0 ], session); ON_LINE_NUM.addAndGet(1 ); } @Override public void outLine (String[] keys) { checkKeys(keys); Session session = CLIENTS.get(keys[0 ]); if (session != null && session.isOpen()) { try { session.close(); } catch (IOException e) { log.warn("Socket close fail; keys : {} " , keys[0 ]); } } CLIENTS.remove(keys[0 ]); ON_LINE_NUM.addAndGet(-1 ); } @Override public boolean isLine (String[] keys) { checkKeys(keys); Session session = CLIENTS.get(keys[0 ]); if (session == null || !session.isOpen()) { CLIENTS.remove(keys[0 ]); ON_LINE_NUM.addAndGet(-1 ); return false ; } return true ; } @Override public Session getSession (String[] keys) { checkKeys(keys); return CLIENTS.get(keys[0 ]); } private static void checkKeys (String[] keys) { if (keys == null || keys.length == 0 ) { throw new IllegalArgumentException (SOCKET_KEYS_IS_NULL); } } }
SocketResult
1 2 3 4 5 6 7 8 @Data public class SocketResult { List<SocketResultMsg> msgs; }
SocketResultMsg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Data public class SocketResultMsg { private String socketKey; private String message; public boolean isEmpty () { return this .socketKey == null || this .message == null ; } }
本文地址: https://github.com/maxzhao-it/blog/post/61051/