前言

群聊中未读消息如何设计,以及是推消息还是拉去消息如何选择是需要讨论的。推送消息是推送全量消息还是推送信号消息让客户端再去拉取。其中方案如何选型会比较纠结。 首先基本的推拉结合思路是在线用户推送消息。用户离线的话上线去拉取消息。这是简单的推拉结合。问题在于群聊消息的特点有发送消息比较频繁。假设群里面有一秒钟发送了100条消息。如果推送的话一个人需要推送一百次。但是拉取的话只需要一次就可以拉取所有消息。但是通过 信号消息这样的方案使得设计非常复杂。详情看这篇文章。 目前已经写的文章有。并且有对应视频版本。 git项目地址 【IM即时通信系统(企聊聊)】点击可跳转 sprinboot单体项目升级成springcloud项目 【第一期】 前端项目技术选型以及页面展示【第二期】 分布式权限 shiro + jwt + redis【第三期】 给为服务添加运维模块 统一管理【第四期】 微服务数据库模块【第五期】 netty与mq在项目中的使用(第六期(废弃))】 分布式websocket即时通信(IM)系统构建指南【第七期】 分布式websocket即时通信(IM)系统保证消息可靠性【第八期】 分布式websocket IM聊天系统相关问题问答【第九期】 什么?websocket也有权限!这个应该怎么做?【第十期】 分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】 IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】 微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】 微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】 【分布式websocket】RocketMQ发送消息保证消息最终一致性需要做哪些处理?【第15期】 B站上面关注我呐 B站和CSDN同名,B站1000粉丝后建群。然后B站关注我后可以私信CSDN来的,然后后面我建群的时候拉你!

群聊中消息已读未读如何设计

数据模型设计

在群聊系统中,管理未读消息的两种常见方法是:记录每个用户与每条消息之间的已读/未读状态,以及记录用户的最后一次阅读消息ID。每种方法都有其优缺点,适用于不同的场景。 结论 如果你的系统需要精确跟踪每条消息的阅读状态,或者需要支持复杂的消息阅读状态查询,可以选择记录每个用户与每条消息之间的已读/未读状态。 如果你的系统更注重性能和可扩展性,或者只需要基本的未读消息功能,记录用户的最后一次阅读消息ID是一个更高效的选择。 通常,考虑到性能和实现的复杂度,许多现代的群聊系统倾向于使用记录最后一次阅读消息ID的方法。这种方法能够满足大多数场景的需求,同时保持系统的高效和简洁。 我们采用 记录用户的最后一次阅读消息ID。

CREATE TABLE yan_im_read(

user_id VARCHAR(255) NOT NULL,

group_id VARCHAR(255) NOT NULL,

last_read_message_id BIGINT,

count int

);

具体未读这块会在下一篇离线消息设计中说明 设计思路 功能实现

发送消息: 当用户发送消息时,将消息存入消息表,并为每个接收者在用户-会话关系表中的未读消息数加一。 对于群聊,为群内每个成员(除了发送者)的未读消息数加一。阅读消息: 当用户打开一个聊天窗口时,系统将该会话的未读消息数重置为0,并更新最后阅读时间或最后阅读的消息ID。 同时,前端展示未读消息数,并将其清零。查询未读消息数: 用户登录或在主界面时,系统查询用户-会话关系表,获取每个会话的未读消息数,以及总的未读消息数。 这些信息用于在用户的聊天列表中显示每个聊天窗口的未读消息数,以及应用图标上显示的总未读数。

群聊中消息推送模型采用推送还是用户自己拉取

** 模式一 推送模式(Push)** 在推送模式下,当有新消息时,服务器主动将消息推送给客户端。这种模式可以实现实时通信,用户体验较好。 优点: 实时性:用户可以即时接收到消息,无需主动查询,提高了通信的实时性。 减轻客户端负担:客户端无需定时向服务器发送查询请求,减少了网络请求和资源消耗。 缺点: 服务器负担较重:需要服务器跟踪每个客户端的连接状态,并实时推送消息。 可能存在消息丢失:在网络不稳定或客户端离线时,推送的消息可能会丢失。 模式二 拉取模式(Pull)

在拉取模式下,客户端定时向服务器发送请求,查询是否有新消息。如果有,客户端再拉取这些新消息。优点: 简单可靠:客户端主动拉取,可以根据需要重试,减少了消息丢失的风险。 服务器处理简单:服务器不需要跟踪客户端的连接状态,只需响应客户端的请求。缺点: 延迟:用户接收到消息的速度取决于拉取的频率,可能无法做到实时通信。 增加客户端负担:客户端需要定时发送请求,增加了网络请求和资源消耗。

模式三 推送通知后客户端拉取消息 在这种策略中,当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。 优点: 减轻了服务器推送的压力,尤其是在高并发场景下。 更灵活,可以根据客户端的实际情况(如网络状况、用户设置等)决定是否拉取新消息。 方便实现对离线消息的处理,客户端上线后可以主动拉取期间的所有新消息。 缺点: 实时性略差,用户收到消息有一定的延迟。 客户端逻辑更复杂,需要实现拉取新消息的逻辑。

结合使用 在实际应用中,为了兼顾实时性和系统资源的有效利用,往往会结合推送和拉取两种模式:

