Stomp

default

介绍

目的

实现轻量级的前后端消息订阅系统,简化面向用户的消息逻辑。

优点

  • 标准的消息传输协议和消息格式,有社区类库支持
  • 可以(选择性地)使用消息代理(如RabbitMQ、ActiveMQ和其他)来管理订阅和广播消息
  • 封装了 WebSocketHandler,更简单
  • 添加了Header,可以用于鉴权

说明

STOMP是一个基于框架的协议,其框架是以HTTP为模型的。

客户端可以使用 SENDSUBSCRIBE 命令来发送或订阅消息,以及描述消息内容和谁应该收到它的 destination header。这就实现了一个简单的发布-订阅机制,你可以用它来通过 broker 向其他连接的客户端发送消息,或者向服务器发送消息以请求执行某些工作。

消息结构:

1
2
3
4
5
COMMAND
header1:value1
header2:value2
 
Body

Spingboot 中 stomp 原理

当你使用Spring的STOMP支持时,Spring WebSocket 应用程序充当客户的STOMP broker。消息被路由到 @Controller 消息处理方法或简单的内存中 broker,该 broker 跟踪订阅并将消息广播给订阅用户。你也可以将Spring配置为与专门的STOMP broker(如RabbitMQ、ActiveMQ等)合作,进行消息的实际广播。在这种情况下,Spring维护与 broker 的TCP连接,向其转发消息,并将消息从它那里传递给连接的WebSocket客户端。因此,Spring web 应用可以依靠统一的基于HTTP的 security、通用验证和熟悉的编程模型来处理消息。

SpringBoot 实现 Stomp

StompConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.leador.scd.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //表明在topic、queue、users这三个域上可以向客户端发消息。
        registry.enableSimpleBroker("/topic", "/queue", "/user");
        //客户端向服务端发起请求时,需要以/app为前缀。
        registry.setApplicationDestinationPrefixes("/app");
        //给指定用户发送一对一的消息前缀是/user/。
        registry.setUserDestinationPrefix("/user/");
    }

    /**
     * 定义用户入端通道拦截器
     *
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(10)
                .maxPoolSize(20)
                .keepAliveSeconds(60);
        registration.interceptors(createUserInterceptor());
    }

    /**
     * 将自定义的客户端渠道拦截器加入IOC容器中
     *
     * @return
     */
    @Bean
    public UserChannelInterceptor createUserInterceptor() {
        return new UserChannelInterceptor();
    }
}

UserChannelInterceptor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class UserChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            String token = accessor.getNativeHeader("Authorization").get(0);
            //校验token
            ResultBean resultBean = TokenManageHandler.getInvokeStrategy().validateToken(token);
            if (!resultBean.getSuccess()) {
                throw new IllegalStateException("websocket 中 Authorization 无效");
            }
            UserDTO user = TokenManageHandler.getInvokeStrategy().getUserByToken(token);
            // 这里是针对 token 来管理的,意味着可以管理每一个登录设备。如果要针对用户进行管理,需要改为用户名
            accessor.setUser(new UserPrincipal(token));
            log.info("Stomp 用户【{}】上线,token:{}", user.getUserName(), token);
        } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            String token = accessor.getUser().getName();
            log.info("Stomp 用户下线, token:{}", token);
        }
        return message;
    }
}

StompMsgDTO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Data
@AllArgsConstructor
public class StompMsgDTO {

    private String topic;

    private Date time;

    private Object data;

}

StompService

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j
@Service
public class StompService {

    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;

    @Autowired
    private SimpUserRegistry simpUserRegistry;


    /**
     * 推送消息
     * 服务端推送消息--一对一
     * 单体服务
     * 客服端 订阅地址为/users/{username}/message
     *
     * @param token   代币
     * @param topic   话题
     * @param message 消息
     */
    public void pushMessage(String token, String topic, String message) {
        try {
            //根据用户名查询当前节点在线用户
            SimpUser simpUser = simpUserRegistry.getUser(token);
            if (null == simpUser) {
                return;
            }
            log.info("Stomp--服务端指定用户发送消息,to【{}】, msg:{}", simpUser.getName(), message);
            StompMsgDTO stompMsgDTO = new StompMsgDTO(topic, new Date(), message);
            simpMessageSendingOperations.convertAndSendToUser(token, "/message", JSON.toJSONString(stompMsgDTO));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 向所有人推送消息
     * 服务器端推送消息--广播
     * 客服端 订阅地址为/topic/message
     * 单体服务
     *
     * @param topic   话题
     * @param message 消息
     */
    public void pushMessageToAll(String topic, String message) {
        try {
            log.info("Stomp--服务端广播消息:{}", message);
            StompMsgDTO stompMsgDTO = new StompMsgDTO(topic, new Date(), message);
            simpMessageSendingOperations.convertAndSend("/topic/message", JSON.toJSONString(stompMsgDTO));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

StompController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Api(tags = "Stomp测试")
@RestController
@RequestMapping(value = "/stomp")
@RequiredArgsConstructor
public class StompController {

    private final StompService stompService;

    @PostMapping("/pushMessage")
    public ResultBean<Boolean> pushMessage(String token, String topic, String msg) {
        stompService.pushMessage(token, "topic", msg);
        return ResultBean.success(true);
    }

    @PostMapping("/pushMessageToAll")
    public ResultBean<Boolean> pushMessageToAll(String topic, String msg) {
        stompService.pushMessageToAll("topic", msg);
        return ResultBean.success(true);
    }

}

前端客户端实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Document</title>

</head>

<body>

</body>
<script src="/sockjs.min.js"></script>
<script src="/stomp.min.js"></script>
<script>
    //websocket端点地址,如是网关,写成对应网关地址
    var socket = new SockJS('http://127.0.0.1:8888/websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({
        //传输token参数 进行用户鉴权
        Authorization: "95038206-e455-42e2-bfbe-8e02b5547913"
    }, function (frame) {
        console.log('Connected: ' + frame);
        //订阅公共频道
        stompClient.subscribe('/topic/message', function (greeting) {
            console.log(greeting.body);
        })
        //订阅个人频道,这里用的token【95038206-e455-42e2-bfbe-8e02b5547913】,如果是针对用户进行管理,需要改为用户名
        stompClient.subscribe('/user/95038206-e455-42e2-bfbe-8e02b5547913/message', function (greeting) {
            console.log(greeting.body);
        })

    });
</script>

</html>
Licensed under CC BY-NC-SA 4.0
Gear(夕照)的博客。记录开发、生活,以及一些不足为道的思考……