文章目录
前言一、数据库设计二、实现代码1.SessionWrap2.websocket3.insertMessage4.清除未读
前言
使用WebSocket实现一对一的聊天功能与未读消息功能
一、数据库设计
会话表
字段名字段类型长度注释conversation_idint11会话IDcreate_timedatetime创建时间conversation_typeint1会话类型
消息表
字段名字段类型长度注释message_idint11消息IDconversation_idint11会话IDsender_idint11发送者IDreceiver_idin t11接收者IDcontenttext消息内容typeint2消息类型informationvarchar255信息sender_imgint11发送者头像IDreceiver_imgint11接收者头像IDmessage_statusint1消息状态(1已读,0未读)create_timedatetime创建时间
二、实现代码
1.SessionWrap
@Data
public class SessionWrap {
private String from; // 连接人id
private String type; // 连接类型
private Session session;
private Date lastTime;
}
2.websocket
@Component
@ServerEndpoint(value = "/api/websocket/{from}/{type}")
public class WebSocketServer {
@Autowired
private RqriMessageService rqriMessageService;
public static WebSocketServer webSocketServer;
// 所有的连接会话
private static CopyOnWriteArraySet
private String from;
private String type;
@PostConstruct
public void init() {
webSocketServer = this;
webSocketServer.rqriMessageService = this.rqriMessageService;
}
@OnOpen
public void onOpen(Session session, @PathParam(value = "from") String from, @PathParam(value = "type") String type) {
this.from = from;
this.type = type;
try {
// 遍历list,如果有会话,更新,如果没有,创建一个新的
for (SessionWrap item : sessionList) {
if (item.getFrom().equals(from) && item.getType().equals(type)) {
item.setSession(session);
item.setLastTime(new Date());
log.info("【websocket消息】更新连接,总数为:" + sessionList.size());
return;
}
}
SessionWrap sessionWrap = new SessionWrap();
sessionWrap.setFrom(from);
sessionWrap.setType(type);
sessionWrap.setSession(session);
sessionWrap.setLastTime(new Date());
sessionList.add(sessionWrap);
log.info("【websocket消息】有新的连接,总数为:" + sessionList.size());
} catch (Exception e) {
log.info("【websocket消息】连接失败!错误信息:" + e.getMessage());
}
}
@OnClose
public void onClose() {
try {
sessionList.removeIf(item -> item.getFrom().equals(from) && item.getType().equals(type));
log.info("【websocket消息】连接断开,总数为:" + sessionList.size());
} catch (Exception e) {
log.info("【websocket消息】连接断开失败!错误信息:" + e.getMessage());
}
}
@OnMessage
public void onMessage(String message, Session session) {
try {
if ("ping".equals(message)) {
session.getBasicRemote().sendText("ping"); // 心跳检测
} else {
// 将消息插入到数据库
JSONObject r = webSocketServer.rqriMessageService.insertMessage(message);
// 成功
if (r.getInteger("code") == 200) {
JSONObject data = r.getJSONObject("data");
String senderId = data.getString("senderId"); // 发送者
String receiverId = data.getString("receiverId"); // 接收者
for (SessionWrap item : sessionList) {
if (senderId.equals(item.getFrom()) || receiverId.equals(item.getFrom()) ) {
item.getSession().getBasicRemote().sendText(r.toJSONString());
}
}
log.info("【websocket消息】发送消息:" + r.toJSONString());
}
}
} catch (Exception e) {
log.info("【websocket消息】发送消息失败!错误信息:" + e.getMessage());
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误,原因:"+error.getMessage());
error.printStackTrace();
}
}
3.insertMessage
private final String rqriMessageStr = "rqri_message_unread_";
public JSONObject insertMessage(String message) {
JSONObject jsonObject = new JSONObject();
RqriMessage rqriMessage = JSONObject.parseObject(message, RqriMessage.class);
// 把消息添加到数据库
int i = rqriMessageMapper.insertSelective(rqriMessage);
// 将未读信息添加到redis 添加接收者的未读
String conversationId = String.valueOf(rqriMessage.getConversationId());
String receiverId = String.valueOf(rqriMessage.getReceiverId());
String key = rqriMessageStr + conversationId + "_" + receiverId;
if (redisUtils.get(key) == null) {
redisUtils.set(key, 1, 0); // 设置永不过期
} else {
redisUtils.incr(key, 1); // 未读数量添加1
}
jsonObject.put("code", 200);
jsonObject.put("data", rqriMessage);
// 发送者的id和未读数量,返回给前端渲染到页面
HashMap
map.put("num", Integer.valueOf(redisUtils.get(key).toString()));
map.put("id", rqriMessage.getSenderId());
jsonObject.put("isread", map);
return jsonObject;
}
4.清除未读
最后在进入聊天页面和退出聊天页面时把未读数量清零。
文章链接
发表评论