kafka和websocket实时数据推送
需求
已有Kafka服务,通过kafka服务数据(GPS)落地到本地磁盘(以文本文件存储)。现要根据echarts实现一个实时车辆的地图。
分析
- 前端实时展现:使用websocket技术,实现服务器端数据推送到前端展现
- 通过Java的kafka client端获取数据,并且通过websock推送到前端。
websocket
简介
websocket是HTML5开始提供的一种在单位TCP连接上进行全双工通讯的协议。在websocket api中,浏览器和服务器只需要做一次握手的动作,然后浏览器和服务器之间就形成了一条快速通道。两者之间就可以数据互相传送。
开发
- 服务器端
|
|
前端
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566<html><head><title>Java后端WebSocket的Tomcat实现</title></head><body>Welcome<br/><input id="text" type="text"/><button onclick="send()">发送消息</button><hr/><button onclick="closeWebSocket()">关闭WebSocket连接</button><hr/><div id="message"></div></body><script type="text/javascript">var websocket = null;//判断当前浏览器是否支持WebSocketif ('WebSocket' in window) {websocket = new WebSocket("ws://localhost:8081/onepic/websocket");}else {alert('当前浏览器 Not support websocket')}//连接发生错误的回调方法websocket.onerror = function () {setMessageInnerHTML("WebSocket连接发生错误");};//连接成功建立的回调方法websocket.onopen = function () {setMessageInnerHTML("WebSocket连接成功");}//接收到消息的回调方法websocket.onmessage = function (event) {setMessageInnerHTML(event.data);}//连接关闭的回调方法websocket.onclose = function () {setMessageInnerHTML("WebSocket连接关闭");}//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。window.onbeforeunload = function () {closeWebSocket();}//将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}//关闭WebSocket连接function closeWebSocket() {websocket.close();}//发送消息function send() {var message = document.getElementById('text').value;websocket.send(message);}</script></html>
测试
注意点
webSocketSet
设置为全局静态变量,为其他类提供调用1public static CopyOnWriteArraySet<CommodityServer> webSocketSet = new CopyOnWriteArraySet<CommodityServer>();服务端都是用注解实现的 @ServerEndpoint @OnOpen @OnClose @OnMessage @OnError
Tomcat7.0.47以上版本支持websocket1.0
pom中添加jar支持
123456<dependency><groupId>javax</groupId><artifactId>javaee-api</artifactId><version>7.0</version><scope>provided</scope></dependency>
Kafka
简介
Kafka是一个分布式的、可分区的、可复制的消息系统。
开发
window系统搭建kafka环境,请参照我的《 kafka环境搭建(windows) 》笔记
- kafka client for java
|
|
注意topic 和 bootstrap.servers配置
- 调用类
|
|
- web.xml 配置
|
|
测试
注意点
- ConsumerKafka 需要在web.xml中配置监听,不然在ConsumerKafka 类中不能获取webSocketSet变量
- 引用webSocketSet变量方式
import static com.ykkj.weiyi.socket.CommodityServer.webSocketSet;
- 注意topic 和 bootstrap.servers配置
结束语
这里只是做了技术验证,还没有真正实现。真实场景下,kafka获取到的数据需要进行数据清洗,把不符合当前规范的数据清洗掉,并且按照前端展示需要的格式组装数据。