介绍
目的
实现轻量级的前后端消息订阅系统,简化面向用户的消息逻辑。
优点
- 标准的消息传输协议和消息格式,有社区类库支持
- 可以(选择性地)使用消息代理(如RabbitMQ、ActiveMQ和其他)来管理订阅和广播消息
- 封装了 WebSocketHandler,更简单
- 添加了Header,可以用于鉴权
说明
STOMP是一个基于框架的协议,其框架是以HTTP为模型的。
客户端可以使用 SEND 或 SUBSCRIBE 命令来发送或订阅消息,以及描述消息内容和谁应该收到它的 destination header。这就实现了一个简单的发布-订阅机制,你可以用它来通过 broker 向其他连接的客户端发送消息,或者向服务器发送消息以请求执行某些工作。
消息结构:
1
2
3
4
5
|
COMMAND
header1:value1
header2:value2
Body
|
Spingboot 中 stomp 原理
当你使用Spring的STOMP支持时,Spring WebSocket 应用程序充当客户的STOMP broker。消息被路由到 @Controller 消息处理方法或简单的内存中 broker,该 broker 跟踪订阅并将消息广播给订阅用户。你也可以将Spring配置为与专门的STOMP broker(如RabbitMQ、ActiveMQ等)合作,进行消息的实际广播。在这种情况下,Spring维护与 broker 的TCP连接,向其转发消息,并将消息从它那里传递给连接的WebSocket客户端。因此,Spring web 应用可以依靠统一的基于HTTP的 security、通用验证和熟悉的编程模型来处理消息。
SpringBoot 实现 Stomp
StompConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package com.leador.scd.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/websocket")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//表明在topic、queue、users这三个域上可以向客户端发消息。
registry.enableSimpleBroker("/topic", "/queue", "/user");
//客户端向服务端发起请求时,需要以/app为前缀。
registry.setApplicationDestinationPrefixes("/app");
//给指定用户发送一对一的消息前缀是/user/。
registry.setUserDestinationPrefix("/user/");
}
/**
* 定义用户入端通道拦截器
*
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
registration.interceptors(createUserInterceptor());
}
/**
* 将自定义的客户端渠道拦截器加入IOC容器中
*
* @return
*/
@Bean
public UserChannelInterceptor createUserInterceptor() {
return new UserChannelInterceptor();
}
}
|
UserChannelInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Slf4j
public class UserChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String token = accessor.getNativeHeader("Authorization").get(0);
//校验token
ResultBean resultBean = TokenManageHandler.getInvokeStrategy().validateToken(token);
if (!resultBean.getSuccess()) {
throw new IllegalStateException("websocket 中 Authorization 无效");
}
UserDTO user = TokenManageHandler.getInvokeStrategy().getUserByToken(token);
// 这里是针对 token 来管理的,意味着可以管理每一个登录设备。如果要针对用户进行管理,需要改为用户名
accessor.setUser(new UserPrincipal(token));
log.info("Stomp 用户【{}】上线,token:{}", user.getUserName(), token);
} else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
String token = accessor.getUser().getName();
log.info("Stomp 用户下线, token:{}", token);
}
return message;
}
}
|
StompMsgDTO
1
2
3
4
5
6
7
8
9
10
11
|
@Data
@AllArgsConstructor
public class StompMsgDTO {
private String topic;
private Date time;
private Object data;
}
|
StompService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Slf4j
@Service
public class StompService {
@Autowired
private SimpMessageSendingOperations simpMessageSendingOperations;
@Autowired
private SimpUserRegistry simpUserRegistry;
/**
* 推送消息
* 服务端推送消息--一对一
* 单体服务
* 客服端 订阅地址为/users/{username}/message
*
* @param token 代币
* @param topic 话题
* @param message 消息
*/
public void pushMessage(String token, String topic, String message) {
try {
//根据用户名查询当前节点在线用户
SimpUser simpUser = simpUserRegistry.getUser(token);
if (null == simpUser) {
return;
}
log.info("Stomp--服务端指定用户发送消息,to【{}】, msg:{}", simpUser.getName(), message);
StompMsgDTO stompMsgDTO = new StompMsgDTO(topic, new Date(), message);
simpMessageSendingOperations.convertAndSendToUser(token, "/message", JSON.toJSONString(stompMsgDTO));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 向所有人推送消息
* 服务器端推送消息--广播
* 客服端 订阅地址为/topic/message
* 单体服务
*
* @param topic 话题
* @param message 消息
*/
public void pushMessageToAll(String topic, String message) {
try {
log.info("Stomp--服务端广播消息:{}", message);
StompMsgDTO stompMsgDTO = new StompMsgDTO(topic, new Date(), message);
simpMessageSendingOperations.convertAndSend("/topic/message", JSON.toJSONString(stompMsgDTO));
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
StompController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Slf4j
@Api(tags = "Stomp测试")
@RestController
@RequestMapping(value = "/stomp")
@RequiredArgsConstructor
public class StompController {
private final StompService stompService;
@PostMapping("/pushMessage")
public ResultBean<Boolean> pushMessage(String token, String topic, String msg) {
stompService.pushMessage(token, "topic", msg);
return ResultBean.success(true);
}
@PostMapping("/pushMessageToAll")
public ResultBean<Boolean> pushMessageToAll(String topic, String msg) {
stompService.pushMessageToAll("topic", msg);
return ResultBean.success(true);
}
}
|
前端客户端实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
</head>
<body>
</body>
<script src="/sockjs.min.js"></script>
<script src="/stomp.min.js"></script>
<script>
//websocket端点地址,如是网关,写成对应网关地址
var socket = new SockJS('http://127.0.0.1:8888/websocket');
stompClient = Stomp.over(socket);
stompClient.connect({
//传输token参数 进行用户鉴权
Authorization: "95038206-e455-42e2-bfbe-8e02b5547913"
}, function (frame) {
console.log('Connected: ' + frame);
//订阅公共频道
stompClient.subscribe('/topic/message', function (greeting) {
console.log(greeting.body);
})
//订阅个人频道,这里用的token【95038206-e455-42e2-bfbe-8e02b5547913】,如果是针对用户进行管理,需要改为用户名
stompClient.subscribe('/user/95038206-e455-42e2-bfbe-8e02b5547913/message', function (greeting) {
console.log(greeting.body);
})
});
</script>
</html>
|