推送+拉取:对于实时性要求高的消息,如即时聊天消息,采用推送模式,确保用户能够及时收到。对于实时性要求不高的信息,如离线消息或通知,可以在用户上线时通过拉取模式获取。状态同步:使用推送模式进行实时消息通信,同时,客户端在特定情况下(如启动、网络恢复等)主动拉取最新状态,以确保没有遗漏的消息。根据消息优先级去选择推送或者拉取:还可以根据消息的优先级和类型,选择不同的推送策略。对于重要或紧急的消息,可能采用直接推送的方式;而对于普通消息,则采用通知加拉取的方式注意事项 推送策略:在推送模式下,需要合理设计推送策略,比如使用消息队列管理待推送消息,以应对高并发场景。 拉取策略:在拉取模式下,需要考虑合理的拉取频率,避免过于频繁导致的资源浪费,或过于稀疏导致的实时性不足。 用户体验:在设计消息推送模型时,应考虑到用户体验,提供稳定、可靠、及时的消息服务。

实时推送的优化策略

1.消息队列优化 利用消息队列来管理消息的发送。当有新消息时,先将消息发送到消息队列中,然后使用消费者服务批量从队列中取出消息进行处理和推送。这种方式可以有效地平衡负载,提高消息处理的效率。 2.分批推送 对于大群聊,一次性向所有成员推送可能会导致服务器压力过大。可以将群成员分批,每批包含一定数量的用户,然后逐批推送。这样既可以减轻服务器压力,又可以避免网络拥塞。 3.WebSocket连接池 对于基于WebSocket或长连接的推送方式,使用连接池来管理和复用连接。这样可以减少频繁建立和断开连接的开销,提高推送效率。 4. 监控和调优 持续监控推送系统的性能指标,如推送延迟、失败率等。根据监控结果调整批处理大小、分批策略和资源分配,以达到最优的推送效率。

MQ批量消费逻辑

先说明一下消费什么。批量消费可以解决什么问题。在之前的消息链路消息有落库的环节。

落库的话需要监听mq然后消费这条消息进行落库。这里采用的策略是批量落库。避免发送一条消息保存一条消息这样的频繁访问数据库。

org.apache.rocketmq

rocketmq-spring-boot-starter

2.2.0

@Service

@RocketMQMessageListener(topic = "yourTopic", consumerGroup = "yourConsumerGroup", consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 10, messageModel = MessageModel.CLUSTERING)

public class MyBatchConsumerService implements RocketMQListener> {

@Override

public void onMessage(List messages) {

for (MessageExt message : messages) {

// 处理每条消息

System.out.println(new String(message.getBody()));

}

// 实现批量处理逻辑

}

}

配置消费者属性 在application.properties或application.yml中配置消费者的属性,特别是consumeMessageBatchMaxSize,这个属性决定了消费者每次批量拉取处理的消息最大数量。

rocketmq:

consumer:

consumeMessageBatchMaxSize: 10

如果需要更细致地控制消费者的配置,可以通过编程方式自定义消费者。这通常涉及到创建DefaultMQPushConsumer的实例,并设置相关属性。

@Configuration

public class RocketMQConsumerConfig {

@Value("${rocketmq.name-server}")

private String nameServer;

@Value("${rocketmq.consumer.group}")

private String consumerGroup;

@Bean

public DefaultMQPushConsumer batchConsumer() throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

consumer.setNamesrvAddr(nameServer);

consumer.setConsumeMessageBatchMaxSize(10); // 设置批量消费的大小

consumer.subscribe("YourTopic", "*"); // 订阅主题

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

// 实现批量消息处理逻辑

msgs.forEach(msg -> {

System.out.println(new String(msg.getBody()));

});

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

return consumer;

}

}

. 注意事项

消息大小限制:确保批量消费的总消息大小不超过RocketMQ的限制(默认不超过4MB)。异常处理:在批量处理消息时,确保有适当的异常处理机制。如果批量中的某个消息处理失败,需要决定是重试整个批次还是仅重试失败的消息。性能测试:在实际部署前,进行充分的性能测试,以确定最优的consumeMessageBatchMaxSize值。过大或过小的批量大小都可能影响消费效率。 通过上述配置和建议,可以在Spring Boot应用中有效地实现RocketMQ的批量消费,提高消息处理的效率和应用性能。

介绍群聊模式三

当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。这种模式实现起来暂时有点复杂。先按照简单的推拉结合处理。但是可以保留着思路,然后后面解决。 方案概述

信号消息推送:当有新消息发送到群聊时,服务器不会直接将完整的消息内容推送给所有群成员。相反,它只发送一个信号消息,通知客户端有新消息可用。客户端拉取消息:收到信号消息后,客户端会主动向服务器发起请求,拉取自上次更新以来的所有新消息。 方案优点 减少带宽消耗:由于不是所有的消息内容都通过推送发送,这种方案可以显著减少网络带宽的消耗。 提高效率:客户端可以根据实际需要批量拉取消息,减少网络请求的次数,提高数据同步的效率。 保证消息完整性:通过拉取机制,即使在网络不稳定的情况下,客户端也能确保最终获取到所有的消息,避免消息丢失。 支持离线消息:当用户离线时,信号消息可以被服务器暂存,用户上线后再拉取所有未读消息,保证消息的完整同步。 方案实现要点

信号消息设计:信号消息应包含足够的信息,以便客户端知道从哪里开始拉取新消息。例如,可以包含最新消息的ID或时间戳。

消息存储:服务器需要有效地存储和管理消息,以支持高效的消息拉取操作。通常需要对消息进行索引,以便快速查询到新消息。 拉取策略:客户端可以实现智能的拉取策略,例如,在用户查看群聊时主动拉取新消息,或者在收到多个信号消息时合并请求,减少服务器的负载。 错误处理和重试机制:为了保证消息的完整性,客户端在拉取消息时应该实现错误处理和重试机制,确保在网络不稳定时也能成功获取消息。 适用场景 这种结合推送和拉取的方案非常适合消息量大、用户基数广的群聊系统,尤其是在需要优化网络资源消耗、保证消息可靠传递的场景下。同时,这种方案也适用于需要支持离线消息同步的应用。

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: