欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

利用javacv解析rtsp流,通过websocket将视频帧传输到web前端显示成视频

程序员文章站 1970-01-01 07:57:30
...

javacv拉取rtsp流通过websocket传输到web前端显示

说明一下,我这里只是介绍一下如何实现的一个小demo,因为我做的这个rtsp解析主要是一个测试工具,简单说一下需求,其他的服务器为我提供了了一个rtsp流,我要做的就是将这个视频流解析在web端进行展示,我的这个需求很简单,只是作为一个测试工具来说,并不是商业版本,所以为了追求效率,并且也不会遇到什么高并发的情况,只是提供这样一种思路做出快速开发满足于业务需求。其实这就相当于直播的感觉,因为我获取的rtsp流都是通过监控摄像头获取的实时数据,所以我这边是要去解析rtsp流的,至于为什么是rtsp,就要问提供流的为什么是rtsp呢?为了满足需求,想过几种方案,纯前端直接解析rtsp,但是需要组件,而且查询资料后发现这些插件好像并不好使,不是这个不兼容就这那个不兼容,要不是就是需要浏览器去安装插件,太麻烦。后端解析,前端显示呢,大部分的方案都说是将rtsp转换成rtmp流,然后通过前端组件,比如video.js这种去重新解析,还要通过nginx去搭建服务武器,太麻烦,我最不想要的就是搭建各种环境,实际条件也不允许这么麻烦还要搭建环境,最快捷的方式就是写的这个项目,最好直接就能跑起来,所以,我就想到了视频反正都是一帧一帧的图片,不如前端就直接不停的刷新图片得了,这不就是视频了么,再说一下,我这个是监控视频显示,对音频没要求,所以我的处理是没处理音频的,这里只是做视频显示,说了这么多,下面开始正题吧。项目地址:https://github.com/001ItSky/videoservice

1、首先我们需要引入相关的jar包,javacv和websocket的,pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.de</groupId>
    <artifactId>videoservice</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>videoservice</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <commons.io.version>2.5</commons.io.version>
        <commons.fileupload.version>1.3.3</commons.fileupload.version>
        <hutool.version>4.6.4</hutool.version>
        <fastjson.version>1.2.47</fastjson.version>
        <lang3.version>3.9</lang3.version>
        <jsckson.version>2.10.3</jsckson.version>
        <javacv.version>1.5.1</javacv.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons.io.version}</version>
        </dependency>
        <!--文件上传工具类 -->
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
            <version>${commons.fileupload.version}</version>
        </dependency>
        <!-- 阿里JSON解析器 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!--好用的工具集-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${lang3.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.bytedeco</groupId>
            <artifactId>javacv-platform</artifactId>
            <version>${javacv.version}</version>
            <type>pom</type>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2、需要配置websocket,开启websocket支持

package com.de.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 *  
 *  * @projectName videoservice
 *  * @title     WebSocketConfig   
 *  * @package    com.de.config  
 *  * @description    开启websocket支持
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/12 0012 下午 16:39  
 *  * @version V1.0.0 
 *  
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3、编写websocket服务

注意,因为websocket用的是tomcat的,websoket发送的数据有三种数据类型,string类型,bytebuffer类型,object类型,但是说白了其实就只有两种,也就是string和bytebuffer,对象是要在底层经行编码转换的,所以,对于对象而言我们先要写个编码器,不然到时候发送的时候会报解析失败的错误。

 1)存放图片数据的实体,因为我们要想前端发送数据的是图片数据

package com.de.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

/**
 *  
 *  * @projectName videoservice
 *  * @title     Image   
 *  * @package    com.de.entity  
 *  * @description    存放图片数据的实体 
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/18 0018 下午 22:54  
 *  * @version V1.0.0 
 *  
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Image {
    private byte[] imageByte;
}

2)转吗器(转码器的作用就是将数据转成json字符串返回),发送时websocket会自行调用,这里要用到了AjaxResult,这是自己写的同前端交互传输的统一对象。后面会贴出来

package com.de.entity;

import cn.hutool.core.codec.Base64;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.ArrayUtils;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

/**
 *  
 *  * @projectName videoservice
 *  * @title     ImageEncoder   
 *  * @package    com.de.entity  
 *  * @description    websocket 传输对象转码 
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/18 0018 下午 22:53  
 *  * @version V1.0.0 
 *  
 */
