利用Stompjs与Mq通信记录

近期遇到需要前端直接与Rabbitmq创建连接实现一个聊天框的功能实现,Rabbitmq官网提供http/web-stomp协议应用于web端与Rabbitmq通讯,下面前端开发围绕着stomp协议展开。 官网文档参考 RabbitMQ Web STOMP Plugin — RabbitMQ

实时通信的三种解决办法:

  1. Ajax轮询:此技术通过设置浏览器周期性地向服务器发送请求,以检查是否有新信息更新。
  2. HTTP长轮询:类似于Ajax轮询,但采用了阻塞模式,即客户端连接后若无新消息则保持连接不返回响应,直到接收到新消息后才返回响应,随后客户端重新建立连接,循环往复。
  3. WebSocket:作为HTML5的一部分,它允许通过单一的TCP连接实现全双工通信。WebSocket仅需一次HTTP握手,即可建立起浏览器与服务器之间的快速通道,实现数据的实时双向传输,无需频繁的查询和等待。相比之下,Ajax轮询依赖于服务器的高速处理能力和资源,而HTTP长轮询则要求服务器具备高并发处理能力,WebSocket则在资源消耗上更为高效。

STOMP

STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议; WebSocket是一个消息架构,不强制使用任何特定的消息协议,它依赖于应用层解释消息的含义. 与HTTP不同,WebSocket是处在TCP上非常薄的一层,会将字节流转化为文本/二进制消息,因此,对于实际应用来说,WebSocket的通信形式层级过低,因此,可以在 WebSocket 之上使用STOMP协议,来为浏览器 和 server间的 通信增加适当的消息语义。

STOMP与WebSocket 的关系

  • 超文本传输协议(HTTP)规范了网络浏览器如何发起请求以及网络服务器如何响应请求的具体机制。倘若HTTP这一协议不复存在,开发者将被迫采用TCP套接字这一基础通信手段来实现网络应用的构建,这一设想无疑会引发技术实现的复杂性考量。

  • WebSocket(结合SockJS)技术,虽然在功能上提供了类似于直接利用TCP套接字的通信能力,但由于缺乏高级协议的支持,开发者需自行定义应用之间消息传递的语义,并确保通信双方严格遵守这些自定义的语义标准。

  • 类似地,正如HTTP在TCP套接字之上建立了请求-响应模型层,简单文本导向消息协议(STOMP)在WebSocket的基础上引入了一个基于帧的线路格式层,该层旨在为消息交换提供明确的语义定义,从而增强了通信的规范性和互操作性。

代码实现

建立 stomp文件夹,下有 index.jsgolbal.js,具体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// index.js
import Stomp from 'stompjs'
import { RMQ_SERVER, RMQ_VIRTUAL_HOST, RMQ_ACCOUNT, RMQ_PASSWORD } from './golbal'
import { getCurrentContext } from '@/utils/auth'
import { getUUID } from '@/utils'

// tips
// 发送消息
// 客户端向服务端发送消息利用send()方法,此方法有三个参数:第一个参数(string)必需,为发送消息的目的地;第二个参数(object)可选,包含了额外的头部信息;第三个参数(string)可选,为发送的消息。
// client.send(destination, {}, body)

// 订阅消息
// 订阅消息也就是客户端接收服务端发送的消息,订阅消息可以利用subscribe()方法,此方法有三个参数:第一个参数(string)必需,为接收消息的目的地;第二个参数必需为回调函数;第三个参数{object}为可选,包含额外的头部信息。
// client.subscribe(destination, callback, {})

// 取消订阅消息可以利用unsubscribe()方法:
// let mySubscribe = client.subscribe
// mySubscribe.unsubscribe()

// 客户端订阅消息可以订阅广播,如下所示:
// client.subscribe('/topic/msg',function(messages){
// console.log(messages)
// })

