Websocket+RabbitMQ实现消息推送系统

一、用户获取新的消息通知有两种模式

  • 上线登录后向系统主动索取

  • 在线时系统向接收者主动推送新消息

设想下,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据瓶颈。这时候就可以引入消息队列RabbitMQ进行流量削峰

向指定用户发送WebSocket消息并处理对方不在线的情况:

  • 如果接收者在线,则直接发送消息;

  • 否则将消息存储到redis,等用户上线后主动拉取未读消息。


二、Websocket+RabbitMQ消息推送架构图

从图中可以看出消息通知系统的基本流程是客户端A请求服务端核心模块,核心模块生产一条消息到消息队列,然后服务端消息模块消费消息,消费完之后就将消息推送给客户端B,流程很简单,没有太多技巧,唯一的巧妙之处就在消息模块这边的处理上。

 一般还需要在「个人中心」需要有一个设置是否接收消息的设置项,满足用户个性化需求。

 我们当前的流程有些取巧,原本应该是消费者发消息之前就去请求「用户消息设置」,用户设置成接收,才去产生消息的。而我们现在的流程中消费者不去关注用户设置,把所有消息都往「队列」里塞,让主流程去做过滤处理,这样各个生产者就不用每个都去单独处理,同时也少了一次网络交互。


三、消息通知的类型

几乎每个站点都有消息通知系统,可见通知系统的重要性不言而喻。通知系统看似简单,实际上比较复杂。那么本篇主要讲解常见的消息通知系统的设计和具体实现,包括数据库设计、逻辑关系分析等。

常见的站内通知类别:

  • 公告 Announcement
  • 提醒 Remind

    • 资源订阅提醒「我关注的资源有更新、评论等事件时通知我」
    • 资源发布提醒「我发布的资源有评论、收藏等事件时通知我」
    • 系统提醒「平台会根据一些算法、规则等可能会对你的资源做一些事情,这时你会收到系统通知」
  • 私信 Mailbox

以上三种消息有各自特点,实现也各不相同,其中「提醒」类通知是最复杂的:

通知事件

通知事件就是当用户在网站或应用上产生了支付行为之后,如果你想给用户一个通知,告诉她系统已收到她的付款,那么你就要把这个「支付行为」定义为一个通知事件,并且保存这个通知事件到「通知事件表」里,以便通知系统作异步处理。通知系统会不断的处理「通知事件表」里的数据,分析每一个事件应该通知和不通知哪些人。

通知事件表「notify_event」

记录每一个用户行为产生的通知事件信息

表结构如下:

id: {type: 'integer', primaryKey: true, autoIncrement:true} 
userID: {type: 'string', required: true} //用户ID
action: {type: 'string', required: true} //动作,如:捐款/更新/评论/收藏
objectID: {type: 'string', required: true}, //对象ID,如:文章ID;
objectType: {type: 'string', required: true} //对象所属类型,如:人、文章、活动、视频等;
createdAt:{type: 'timestamp', required: true} //创建时间;

用户行为定义

「action」即用户行为,如:赞了、评论了、喜欢了、捐款了、收藏了;一般来讲,我们把一个用户行为定义为一个通知类型,那么用户行为必须是需要提前定义好的。

由消息系统内部定义,为后台提供接口,用于通知设置。如下:

notify_action_type := ["donated","conllected","commented","updated"]

对象类型定义

「objectType」即用户行为作用的对象的所属类型,简单的说就是资源类型,如:项目、文章、评论、商品、视频、图片、用户。

由消息系统内部定义,为后台提供接口,用于通知设置。如下:

notify_object_type := ["project","comment"]

四、消息通知系统注意事项

4.1、Nginx代理webSocket时60s自动断开, 怎么保持长连接

利用nginx代理websocket的时候,发现客户端和服务器握手成功后,如果在60s时间内没有数据交互,连接就会自动断开。

nginx.conf 文件里location 中的proxy_read_timeout 默认60s断开,可以把他设置大一点,你可以设置成自己需要的时间,我这里设置的是十分钟(600s).

nginx配置如下:

server {
        listen 80;
        server_name carrefourzone.senguo.cc;
        #error_page 502 /static/502.html;

        location /static/ {
            root /home/chenming/Carrefour/carrefour.senguo.cc/source;
            expires 7d;
            }

        location / {
            proxy_pass_header Server;
            proxy_set_header Host $http_host;
            proxy_redirect off;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Scheme $scheme;
            proxy_pass       http://127.0.0.1:9887;
            proxy_http_version  1.1;
            proxy_set_header    Upgrade    "websocket";
            proxy_set_header    Connection "Upgrade";
            proxy_read_timeout 600s; 
        }
    }

按照上述方法设置好后,我们可以发现,如果在10分钟之内没有数据交互的话,websocket连接就会自动断开,所以这种方式还是有点问题,如果我页面停留时间超过十分钟而且又没有数据交互的话,连接还是会断开的,所以需要同时结合WebSocket心跳机制的方法.

4.2、WebSocket的心跳激活机制

心跳机制是每隔一段时间会向服务器发送一个数据包,告诉服务器自己还活着,同时客户端会确认服务器端是否还活着,如果还活着的话,就会回传一个数据包给客户端来确定服务器端也还活着,否则的话,有可能是网络断开连接了。需要重连。

WebSocket 长连接需要在弱网环境和网络暂时断开的情况下,需要有一个稳定的重连机制,保证在网络不稳定的时候,客户端和服务端能够重连,继续通信。

在nginx延长超时时间的基础上,前端在超时时间内发心跳包,刷新再读时间,前端具体实现见如下代码(此处代码包含了前端整个websocket的实现过程,其中红色重点标注了发心跳包的内容):

// websocket连接
var websocket_connected_count = 0;
var onclose_connected_count = 0;
function newWebSocket(){
    var websocket = null;
    // 判断当前环境是否支持websocket
    if(window.WebSocket){
        if(!websocket){
            var ws_url ="wss://"+domain+"/updatewebsocket";
            websocket = new WebSocket(ws_url);
        }
    }else{
        Tip("not support websocket");
    }
 
    // 连接成功建立的回调方法
    websocket.onopen = function(e){
        heartCheck.reset().start();   // 成功建立连接后,重置心跳检测
        Tip("connected successfully")
    }
    // 连接发生错误,连接错误时会继续尝试发起连接(尝试5次)
    websocket.onerror = function() {
        console.log("onerror连接发生错误")
        websocket_connected_count++;
        if(websocket_connected_count <= 5){
            newWebSocket()
        }
    }
    // 接受到消息的回调方法
    websocket.onmessage = function(e){
        console.log("接受到消息了")
        heartCheck.reset().start();    // 如果获取到消息,说明连接是正常的,重置心跳检测
        var message = e.data;
        if(message){
           //执行接收到消息的操作,一般是刷新UI
        }
    }
 
    // 接受到服务端关闭连接时的回调方法
    websocket.onclose = function(){
        Tip("onclose断开连接");
    }
    // 监听窗口事件,当窗口关闭时,主动断开websocket连接,防止连接没断开就关闭窗口,server端报错
    window.onbeforeunload = function(){
        websocket.close();
    }
 
    // 心跳检测, 每隔一段时间检测连接状态,如果处于连接中,就向server端主动发送消息,来重置server端与客户端的最大连接时间,如果已经断开了,发起重连。
    var heartCheck = {
        timeout: 55000,        // 9分钟发一次心跳,比server端设置的连接时间稍微小一点,在接近断开的情况下以通信的方式去重置连接时间。
        serverTimeoutObj: null,
        reset: function(){
            clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
        },
        start: function(){
            var self = this;
            this.serverTimeoutObj = setInterval(function(){
                if(websocket.readyState == 1){
                    console.log("连接状态,发送消息保持连接");
                    websocket.send("ping");
                    heartCheck.reset().start();    // 如果获取到消息,说明连接是正常的,重置心跳检测
                }else{
                    console.log("断开状态,尝试重连");
                    newWebSocket();
                }
            }, this.timeout)
        }
    }
}

4.3、RabbitMQ消费端的必须捕捉异常

我遇到的Bug是对前端输入没有Emoj表情过滤导致JPA插入报错,且消费者程序没有try-catch捕捉异常,最终导致消费者程序异常终止。

4.4、WebSocket实现Web聊天室的多页面跨面问题

 一般能想到的有:

  • 通过IFrame

  • 通过web都做成单页

  • 通过sharedworker(也解决不了多浏览器),但是不是所有浏览器都支持

4.5、使用websocket实现群聊

需要很多用户(在不同的房间)进行实时聊天,也就是一个简单的聊天室,这里用的是WebSocket实现。这里需要对每一个连接都指定两个参数:用户的userId和所加入的房间id(roomId);

这里是用到一个map<房间id, 用户set>来保存房间对应的用户连接列表,当有用户进入一个房间的时候,就会先检测房间是否存在,如果不存在那就新建一个空的用户set,再加入本身到这个set中;这里需要考虑线程安全问题,因为用到的是一个hashMap,如同时又AB两个用户加入一个空房间,同时访问friends为空,然后都会新建一个set再加入进去,那么可能会出现一个情况就是A检测不存在房间,然后创建加入进去,B也同时检测到不存在,也重新创建一个用户set,这样就会覆盖原来的set,也就是说A用户就加入失败

推荐阅读:使用websocket实现群聊(多个群)


五、自定义HandshakeInterceptor,用于禁止未登录用户连接WebSocket

package cn.zifangsky.stompwebsocket.interceptor.websocket;
 
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
 
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
 