public class ImageEncoder implements Encoder.Text<Image> {

    @Override
    public String encode(Image image) throws EncodeException {
        if(image != null && !ArrayUtils.isEmpty(image.getImageByte())){
            String base64Image= Base64.encode(image.getImageByte());
            return JSON.toJSONString(new AjaxResult(AjaxResult.Type.SUCCESS_IMG_BYTE,"获取视频帧成功",base64Image));
        }
        return JSON.toJSONString(AjaxResult.error("获取视频帧失败"));
    }

    @Override
    public void init(EndpointConfig endpointConfig) {

    }

    @Override
    public void destroy() {

    }
}

3)websocket服务编写,记得一定要指明添加编码器,ServerEndpoint注解的encoders 属性指明

package com.de.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.de.entity.AjaxResult;
import com.de.entity.ImageEncoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 *  
 *  * @projectName videoservice
 *  * @title     WebSocketServer   
 *  * @package    com.de.service  
 *  * @description    websocket服务
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/12 0012 下午 16:41  
 *  * @version V1.0.0 
 *  
 */
@ServerEndpoint(value = "/webSocketService", encoders = {ImageEncoder.class})
@Component
@Slf4j
public class WebSocketServer {
    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userId = "";

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.userId = session.getId();
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            webSocketMap.put(userId, this);
            //加入set中
        } else {
            webSocketMap.put(userId, this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }
        log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
        HashMap<String, String> data = new HashMap<>();
        data.put("userId", userId);
        sendMessageByStr(JSON.toJSONString(AjaxResult.success("连接成功", data)));
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息,必须是json串
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:" + userId + ",报文:" + message);
        //可以群发消息
        //消息保存到数据库、redis
        if (StringUtils.isNotBlank(message)) {
            //解析发送的报文
            JSONObject jsonObject = JSON.parseObject(message);
            //追加发送人(防止串改)
            if (jsonObject != null) {
                jsonObject.put("fromUserId", this.userId);
                String toUserId = jsonObject.getString("toUserId");
                //传送给对应toUserId用户的websocket
                if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
                    webSocketMap.get(toUserId).sendMessageByStr(jsonObject.toJSONString());
                } else {
                    log.error("请求的userId:" + toUserId + "不在该服务器上");
                    //否则不在这个服务器上,发送到mysql或者redis
                }
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
        log.error("websocket error: ", error);
    }

    public void sendMessageByStr(String message) {
        if (StringUtils.isNotBlank(message)) {
            try {
                if (this.session.isOpen()) {
                    this.session.getBasicRemote().sendText(message);
                }
            } catch (IOException e) {
                log.error("发送到用户:" + this.userId + "信息失败 ,信息是:" + message);
                log.error("websocket send str msg exception: ", e);
            }
        }
    }

    public void sendMessageByObject(Object message) {
        if (message != null) {
            try {
                this.session.getBasicRemote().sendObject(message);
            } catch (IOException | EncodeException e) {
                log.error("发送到用户:" + this.userId + "信息失败 ,信息是:" + message);
                log.error("websocket send object msg exception: ", e);
            }
        }
    }

    public void sendBinary(ByteBuffer message) {
        if (message != null) {
            try {
                this.session.getBasicRemote().sendBinary(message);
            } catch (IOException e) {
                log.error("发送到用户:" + this.userId + "信息失败 ,信息是:" + message);
                log.error("websocket send byteBuffer msg exception: ", e);
            }
        }
    }

    /**
     * 发送自定义消息
     */
    public static void sendInfo(String message, String userId) {
        log.info("发送消息到:" + userId + ",报文:" + message);
        if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
            webSocketMap.get(userId).sendMessageByStr(message);
        } else {
            log.error("用户" + userId + ",不在线!");
        }
    }

    /**
     * 向所有的客户端发送消息
     *
     * @param byteBuffer byteBuffer
     * @throws IOException IOException
     */
    public static void sendAllByBinary(ByteBuffer byteBuffer) {
        if (!webSocketMap.isEmpty()) {
            Collection<WebSocketServer> values = webSocketMap.values();
            for (WebSocketServer next : values) {
                next.sendBinary(byteBuffer);
            }
        }
    }

    public static void sendAllByObject(Object message) {
        if (!webSocketMap.isEmpty()) {
            Collection<WebSocketServer> values = webSocketMap.values();
            for (WebSocketServer next : values) {
                next.sendMessageByObject(message);
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

}

4)AjaxResult,前面用了,说了贴出来

package com.de.entity;

import org.apache.commons.lang3.ObjectUtils;

import java.util.HashMap;

/**
 * 操作消息提醒
 *
 * @author ruoyi
 */
public class AjaxResult extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;

    /**
     * 状态码
     */
    public static final String CODE_TAG = "code";

    /**
     * 返回内容
     */
    public static final String MSG_TAG = "msg";

    /**
     * 数据对象
     */
    public static final String DATA_TAG = "data";

    /**
     * 状态类型
     */
    public enum Type {
        /**
         * 成功
         */
        SUCCESS(0),
        SUCCESS_IMG_BYTE(201),
        /**
         * 警告
         */
        WARN(301),
        /**
         * 错误
         */
        ERROR(500);
        private final int value;

        Type(int value) {
            this.value = value;
        }

        public int value() {
            return this.value;
        }
    }

    /**
     * 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。
     */
    public AjaxResult() {
    }

    /**
     * 初始化一个新创建的 AjaxResult 对象
     *
     * @param type 状态类型
     * @param msg  返回内容
     */
    public AjaxResult(Type type, String msg) {
        super.put(CODE_TAG, type.value);
        super.put(MSG_TAG, msg);
    }

    /**
     * 初始化一个新创建的 AjaxResult 对象
     *
     * @param type 状态类型
     * @param msg  返回内容
     * @param data 数据对象
     */
    public AjaxResult(Type type, String msg, Object data) {
        super.put(CODE_TAG, type.value);
        super.put(MSG_TAG, msg);
        if (ObjectUtils.isNotEmpty(data)) {
            super.put(DATA_TAG, data);
        }
    }

    /**
     * 返回成功消息
     *
     * @return 成功消息
     */
    public static AjaxResult success() {
        return AjaxResult.success("操作成功");
    }

    /**
     * 返回成功数据
     *
     * @return 成功消息
     */
    public static AjaxResult success(Object data) {
        return AjaxResult.success("操作成功", data);
    }

    /**
     * 返回成功消息
     *
     * @param msg 返回内容
     * @return 成功消息
     */
    public static AjaxResult success(String msg) {
        return AjaxResult.success(msg, null);
    }

    /**
     * 返回成功消息
     *
     * @param msg  返回内容
     * @param data 数据对象
     * @return 成功消息
     */
    public static AjaxResult success(String msg, Object data) {
        return new AjaxResult(Type.SUCCESS, msg, data);
    }

    /**
     * 返回警告消息
     *
     * @param msg 返回内容
     * @return 警告消息
     */
    public static AjaxResult warn(String msg) {
        return AjaxResult.warn(msg, null);
    }

    /**
     * 返回警告消息
     *
     * @param msg  返回内容
     * @param data 数据对象
     * @return 警告消息
     */
    public static AjaxResult warn(String msg, Object data) {
        return new AjaxResult(Type.WARN, msg, data);
    }

    /**
     * 返回错误消息
     *
     * @return
     */
    public static AjaxResult error() {
        return AjaxResult.error("操作失败");
    }

    /**
     * 返回错误消息
     *
     * @param msg 返回内容
     * @return 警告消息
     */
    public static AjaxResult error(String msg) {
        return AjaxResult.error(msg, null);
    }

    /**
     * 返回错误消息
     *
     * @param msg  返回内容
     * @param data 数据对象
     * @return 警告消息
     */
    public static AjaxResult error(String msg, Object data) {
        return new AjaxResult(Type.ERROR, msg, data);
    }
}

这样整个websocket服务就大功告成了。然后就是解析rtsp流了。

4、解析rtsp,通过websokect发送到前端显示

1) application.yml配置(主要):为什么我的url只是个视频地址呢,因为我主要是测试,你这里写rtsp地址也行,javacv会根据地址不同去解析,如何搭建一个快速的rtsp服务器做测试呢,我后面会说

rtsp:
  url: E:/视频/VID_20141122_212032.mp4
  transport:
    type: udp

完整的application.yml

# 开发环境配置
server:
  # 服务器的HTTP端口,默认为80
  port: 8080
  servlet:
    # 应用的访问路径
    context-path: /
  tomcat:
    # tomcat的URI编码
    uri-encoding: UTF-8
    # tomcat最大线程数,默认为200
    max-threads: 800
    # Tomcat启动初始化的线程数,默认值25
    min-spare-threads: 30
 
# 日志配置
logging:
  file:
    name: delogs.log

# Spring配置
spring:
  # 模板引擎
  thymeleaf:
    mode: HTML
    encoding: utf-8
    # 禁用缓存
    cache: false
  jackson:
    time-zone: GMT+8
    date-format: yyyy-MM-dd HH:mm:ss
    serialization:
      FAIL_ON_EMPTY_BEANS: false
#  profiles:
#    active: druid
  # 文件上传
  servlet:
     multipart:
       # 单个文件大小
       max-file-size:  10MB
       # 设置总上传的文件大小
       max-request-size:  20MB
  # 服务模块
  devtools:
    restart:
      # 热部署开关
      enabled: true

rtsp:
  url: E:/视频/VID_20141122_212032.mp4
  transport:
    type: udp

2) 解析rtsp服务,websocket发送数据到前端

package com.de.rtsp;

import com.de.entity.Image;
import com.de.service.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.FrameGrabber;
import org.bytedeco.javacv.Java2DFrameConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 *  
 *  * @projectName videoservice
 *  * @title     MediaUtils   
 *  * @package    com.de.rtsp  
 *  * @description   获取rtsp流,解析为视频帧,websocket传递到前台显示 
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/12 0012 下午 18:24  
 *  * @version V1.0.0 
 *  
 */
@Slf4j
@Component
@EnableAsync
public class MediaTransfer {

    @Value("${rtsp.url}")
    private String rtspUrl;

    @Value("${rtsp.transport.type}")
    private String rtspTransportType;

    private static FFmpegFrameGrabber grabber;

    private static boolean isStart = false;

    /**
     * 视频帧率
     */
    public static int frameRate = 24;
    /**
     * 视频宽度
     */
    public static int frameWidth = 480;
    /**
     * 视频高度
     */
    public static int frameHeight = 270;

    /**
     * 开启获取rtsp流,通过websocket传输数据
     */
    @Async
    public void live() {
        log.info("连接rtsp:"+rtspUrl+",开始创建grabber");
        grabber = createGrabber(rtspUrl);
        if (grabber != null) {
            log.info("创建grabber成功");
        } else {
            log.info("创建grabber失败");
        }
        startCameraPush();
    }

    /**
     * 构造视频抓取器
     *
     * @param rtsp 拉流地址
     * @return
     */
    public FFmpegFrameGrabber createGrabber(String rtsp) {
        // 获取视频源
        try {
            FFmpegFrameGrabber grabber = FFmpegFrameGrabber.createDefault(rtsp);
            grabber.setOption("rtsp_transport", rtspTransportType);
            //设置帧率
            grabber.setFrameRate(frameRate);
            //设置获取的视频宽度
            grabber.setImageWidth(frameWidth);
            //设置获取的视频高度
            grabber.setImageHeight(frameHeight);
            //设置视频bit率
            grabber.setVideoBitrate(2000000);
            return grabber;
        } catch (FrameGrabber.Exception e) {
            log.error("创建解析rtsp FFmpegFrameGrabber 失败");
            log.error("create rtsp FFmpegFrameGrabber exception: ", e);
            return null;
        }
    }

    /**
     * 推送图片(摄像机直播)
     */
    public void startCameraPush() {
        Java2DFrameConverter java2DFrameConverter = new Java2DFrameConverter();
        while (true) {
            if (grabber == null) {
                log.info("重试连接rtsp:"+rtspUrl+",开始创建grabber");
                grabber = createGrabber(rtspUrl);
                log.info("创建grabber成功");
            }
            try {
                if (grabber != null && !isStart) {
                    grabber.start();
                    isStart = true;
                    log.info("启动grabber成功");
                }
                if (grabber != null) {
                    Frame frame = grabber.grabImage();
                    if (null == frame) {
                        continue;
                    }
                    BufferedImage bufferedImage = java2DFrameConverter.getBufferedImage(frame);
                    byte[] bytes = imageToBytes(bufferedImage, "jpg");
                    //使用websocket发送视频帧数据
                    WebSocketServer.sendAllByObject(new Image(bytes));
                }
            } catch (FrameGrabber.Exception | RuntimeException e) {
                log.error("因为异常,grabber关闭,rtsp连接断开,尝试重新连接");
                log.error("exception : " , e);
                if (grabber != null) {
                    try {
                        grabber.stop();
                    } catch (FrameGrabber.Exception ex) {
                        log.error("grabber stop exception: ", ex);
                    } finally {
                        grabber = null;
                        isStart = false;
                    }
                }
            }
        }
    }

    /**
     * 图片转字节数组
     *
     * @param bImage 图片数据
     * @param format 格式
     * @return 图片字节码
     */
    private byte[] imageToBytes(BufferedImage bImage, String format) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            ImageIO.write(bImage, format, out);
        } catch (IOException e) {
            log.error("bufferImage 转 byte 数组异常");
            log.error("bufferImage transfer byte[] exception: ", e);
            return null;
        }
        return out.toByteArray();
    }

}

3)rtsp解析服务启动实体类(可以采用多线程启动,我这里是用的异步的方式,上面的服务代码用了异步@Async,其实底层也是多线程,@PostConstruct注解的作用是项目启动即加载,只加载一次,相当与spring的ben配置的初始化加载方法,至于为什么异步,因为我这里初始化加载,不采用异步,就会一直卡在那,因为上面使用的是死循环监听rtsp流,进行拉流操作,所以不异步项目都起不来,会一直卡在死循环处)

package com.de.rtsp;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 *  
 *  * @projectName videoservice
 *  * @title     MadiaStart   
 *  * @package    com.de.rtsp  
 *  * @description    程序启动时加载一次,rtspj解析传输到websocket启动类 
 *  * @author IT_CREAT     
 *  * @date  2020 2020/4/18 0018 下午 22:48  
 *  * @version V1.0.0 
 *  
 */
@Component
public class MediaStart {

    @Autowired
    MediaTransfer mediaTransfer;

    @PostConstruct
    public void init() {
        //异步加载,因为初始化时执行,live里面是死循环监听rtsp,如果不异步操作,就会卡死在初始化阶段,项目就会起不来
        mediaTransfer.live();
    }

}

5、前端接受数据显示

1)首先得有websocket的插件或者是处理方法,我这里贴出来Websocket.js

;!(function (window) {
    "use strict";
    let Event = {
        wsMesEvent: function (message) {
            console.log(message)
        }
    }, dftOpt = {
        protocol: (window.location.protocol == 'http:') ? 'ws://' : 'wss://'
        , host: window.location.host
        , port: '80'
        , path: ''
        , isReConect: false
        , wsMesEvent: Event.wsMesEvent
    }, Util = {
        arrayLike(arrayLike) {
            Array.from(arrayLike)
        },
        isArray(arr) {
            Array.isArray(arr)
        },
        forEach(array, iterate) {
            let index = -1
                , length = array.length;
            if (typeof iterate != 'function') {
                return array;
            }
            while (++index < length) {
                iterate.call(array, array[index], index);
            }
        },
        isPlainObject(obj) {
            let flag = false;
            if (!obj || typeof obj != 'object') {
                return flag;
            }
            if (obj.constructor.prototype.hasOwnProperty("isPrototypeOf")) {
                flag = true;
            }
            return flag;
        },
        extend(...args) {
            if (args.length <= 0) {
                return
            };
            let target = args[0];
            if (args.length == 1) {
                return args[0]
            };
            this.forEach(args, (arg, i) => {
                if (i != 0) {
                    var keys = Object.keys(arg);
                    this.forEach(keys, (key, i) => {
                        var val = arg[key];
                        if (this.isPlainObject(val) || this.isArray(val)) {
                            var newTarget = this.isArray(val) ? [] : {};
                            target[key] = this.extend(newTarget, val);
                        } else {
                            target[key] = val;
                        }
                    });
                }
            });
            return target;
        }
    }, Ws = function (opt) {
        //如果浏览器不支持websocket,直接退出
        if (!this.isSupportWs()) {
            alert("对不起,您的浏览器在不支持WebSocket,请先升级您的浏览器!!");
            return;
        }
        let config = this.config = Util.extend({}, dftOpt, opt);
        //接口地址url
        this.url = config.host === "" || config.host === "" ?
            config.protocol + config.path:
            config.protocol + config.host + ':' + config.port + config.path;
        //心跳状态  为false时不能执行操作 等待重连
        this.isHeartBeat = false;
        //重连状态  避免不间断的重连操作
        this.isReconnect = config.isReConect;
        //发送的消息
        this.curSendMes = null;
        //响应的信息
        this.message = null;
        //创建webSocket
        this.ws;
        //初始化websocket
        this.initWs = function () {
            //创建WebSocket
            let ws = this.ws = new WebSocket(this.url);
            // ws.binaryType = "arraybuffer";
            //Ws连接函数:服务器连接成功
            ws.onopen = (e) => {
                console.log(`与${this.config.host}:${this.config.port}${this.config.path}连接已建立...`)
                this.isHeartBeat = true;
                //发布事件
                this.send();
            };
            //Ws消息接收函数:服务器向前端推送消息时触发
            ws.onmessage = (e) => {
                //处理各种推送消
                this.message = e.data;
                config.wsMesEvent.apply(this, [e.data]);
            }
            //Ws异常事件:Ws报错后触发
            ws.onerror = (e) => {
                this.isHeartBeat = false;
                this.reConnect();
            }
            //Ws关闭事件:Ws连接关闭后触发
            ws.onclose = (e) => {
                console.log('连接已关闭...');
                alert("websocket连接已关闭,按F5尝试重新刷新页面");
                this.isHeartBeat = false;
                ws = null;
                this.reConnect();
            };
        };
        this.initWs();
    };

    //判断是否支持WebSocket
    Ws.prototype.isSupportWs = function () {
        return (window.WebSocket || window.MozWebSocket) ? true : false;
    }

    //重新连接
    Ws.prototype.reConnect = function () {
        //不需要重新连接,直接返回
        if (!this.isReconnect) return;
        this.isReconnect = true;
        //没连接上 会一直重连,设置延迟避免请求过多
        setTimeout(() => {
            this.initWs()
            this.isReconnect = false;
        }, 5000);
    }

    //发送消息
    Ws.prototype.send = function (content) {
        this.curSendMes = content || this.curSendMes;
        if(this.curSendMes == null){
            return;
        }
        if (this.isHeartBeat) {
            // this.ws.send(bytes);
            this.ws.send(this.curSendMes);
        }
    }
    window.Ws = Ws;
})(window);