// 也可以进行一对一消息的接收:
// 第一种方式
// const userId = 666;
// client.subscribe('/user/' + userId + '/msg',function(messages){
// console.log(messages)
// })
// 第二种方式
// client.subscribe('/msg',function(messages){
// console.log(messages)
// }, {"userId ": userId })

class StompClient {
constructor(rmqServer, rmqVirtualHost, rmqAccount, rmqPassword) {
this.queueName = '' // 队列名
this.client = null
this.frame = null
this.options = {
vhost: rmqVirtualHost,
incoming: 10000, // 接收频率,默认10000ms
outgoing: 10000, // 发送频率,默认10000ms
account: rmqAccount,
pwd: rmqPassword,
server: `ws://${rmqServer}/ws`,
message: '' // mq返回消息
}
}

// routerKey
getQueueName() {
const userInfo = getCurrentContext() || {}
this.queueName = userInfo.tokenUserName + getUUID() || ''
}

connect(options) {
this.options = { ...this.options, ...options }
const mqUrl = this.options.server
const ws = new WebSocket(mqUrl)
ws.onclose = (close) => {
console.log('webSocket关闭', close)
}
ws.onerror = (error) => {
console.log('webSocket异常', error)
}
ws.onopen = (success) => {
console.log('webSocket连接成功', success)
}
this.client = Stomp.over(ws)
this.client.heartbeat.incoming = this.options.incoming
this.client.heartbeat.outgoing = this.options.outgoing
this.client.debug = null // 关闭控制台调试
// mq连接
const onConnect = async () => {
console.log('stomp 连接成功!')
// 订阅消息
this.subscribe()
}
// mq错误重新进行连接
const onError = (errorMsg) => {
console.error(`stomp 断开连接,正在准备重新连接...`, errorMsg)
this.connect(this.options)
}
// 连接
this.client.connect(
this.options.account, // 用户名
this.options.pwd, // 密码
onConnect,
onError,
this.options.vhost
)
}
// 消息订阅
subscribe() {
this.getQueueName()
if (!this.queueName) {
return
}
this.client.subscribe(
`/exchange/amq.direct/${this.queueName}`,
this.onMessage.bind(this),
this.onSubscribeFailed.bind(this),
{
'auto-delete': 'true'
}
)
}
// 关闭mq连接
closeConnect() {
this.client.disconnect(() => {
console.log('已关闭mq连接')
})
}
onMessage(frame) {
const data = frame && JSON.parse(frame.body)
console.log('data :>> ', data.index)
this.options.message = data
// console.log('data :>> ', data)
}

onSubscribeFailed(frame) {
console.log('rabbitmq subscribe failed')
}
}
export default new StompClient(RMQ_SERVER, RMQ_VIRTUAL_HOST, RMQ_ACCOUNT, RMQ_PASSWORD)

1
2
3
4
5
// golbal.js
export const RMQ_SERVER = '10.0.20.188:15674' // mq服务地址
export const RMQ_VIRTUAL_HOST = 'gt' // 虚拟主机
export const RMQ_ACCOUNT = 'lggt' // 用户名
export const RMQ_PASSWORD = 'lggt' // 密码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<template>
<div @click="init">
</div>
</template>
<script>
import StompClient from './stomp/index.js'

export default {
data() {
return {
opt: '',
fileDetail: {}
}
},
watch: {
opt: {
deep: true,
immediate: true,
handler(newVal) {
this.fileDetail = (newVal && newVal.message) || {}
// console.log('fileDetail :>> ', this.fileDetail)
}
}
},
mounted() {
StompClient.connect()
this.opt = StompClient.options
},
beforeDestroy() {
StompClient.closeConnect()
},
methods: {
// 点击动作
init() {
if (StompClient && StompClient.queueName) {
// 队列名传值给后端
const params = {}
params.routingKey = StompClient.queueName
}
url(params).then()
}
}
}
</script>
<style lang="scss" scoped></style>

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!