跳至主要內容

websocket

HeChuangJun约 1719 字大约 6 分钟

1. websocket与http的比较

  • WebSocket协议实现了浏览器与服务器的全双工通信,扩展了浏览器与服务端的通信功能,使服务端也能主动向客户端发送数据。JavaEE 7中出了JSR-356:Java API for WebSocket规范。不少Web容器,如Tomcat,Nginx,Jetty等都支持WebSocket。Tomcat从7.0.27开始支持 WebSocket,从7.0.47开始支持JSR-356,下面的Demo代码也是需要部署在Tomcat7.0.47(不包括)以上的版本才能运行
  • webscoket相比于http的优势主要有服务器能主动发信息给前端,发的信息是轻量级的,所以服务器的压力较少

2. 服务端

2.1. pom.xml

<dependencies>
	<dependency>
	<groupId>javax</groupId>
	<artifactId>javaee-api</artifactId>
	<version>7.0</version>
	</dependency>
</dependencies>

2.2. java

package com.junye.testwebsocket;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/websocket")
public class TestWebscoket {
	/**
	* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
	* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
	*/
	
	//与某个客户端的连接会话,需要通过它来给客户端发送数据
	private Session session;
	
	/**
	* 连接建立成功调用的方法
	* @param session  可选的参数。
	*	session为与某个客户端的连接会话,需要通过它来给客户端发送数据
	*/
	@OnOpen
	public void onOpen(Session session){
		this.session = session;
	}

	/**
	* 连接关闭调用的方法
	*/
	@OnClose
	public void onClose(){
		System.out.println("连接关闭");
	}

	/**
	* 收到客户端消息后调用的方法
	* @param message 客户端发送过来的消息
	* @param session 可选的参数
	*/
	@OnMessage
	public void onMessage(String message, Session session) {
		System.out.println("来自客户端的消息:" + message);
	}

	/**
	* 发生错误时调用
	* @param session
	* @param error
	*/
	@OnError
	public void onError(Session session, Throwable error){
		System.out.println("发生错误");
		error.printStackTrace();
	}

	/**
	* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
	* @param message
	* @throws IOException
	*/
	public void sendMessage(String message) throws IOException{
		this.session.getBasicRemote().sendText(message);
		//this.session.getAsyncRemote().sendText(message);
	}
}

3. springboot整合websocket

3.1. pom.xml

<dependencies>
	<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
	</dependency>
</dependencies>

3.2. WebSocketConfig、WebSocketInterceptor、MyWebSocketHandler

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
  @Bean
  public TextWebSocketHandler myWebSocketHandler() {
    return new MyWebSocketHandler();
  }
  @Override
  public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
    registry.addHandler(myWebSocketHandler(), "/myweb/socket").addInterceptors(new WebSocketInterceptor()).setAllowedOrigins("*");//https://www.cnblogs.com/exmyth/p/11720371.html  
    //registry.addHandler(myWebSocketHandler(), "/myweb/sockjs").addInterceptors(new WebSocketInterceptor()).withSockJS();  
  }
  @Bean
  public TaskScheduler taskScheduler() {//避免找不到TaskScheduler Bean
    ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setPoolSize(10);
    taskScheduler.initialize();
    return taskScheduler;
  }
}
public class WebSocketInterceptor extends HttpSessionHandshakeInterceptor {
  @Override
  public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
    String channel = ((ServletServerHttpRequest)request).getServletRequest().getParameter("ch");
    attributes.put("channel", channel);//传参
    return super.beforeHandshake(request, response, wsHandler, attributes);
  }
  @Override
  public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
    super.afterHandshake(request, response, wsHandler, ex);
  }
}
@Slf4j
public class MyWebSocketHandler extends TextWebSocketHandler{
  @Autowired MyWebSocketService myWebSocketService;//注入需要的Service
  @Override
  public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    String channel = (String)session.getAttributes().get("channel");//获取参数
    //记下session和参数用于下一步发消息...
  }
  @Override
  public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    String channel = (String)session.getAttributes().get("channel");
    //做会话关闭后的处理...
  }
  @Override
  protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    log.debug("receive text message: " + message.getPayload());
    //收到消息的处理...
  }
  public void send(WebSocketSession session, String text) {
    try {
      TextMessage message = new TextMessage(text);
      session.sendMessage(message);//发送消息的方法
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

3.3. SseEmitter

  • Controller方法返回SseEmitter对象即可为客户端提供EventSource
private static Set<SseEmitter> emitters = new HashSet<>();
@RequestMapping("/myweb/eventsource")
@ResponseBody
SseEmitter eventSource(String ch) {
	SseEmitter emitter = new SseEmitter(0L);
	emitters.put(emitter);//记下emitter用于之后发送数据
	emitter.onCompletion(() -> {
		emitters.remove(emitter);//做连接关闭后的处理(ch, emitter)...
	});
	emitter.onTimeout(() -> {
		emitter.complete();
	});
	emitter.onError((e) -> {
		emitter.completeWithError(e);
	});
	return emitter;
	}
	向所有的emitters发送数据text

	SseEventBuilder builder = SseEmitter.event().data(text);
	emitters.forEach(emitter -> {
	try {
		emitter.send(builder);
	} catch (Exception e) {
		errors.add(emitter);
	}
});

4. 客户端连接 前端js对象WebSocket和EventSource分别用于连接这两种服务。

<!DOCTYPE html>
<html>
	<head>
	<title>Java后端WebSocket的Tomcat实现</title>
	</head>
<body>
	Welcome<br/><input id="text" type="text"/>
	<button onclick="send()">发送消息</button>
	<hr/>
	<button onclick="closeWebSocket()">关闭WebSocket连接</button>
	<hr/>
	<button onclick="onclear()">清空</button>
	<hr/>
	<div id="message"></div>
</body>

<script type="text/javascript">
     var websocket = null;
     //判断当前浏览器是否支持WebSocket
     if ('WebSocket' in window) {
         websocket = new WebSocket("ws://localhost:8080/testwebscoket/websocket");
     }
    else {
         alert('当前浏览器 Not support websocket')
    }
     //连接发生错误的回调方法
     websocket.onerror = function () {
         setMessageInnerHTML("WebSocket连接发生错误");
    };
 
     //连接成功建立的回调方法
     websocket.onopen = function () {
         setMessageInnerHTML("WebSocket连接成功");
     }
 
     //接收到消息的回调方法
     websocket.onmessage = function (event) {
         setMessageInnerHTML(event.data);
     }
 
     //连接关闭的回调方法
     websocket.onclose = function () {
         setMessageInnerHTML("WebSocket连接关闭");
     }
 
     //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,
	 //防止连接还没断开就关闭窗口,server端会抛异常。
     window.onbeforeunload = function () {
         closeWebSocket();
     }
 
     //将消息显示在网页上
     function setMessageInnerHTML(innerHTML) {
         document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
 
    //关闭WebSocket连接
     function closeWebSocket() {
         websocket.close();
     }
 
     //发送消息
     function send() {
         var message = document.getElementById('text').value;
         websocket.send(message);
     }
	 function onclear(){
        document.getElementById('message').innerHTML = "";
    }
	</script>
</html>

5. Nginx需要的额外配置

5.1. EventSource

proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
gzip off;
chunked_transfer_encoding off;

5.2. WebSocket

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";

6. 已知问题

  • 火狐下EventSource中断之后不会自动重连。
  • IE系列浏览器都不支持EventSource。

7. 获得httpssession的方法

  • 由于WebSocket与Http协议的不同,故在使用常用的HttpSession方面就存在了一些问题。

7.1. GetHttpSessionConfigurator

package per.zww.web;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
/*
* 获取HttpSession
* 
*/
public class GetHttpSessionConfigurator extends Configurator {
	@Override
	public void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {
		// TODO Auto-generated method stub
		HttpSession httpSession=(HttpSession) request.getHttpSession();
		sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
	}
}

7.2. 在@ServerEndpoint注解里面添加configurator属性

  • @ServerEndpoint(value="/socketTest",configurator=GetHttpSessionConfigurator.class)

7.3. 在onOpen方法里加入参数EndpointConfig config即可获取HttpSession

@OnOpen
public void onOpen(Session session,EndpointConfig config) {
	HttpSession httpSession= (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
	if (httpSession == null){  
		httpSession = new HttpSession() {  
			@Override  
			public long getCreationTime() {  return 0;  }  

			@Override  
			public String getId() {  return null;  }  

			@Override  
			public long getLastAccessedTime() {  return 0;  }  

			@Override  
			public ServletContext getServletContext() {  return null;  }  

			@Override  
			public void setMaxInactiveInterval(int i) {}  

			@Override  
			public int getMaxInactiveInterval() {  return 0;  }  

			@Override  
			public HttpSessionContext getSessionContext() {  return null;  }  

			@Override  
			public Object getAttribute(String s) {  return null;  }  

			@Override  
			public Object getValue(String s) {  return null;  }  

			@Override  
			public Enumeration<String> getAttributeNames() {  return null;  }  

			@Override  
			public String[] getValueNames() {  return new String[0];  }  

			@Override  
			public void setAttribute(String s, Object o) {}  

			@Override  
			public void putValue(String s, Object o) {}  

			@Override  
			public void removeAttribute(String s) {}  

			@Override  
			public void removeValue(String s) {}  

			@Override  
			public void invalidate() {}  

			@Override  
			public boolean isNew() {  return false; }  
		};  
	}
	sessionMap.put(session.getId(), session);
}