/***
 * 使用方式:
 * //建立连接
 * var ws1 = new Ws({
 *        host:'123.207.167.163'
 *        ,port:9010
 *        ,path:'/ajaxchattest'
 *        ,wsMesEvent:function(message){
 *            //将接收到的二进制数据转为字符串
		       var unit8Arr = new Uint8Array(event.data) ;
 *            console.log(message)
 *        }
 *    });
 *    //发送请求
 *    ws1.send("111");
 *
 *    //建立连接
 *    var ws2 = new Ws({
 *        host:'123.207.167.163'
 *        ,port:9010
 *        ,path:'/ajaxchattest'
 *        ,wsMesEvent:function(message){
 *            console.log(message)
 *        }
 *    });
 *    //发送请求
 *    ws2.send("222");
 * */

2)前端页面(我用的是Thymeleaf模板,整个展示的核心就两个js, jquery.js和上面给出的websocket.js)

<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org" xmlns:shiro="http://www.pollix.at/thymeleaf/shiro">
<head>
    <th:block th:include="include :: header('视频展示rtsp')"/>
</head>
<body class="gray-bg">
<div style="padding: 20px">
    <p style="font-size: 20px;color: #0a7491;font-weight: bold;font-family: 楷体">rtsp拉取视频显示</p>
    <img id="show_video" src="">
</div>
<th:block th:include="include :: footer"/>
<script src="../static/js/workbench/WebSocket.js" th:src="@{/js/workbench/WebSocket.js}"></script>
<script th:inline="javascript">
    var wsUrl = getWsPath() + "/webSocketService";
    //建立连接
    var ws1 = new Ws({
        host: ""
        , port: ""
        , path: wsUrl
        , wsMesEvent: function (message) {
            //将接收到的图片数据进行刷新显示
            var data = JSON.parse(message);
            if (data.code === 0) {
                console.log(message)
            } else if (data.code === 201) {
                $("#show_video").attr("src", "data:image/*;base64," + data.data)
            }
        }
    });

</script>
</body>
</html>

整个流程就完了,效果如下:

利用javacv解析rtsp流,通过websocket将视频帧传输到web前端显示成视频

rtsp服务器搭建,基于vlc播放器:https://blog.****.net/IT_CREATE/article/details/105626071

上一篇: Hibernate源码研究碎得(4)

下一篇: