Java博文
JAVA 21 都体验了吧
Java程序员必备的Intellij插件(长期更新,截止到2018-05-03) - 掘金
32.6k star🔥原来国内的独立开发者都在做这些事情
工作六年,我学会了用 Arthas 来辅助我的日常工作
太方便了!Arthas,生产问题大杀器 - 掘金
新一代Java高性能构建工具Maven-mvnd【实践可行版】
怎么在业务团队写好发消息的代码?
Intellij 开源热加载插件 HotSwapHelper 发布,兼容若依、jeecg 等框架
SpringBoot多环境日志配置_Java_快乐非自愿限量之名_InfoQ写作社区
VSCode配置JAVA开发环境_Java_IT蜗壳-Tango_InfoQ写作社区
Java虚拟线程探究与性能解析
Jakarta EE 11 发布,增强企业 Java 开发人员生产力和性能
重要:Java25正式发布(长期支持版)!
Access Token + Refresh Token 全解析:前后端分离架构的认证与安全方案
设计一个支持千万级用户的 IM 系统:消息推送如何保证可靠性
Spring Boot + CRaC 启动速度提升了10倍!
Java 25 新特性 更简洁、更高效、更现代
玩转 Java8 Stream,让你代码更高效紧凑简洁文章目录前言一、Stream特性二、Stream创建2.1用集合创 - 掘金
Guava 简介:让 Java 开发更高效
横空出世!MyBatis-Plus 同款 ES ORM 框架,用起来够优雅!
一个Java工程师的17个日常效率工具
Quarkus:轻量级 Java 的未来?
OpenJDK、Temurin、GraalVM...到底该装哪个?
Lombok坑哭了!若依框架一行@Data炸出Param为null,我卡了一下午才发现BaseEntity的猫腻
缓存性能王者,阿里巴巴二级缓存JetCache框架
MapStruct使用反思与简单易用性封装
Dockerfile 构建 Java 应用瘦身优化
还在手动搭Maven多模块?这款IDEA插件让我效率提升10倍(真实体验)
本文档使用 MrDoc 发布
-
+
设计一个支持千万级用户的 IM 系统:消息推送如何保证可靠性
作为一名拥有八年 Java 后端开发经验的技术人员,我参与过多个大型 IM 系统的设计与实现。在这篇博客中,我将分享如何设计一个支持千万级用户的 IM 系统,并重点探讨消息推送可靠性的关键技术和实现方案。 ### 业务场景分析 在设计 IM 系统之前,我们需要明确业务场景和需求: 1. **用户规模**:支持千万级在线用户,高峰期并发消息量可能达到每秒数万条 2. **消息类型**:文本、图片、语音、视频等多种消息格式 3. **可靠性要求**:消息不能丢失,重要消息需要确保送达 4. **实时性要求**:消息推送延迟控制在 1 秒以内 5. **离线消息**:用户离线时保存消息,上线后推送 6. **多端同步**:支持手机、平板、PC 等多端消息同步 ### 架构设计概览 一个支持千万级用户的 IM 系统通常采用以下架构: 1. **接入层**:负载均衡、长连接管理、协议解析 2. **服务层**:消息处理、会话管理、好友关系、群组管理 3. **存储层**:消息存储、用户信息存储、离线消息存储 4. **推送层**:消息推送、离线推送、状态同步 5. **监控层**:系统监控、性能监控、故障预警 ### 可靠性保障的核心技术方案 #### 1\. 消息确认机制 为确保消息可靠送达,我们需要实现严格的消息确认机制: ```scss @Service public class MessageAckServiceImpl implements MessageAckService { private static final long ACK_TIMEOUT = 30000; private static final int MAX_RETRIES = 5; private final ConcurrentHashMap<String, Message> pendingMessages = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); @Autowired private MessageStore messageStore; @Autowired private PushService pushService; @Override public void sendMessage(Message message) { messageStore.saveMessage(message); boolean success = pushService.pushToDevice(message.getReceiverId(), message); if (success) { pendingMessages.put(message.getMessageId(), message); scheduler.schedule(() -> checkMessageAck(message.getMessageId()), ACK_TIMEOUT, TimeUnit.MILLISECONDS); } else { message.setStatus(MessageStatus.FAILED); messageStore.updateMessageStatus(message.getMessageId(), MessageStatus.FAILED); } } @Override public void handleMessageAck(String messageId, String receiverId) { Message message = pendingMessages.remove(messageId); if (message != null) { message.setStatus(MessageStatus.CONFIRMED); messageStore.updateMessageStatus(messageId, MessageStatus.CONFIRMED); log.info("消息 {} 已确认接收", messageId); } } private void checkMessageAck(String messageId) { Message message = pendingMessages.get(messageId); if (message != null) { message.incrementRetryCount(); if (message.getRetryCount() <= MAX_RETRIES) { boolean success = pushService.pushToDevice(message.getReceiverId(), message); if (success) { scheduler.schedule(() -> checkMessageAck(messageId), ACK_TIMEOUT, TimeUnit.MILLISECONDS); } else { message.setStatus(MessageStatus.FAILED); messageStore.updateMessageStatus(messageId, MessageStatus.FAILED); pendingMessages.remove(messageId); log.error("消息 {} 重试 {} 次后仍然失败", messageId, message.getRetryCount()); } } else { message.setStatus(MessageStatus.FAILED); messageStore.updateMessageStatus(messageId, MessageStatus.FAILED); pendingMessages.remove(messageId); log.error("消息 {} 达到最大重试次数 {}", messageId, MAX_RETRIES); notifySenderMessageFailed(message); } } } private void notifySenderMessageFailed(Message message) { Message notification = new Message( UUID.randomUUID().toString(), "system", message.getSenderId(), "消息发送失败:" + message.getMessageId() ); sendMessage(notification); } } ``` #### 2\. 多机房部署与故障转移 为保证系统高可用,我们采用多机房部署方案: ```scss @Service public class MessageRouterServiceImpl implements MessageRouterService { private final Map<String, DataCenterConfig> dataCenters = new ConcurrentHashMap<>(); private final String localDataCenterId; private final LoadingCache<String, String> userSessionCache; @Autowired public MessageRouterServiceImpl(ConfigService configService) { this.dataCenters.putAll(configService.loadDataCenterConfig()); this.localDataCenterId = configService.getLocalDataCenterId(); this.userSessionCache = Caffeine.newBuilder() .maximumSize(10_000_000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(this::fetchUserSession); } @Override public String routeMessage(Message message) { String receiverId = message.getReceiverId(); try { String serverId = userSessionCache.get(receiverId); if (isServerInLocalDataCenter(serverId)) { return serverId; } else { String targetDataCenterId = getServerDataCenter(serverId); return dataCenters.get(targetDataCenterId).getGatewayServerId(); } } catch (ExecutionException e) { log.error("获取用户会话失败: {}", receiverId, e); return getLeastLoadedServer(); } } @Override public void handleDataCenterFailure(String failedDataCenterId) { DataCenterConfig failedConfig = dataCenters.get(failedDataCenterId); if (failedConfig != null) { failedConfig.setAvailable(false); log.warn("机房 {} 已标记为不可用", failedDataCenterId); } migrateUserSessions(failedDataCenterId); notifyOtherDataCenters(failedDataCenterId); } private void migrateUserSessions(String failedDataCenterId) { List<String> affectedUsers = userSessionCache.asMap().entrySet().stream() .filter(entry -> getServerDataCenter(entry.getValue()).equals(failedDataCenterId)) .map(Map.Entry::getKey) .collect(Collectors.toList()); log.info("需要迁移的用户数量: {}", affectedUsers.size()); for (String userId : affectedUsers) { try { String newServerId = selectNewServerForUser(userId); updateUserSession(userId, newServerId); sendReconnectNotification(userId, newServerId); } catch (Exception e) { log.error("迁移用户会话失败: {}", userId, e); } } } } ``` #### 3\. 离线消息处理 为确保用户不会错过重要消息,需要实现可靠的离线消息存储和推送机制: ```scss @Service public class OfflineMessageServiceImpl implements OfflineMessageService { private final BlockingQueue<Message> offlineMessageQueue = new LinkedBlockingQueue<>(100000); private static final int BATCH_SIZE = 100; private final ExecutorService offlineProcessors = Executors.newFixedThreadPool(10); @Autowired private MessageStore messageStore; @Autowired private PushService pushService; @PostConstruct public void init() { for (int i = 0; i < 10; i++) { offlineProcessors.submit(this::processOfflineMessages); } } @Override public void storeOfflineMessage(Message message) { try { offlineMessageQueue.put(message); log.debug("消息 {} 已加入离线队列,接收者: {}", message.getMessageId(), message.getReceiverId()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("存储离线消息被中断", e); } } private void processOfflineMessages() { List<Message> batchMessages = new ArrayList<>(BATCH_SIZE); while (true) { try { Message message = offlineMessageQueue.poll(1, TimeUnit.SECONDS); if (message != null) { batchMessages.add(message); if (batchMessages.size() >= BATCH_SIZE) { processBatchMessages(batchMessages); batchMessages.clear(); } } else if (!batchMessages.isEmpty()) { processBatchMessages(batchMessages); batchMessages.clear(); } } catch (Exception e) { log.error("处理离线消息出错", e); if (!batchMessages.isEmpty()) { saveProcessedMessages(batchMessages); batchMessages.clear(); } } } } private void processBatchMessages(List<Message> messages) { Map<String, List<Message>> userMessages = messages.stream() .collect(Collectors.groupingBy(Message::getReceiverId)); for (Map.Entry<String, List<Message>> entry : userMessages.entrySet()) { String receiverId = entry.getKey(); List<Message> userMsgs = entry.getValue(); try { boolean isUserOnline = checkUserOnline(receiverId); if (isUserOnline) { for (Message msg : userMsgs) { boolean success = pushService.pushToDevice(receiverId, msg); if (success) { msg.setStatus(MessageStatus.DELIVERED); messageStore.updateMessageStatus(msg.getMessageId(), MessageStatus.DELIVERED); } } } else { storeMessagesForOfflineUser(receiverId, userMsgs); } } catch (Exception e) { log.error("处理用户 {} 的离线消息失败", receiverId, e); } } } } ``` ### 性能优化与扩展性设计 针对千万级用户的 IM 系统,我们还需要考虑以下性能优化和扩展性设计: 1. **消息存储优化**:采用分库分表策略,按用户 ID 哈希分片 2. **缓存策略**:高频访问数据(如用户在线状态)使用 Redis 缓存 3. **异步处理**:非核心业务(如消息计数、统计)采用消息队列异步处理 4. **水平扩展**:服务无状态设计,支持按需扩展服务器数量 5. **熔断与限流**:使用 Sentinel 或 Hystrix 防止服务雪崩 ### 监控与告警 完善的监控系统是保证系统可靠性的重要组成部分: 1. **连接监控**:实时监控长连接数量、连接成功率、断开率 2. **消息监控**:监控消息发送成功率、延迟、堆积情况 3. **性能监控**:监控服务器 CPU、内存、网络带宽使用情况 4. **告警机制**:设置阈值自动触发告警,如消息堆积超过 10 万条 ### 总结 设计一个支持千万级用户的 IM 系统并保证消息推送的可靠性是一个复杂的工程问题。通过采用消息确认机制、多机房部署、离线消息处理、以及完善的监控系统,我们可以构建一个高可用、高性能、高可靠的 IM 系统。 在实际开发过程中,还需要根据具体业务场景进行适当调整和优化。例如,对于金融类 IM 应用,可能需要更高的消息可靠性保证;而对于社交类应用,则可能更注重系统的扩展性和性能。
admin
2025年9月24日 00:30
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档(打印)
分享
链接
类型
密码
更新密码