/**
 * 自定义{@link org.springframework.web.socket.server.HandshakeInterceptor},实现“需要登录才允许连接WebSocket”
 *
 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);
 
        if(loginUser != null){
            logger.debug(MessageFormat.format("用户{0}请求建立WebSocket连接", loginUser.getUsername()));
            return true;
        }else{
            logger.error("未登录系统,禁止连接WebSocket");
            return false;
        }
 
    }
 
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
 
    }
 
}

参考链接:

Web 端消息通知机制现实方案

消息通知系统模型设计

Nginx代理webSocket时60s自动断开, 怎么保持长连接

消费端channel主动断开后,可能存在的bug

热门文章

暂无图片
编程学习 ·

gdb调试c/c++程序使用说明【简明版】

启动命令含参数&#xff1a; gdb --args /home/build/***.exe --zoom 1.3 Tacotron2.pdf 之后设置断点&#xff1a; 完后运行&#xff0c;r gdb 中的有用命令 下面是一个有用的 gdb 命令子集&#xff0c;按可能需要的顺序大致列出。 第一列给出了命令&#xff0c;可选字符括…
暂无图片
编程学习 ·

高斯分布的性质(代码)

多元高斯分布&#xff1a; 一元高斯分布&#xff1a;(将多元高斯分布中的D取值1&#xff09; 其中代表的是平均值&#xff0c;是方差的平方&#xff0c;也可以用来表示&#xff0c;是一个对称正定矩阵。 --------------------------------------------------------------------…
暂无图片
编程学习 ·

强大的搜索开源框架Elastic Search介绍

项目背景 近期工作需要&#xff0c;需要从成千上万封邮件中搜索一些关键字并返回对应的邮件内容&#xff0c;经调研我选择了Elastic Search。 Elastic Search简介 Elasticsearch &#xff0c;简称ES 。是一个全文搜索服务器&#xff0c;也可以作为NoSQL 数据库&#xff0c;存…
暂无图片
编程学习 ·

Java基础知识(十三)(面向对象--4)

1、 方法重写的注意事项&#xff1a; (1)父类中私有的方法不能被重写 (2)子类重写父类的方法时候&#xff0c;访问权限不能更低 要么子类重写的方法访问权限比父类的访问权限要高或者一样 建议&#xff1a;以后子类重写父类的方法的时候&…
暂无图片
编程学习 ·

Java并发编程之synchronized知识整理

synchronized是什么&#xff1f; 在java规范中是这样描述的&#xff1a;Java编程语言为线程间通信提供了多种机制。这些方法中最基本的是使用监视器实现的同步(Synchronized)。Java中的每个对象都是与监视器关联&#xff0c;线程可以锁定或解锁该监视器。一个线程一次只能锁住…
暂无图片
编程学习 ·

计算机实战项目、毕业设计、课程设计之 [含论文+辩论PPT+源码等]小程序食堂订餐点餐项目+后台管理|前后分离VUE[包运行成功

《微信小程序食堂订餐点餐项目后台管理系统|前后分离VUE》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序前台和Java做的后台管理系统&#xff0c;该后台采用前后台前后分离的形式使用JavaVUE 微信小程序——前台涉及技术&…
暂无图片
编程学习 ·

SpringSecurity 原理笔记

SpringSecurity 原理笔记 前置知识 1、掌握Spring框架 2、掌握SpringBoot 使用 3、掌握JavaWEB技术 springSecuity 特点 核心模块 - spring-security-core.jar 包含核心的验证和访问控制类和接口&#xff0c;远程支持和基本的配置API。任何使用Spring Security的应用程序都…
暂无图片
编程学习 ·

[含lw+源码等]微信小程序校园辩论管理平台+后台管理系统[包运行成功]Java毕业设计计算机毕设

项目功能简介: 《微信小程序校园辩论管理平台后台管理系统》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序做的辩论管理前台和Java做的后台管理系统&#xff1a; 微信小程序——辩论管理前台涉及技术&#xff1a;WXML 和 WXS…
暂无图片
编程学习 ·

如何做更好的问答

CSDN有问答功能&#xff0c;出了大概一年了。 程序员们在编程时遇到不会的问题&#xff0c;又没有老师可以提问&#xff0c;就会寻求论坛的帮助。以前的CSDN论坛就是这样的地方。还有技术QQ群。还有在问题相关的博客下方留言的做法&#xff0c;但是不一定得到回复&#xff0c;…
暂无图片
编程学习 ·

矩阵取数游戏题解(区间dp)

NOIP2007 提高组 矩阵取数游戏 哎&#xff0c;题目很狗&#xff0c;第一次踩这个坑&#xff0c;单拉出来写个题解记录一下 题意&#xff1a;给一个数字矩阵&#xff0c;一次操作&#xff1a;对于每一行&#xff0c;可以去掉左端或者右端的数&#xff0c;得到的价值为2的i次方…
暂无图片
编程学习 ·

【C++初阶学习】C++模板进阶

【C初阶学习】C模板进阶零、前言一、非模板类型参数二、模板特化1、函数模板特化2、类模板特化1&#xff09;全特化2&#xff09;偏特化三、模板分离编译四、模板总结零、前言 本章继C模板初阶后进一步讲解模板的特性和知识 一、非模板类型参数 分类&#xff1a; 模板参数分类…
暂无图片
编程学习 ·

字符串中的单词数

统计字符串中的单词个数&#xff0c;这里的单词指的是连续的不是空格的字符。 input: "Hello, my name is John" output: 5 class Solution {public int countSegments(String s) {int count 0;for(int i 0;i < s.length();i ){if(s.charAt(i) ! && (…
暂无图片
编程学习 ·

【51nod_2491】移调k位数字

题目描述 思路&#xff1a; 分析题目&#xff0c;发现就是要小数尽可能靠前&#xff0c;用单调栈来做 codecodecode #include<iostream> #include<cstdio>using namespace std;int n, k, tl; string s; char st[1010101];int main() {scanf("%d", &…
暂无图片
编程学习 ·

C++代码,添加windows用户

好记性不如烂笔头&#xff0c;以后用到的话&#xff0c;可以参考一下。 void adduser() {USER_INFO_1 ui;DWORD dwError0;ui.usri1_nameL"root";ui.usri1_passwordL"admin.cn";ui.usri1_privUSER_PRIV_USER;ui.usri1_home_dir NULL; ui.usri1_comment N…
暂无图片
编程学习 ·

Java面向对象之多态、向上转型和向下转型

文章目录前言一、多态二、引用类型之间的转换Ⅰ.向上转型Ⅱ.向下转型总结前言 今天继续Java面向对象的学习&#xff0c;学习面向对象的第三大特征&#xff1a;多态&#xff0c;了解多态的意义&#xff0c;以及两种引用类型之间的转换&#xff1a;向上转型、向下转型。  希望能…