flv.js源码知识点(中)

在上一篇文章 flv.js源码知识点(上)中主要讲解了flv.js的整理流程,今天讲解其中的网速计算和数据缓存处理。

1 网速计算

在音视频播放的场景中,用户的网速是影响体验的重要因素,播放器在播放的过程中,可以计算单位时间获取的数据量来衡量网速。flv.js的实例提供了statistics_info事件获取当前的网速。

  1. flvPlayer.on(statistics_info, function(res) {
  2. console.log(statistics_info,res);
  3. })

res结构如下:

  1. {
  2. currentSegmentIndex: 0,
  3. decodedFrames: 15,
  4. droppedFrames: 0,
  5. hasRedirect: false,
  6. loaderType: "fetch-stream-loader",
  7. playerType: "FlvPlayer",
  8. speed: 395.19075278358656,
  9. totalSegmentCount: 1,
  10. url: "https:/example.com/1.flv"
  11. }

其中的speed字段就是网速,单位是KB/s, 下面就看关于网速计算相关的部分。statistics_info事件中获取网速的整体流程如下图:

  • IOController中控制每次把加载的字节数添加到SpeedSampler中,对外提供的lastSecondKBps属性是最近有数据一秒的网速。

  • TransmuxingController中控制播放器在加载数据的时候开启定时器获取统计数据,向上触发事件。

核心的计算还是SpeedSampler类, lastSecondKBps是getter属性获取最近有数据一秒的网速,代码含义参考注释。

  1. get lastSecondKBps () {
  2. // 如果够1s计算 this._lastSecondBytes
  3. this.addBytes(0)
  4. // 上1秒的_lastSecondBytes有数据 就直接返回
  5. // 这个巧妙的是 感觉不是准确的1s 但是又是准确的 因为如果是超过1秒就不继续添加了 1秒内的就添加进去了。
  6. // 如果上一秒有数据则返回
  7. if (this._lastSecondBytes !== 0) {
  8. return this._lastSecondBytes / 1024
  9. } else {
  10. // 如果上一秒的速度是0,并且距离上次计算超过了500ms 则用_intervalBytes和durationSeconds进行计算
  11. if (this._now() - this._lastCheckpoint >= 500) {
  12. // if time interval since last checkpoint has exceeded 500ms
  13. // the speed is nearly accurate
  14. return this.currentKBps
  15. } else {
  16. // We dont know
  17. return 0
  18. }
  19. }
  20. }

下面是addBytes方法,根据本次调用的时间和上一次计算时间的差值做不同处理,具体参见代码注释,这种计算的思路是挺巧妙的,开始以为不准切,但是仔细思考是能准确计算最近有数据一秒的网速。一直强调是最近有数据一秒的网速而不是上一秒的网速。

  1. addBytes (bytes) {
  2. // 如果是第一次调用则 记录_firstCheckpoint _lastCheckpoint
  3. if (this._firstCheckpoint === 0) {
  4. this._firstCheckpoint = this._now()
  5. this._lastCheckpoint = this._firstCheckpoint
  6. this._intervalBytes += bytes
  7. this._totalBytes += bytes
  8. } else if (this._now() - this._lastCheckpoint < 1000) {
  9. // 小于1s 就添加 _intervalBytes
  10. this._intervalBytes += bytes
  11. this._totalBytes += bytes
  12. } else { // duration >= 1000
  13. // 只有大于1秒的时候才计算_lastSecondBytes
  14. // 就是这1s内的_intervalBytes
  15. this._lastSecondBytes = this._intervalBytes
  16. this._intervalBytes = bytes // 并且重新开始计算_intervalBytes 大于1秒的这次数据算在下1秒
  17. this._totalBytes += bytes
  18. this._lastCheckpoint = this._now()
  19. }
  20. }

下面是currentKBps getter属性,在lastSecondKBps中只有当超过因为如果durationSeconds大于0.5时才使用currentKBps属性,因为如果durationSeconds过小,会过大估计了网速。

  1. get currentKBps () {
  2. this.addBytes(0)
  3. let durationSeconds = (this._now() - this._lastCheckpoint) / 1000
  4. if (durationSeconds == 0) durationSeconds = 1
  5. return (this._intervalBytes / durationSeconds) / 1024
  6. }

平均网速averageKBps, 如果中途出现网络中断或者暂停的情况会拉低平均网速。

  1. get averageKBps () {
  2. let durationSeconds = (this._now() - this._firstCheckpoint) / 1000
  3. return (this._totalBytes / durationSeconds) / 1024
  4. }

2 数据缓存处理

这里讲的缓存是指使用loader获取数据后到传给FLVDemuxer过程中的缓存。这个过程中为什么需要缓存呢?因为FLV格式数据的解封是以TAG为单位,而过来的数据是流式的字节,不可能每次是完整的TAG,所以FLVDemuxer每次只处理当前数据中完整的TAG,没有处理的部分就缓存起来,和下次获取的数据拼接。

通过上面的原理介绍,你应该可以猜到这个过程是放在IOController中,我们先分解缓存中使用到的几个关键API和操作方法。

2.1 二进制缓存区格式

ArrayBuffer 对象用来表示通用的、固定长度的原始二进制数据缓冲区。
你不能直接操作 ArrayBuffer 的内容,而是要通过类型数组对象DataView 对象来操作,它们会将缓冲区中的数据表示为特定的格式,并通过这些格式来读写缓冲区的内容。

这里的定义 关键有两点,一是ArrayBuffer是固定长度,所以扩展的话需要创建新的然后把数据复制过去,而是不能直接操作,二是 不能直接操作,需要用类型数据对象,我们这里用Uint8Array,因为8位无符号正好是以一个字节为单位。我们这里对缓存的处理,暂时不需要读取指定的字节,目前只需要能够读取指定位置的数据即可。

2.2 缓存区操作API

Uint8Array 数组类型表示一个8位无符号整型数组,创建时内容被初始化为0。创建完后,可以以对象的方式或使用数组下标索引的方式引用数组中的元素。

new Uint8Array(buffer [, byteOffset [, length]]);

说明:在ArrayBuffer上创建Uint8Array对象,使缓存区可操作。
参数: bufferArrayBuffer对象,byteOffset指定ArrayBuffer的起始字节数,length指定创建的长度。

typedarray.set(typedarray[, offset])
说明:Uint8Array属于typedarray, set方法可以从指定类型化数据中读取值,并将其存储在类型化数组中的指定位置。
参数:typedarray是指要拷贝的源数据,offset指拷贝到目标数据的起始位置。

2.3 方法一 扩展缓存

根据上面的api,把长度为100的ArrayBuffer扩展为长度为1000的ArrabyBuffer

  1. const oldbuffer = new ArrayBuffer(100);
  2. const u1 = new Uint8Array(oldbuffer, 0);
  3. const newbuffer = new ArrayBuffer(1000);
  4. const u2 = new Uint8Array(newbuffer,0);
  5. u2.set(u1,0);

2.4 方法二 消费缓存

记录缓存消费位置,消费一部分后重新设置缓存。

  1. let stashUsed = 100;
  2. let bufferSize = 1024;
  3. let stashBuffer = new ArrayBuffer(1024);
  4. // 消费数据 返回消费的字节数
  5. let consumed = dispatchChunks(stashBuffer.slice(0, stashUsed),stashUsed);
  6. let allBuffer = new Uint8Array(stashBuffer, 0, bufferSize);
  7. let remainBuffer = new Uint8Array(stashBuffer, consumed);
  8. allBuffer.set(remainBuffer,0);
  9. stashUsed = stashUsed-consumed;

2.5 缓存源码

下面就来看IOController中缓存数据的代码。
几个变量和方法的含义:

  1. this._stashBuffer ArrayBuffer类型 存放数据的缓存区
  2. this._bufferSize 缓存区的大小 this._stashBuffer的长度
  3. this._stashUsed 缓存区中使用的缓存大小
  4. this._stashByteStart 已经消费的部分在整个流中的开始位置
  5. this._expandBuffer() 扩展缓存的方法
  6. this.this._dispatchChunks() 消费缓存数据的方法 返回消费的数量
  7. chunk ajax获取的二进制数据

有了上面的准备,就可以直接看缓存处理的代码了

  1. // 缓存中没有数据的情况
  2. if (this._stashUsed === 0) {
  3. // 直接消费
  4. let consumed = this._dispatchChunks(chunk, byteStart);
  5. // 如果有剩余
  6. if (consumed < chunk.byteLength) {
  7. // 未处理的数据长度
  8. let remain = chunk.byteLength - consumed;
  9. // 如果数据超过缓存 则扩展缓存
  10. if (remain > this._bufferSize) {
  11. this._expandBuffer(remain);
  12. }
  13. // 在_stashBuffer上创建 Uint8Array使其可以操作
  14. let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
  15. // 从chunk的 consumed开始获取数据 然后从第0位置开始写入stashArray中
  16. stashArray.set(new Uint8Array(chunk, consumed), 0);
  17. // 记录stashUsed的大小
  18. this._stashUsed += remain;
  19. // 记录整个流中的开始位置
  20. this._stashByteStart = byteStart + consumed;
  21. }
  22. } else {
  23. // 缓存中有数据的情况
  24. // 先扩展缓存 能够放下已存在的和当前获取的
  25. if (this._stashUsed + chunk.byteLength > this._bufferSize) {
  26. this._expandBuffer(this._stashUsed + chunk.byteLength);
  27. }
  28. let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
  29. // 先把获取到的chunk 放入缓存中 从_stashUsed的offset开始存放
  30. stashArray.set(new Uint8Array(chunk), this._stashUsed);
  31. // 重置_stashUsed
  32. this._stashUsed += chunk.byteLength;
  33. // 把缓存中的数据全部读出进行消费
  34. let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart);
  35. // 如果消费了有剩余
  36. if (consumed < this._stashUsed && consumed > 0) { // unconsumed data remain
  37. // 从consumed开始截取数据
  38. let remainArray = new Uint8Array(this._stashBuffer, consumed);
  39. // 从0开始设置 剩下的数据作为缓存 并且改变_stashUsed 记录缓存的位置
  40. stashArray.set(remainArray, 0);
  41. }
  42. // 重新设置_stashUsed
  43. this._stashUsed -= consumed;
  44. this._stashByteStart += consumed;
  45. }

上面的代码是每次来数据都会调用this._dispatchChunks进行消费操作,其实还有一种处理情况,通过变量this._enableStash控制,上面的情况是this._enableStashfalse。如果为true的话区别是只有缓存的数据达到this._stashSize大小时,才会触发this._dispatchChunks进行消费操作。

总体的流程是如果数据小于this._stashSize 则往缓存中添加,如果大于继续下面的判断
如果缓存中没有数据 则直接消费本地来的数据,如果有数据则消费缓存中的数据 消费之后再把本地来的数据放入缓存。具体参见代码

  1. if (this._stashUsed === 0 && this._stashByteStart === 0) { // seeked? or init chunk?
  2. // This is the first chunk after seek action
  3. this._stashByteStart = byteStart;
  4. }
  5. // 不满_stashSize 就会先往缓存中存放 _stashSize会动态调整
  6. if (this._stashUsed + chunk.byteLength <= this._stashSize) {
  7. let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize);
  8. stashArray.set(new Uint8Array(chunk), this._stashUsed);
  9. this._stashUsed += chunk.byteLength;
  10. } else { // stashUsed + chunkSize > stashSize, size limit exceeded
  11. let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
  12. if (this._stashUsed > 0) { // Therere stash datas in buffer
  13. // 如果有缓存 先消费缓存中的数据
  14. let buffer = this._stashBuffer.slice(0, this._stashUsed);
  15. let consumed = this._dispatchChunks(buffer, this._stashByteStart);
  16. if (consumed < buffer.byteLength) {
  17. if (consumed > 0) {
  18. let remainArray = new Uint8Array(buffer, consumed);
  19. stashArray.set(remainArray, 0);
  20. this._stashUsed = remainArray.byteLength;
  21. this._stashByteStart += consumed;
  22. }
  23. } else {
  24. this._stashUsed = 0;
  25. this._stashByteStart += consumed;
  26. }
  27. // 消费完缓存中的数据之后,然后再把这次过来的chunk放入缓存中
  28. if (this._stashUsed + chunk.byteLength > this._bufferSize) {
  29. this._expandBuffer(this._stashUsed + chunk.byteLength);
  30. stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
  31. }
  32. stashArray.set(new Uint8Array(chunk), this._stashUsed);
  33. this._stashUsed += chunk.byteLength;
  34. } else { // stash buffer empty, but chunkSize > stashSize (oh, holy shit)
  35. // dispatch chunk directly and stash remain data
  36. // 如果缓存中没有数据 直接消费本次来的数据
  37. let consumed = this._dispatchChunks(chunk, byteStart);
  38. if (consumed < chunk.byteLength) {
  39. let remain = chunk.byteLength - consumed;
  40. if (remain > this._bufferSize) {
  41. this._expandBuffer(remain);
  42. stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize);
  43. }
  44. stashArray.set(new Uint8Array(chunk, consumed), 0);
  45. this._stashUsed += remain;
  46. this._stashByteStart = byteStart + consumed;
  47. }
  48. }
  49. }

关于this._stashSize还有两个问题,
一是this._stashSize的大小会根据网速进行调整,二是this._stashSize是小于等于this._bufferSize缓存大小,所以this._stashSize变化时也需要扩展缓存。

// 先看获取网速的代码。

  1. //网速计算
  2. this._speedSampler.addBytes(chunk.byteLength);
  3. // adjust stash buffer size according to network speed dynamically
  4. // 获取当前网速
  5. let KBps = this._speedSampler.lastSecondKBps;
  6. if (KBps !== 0) {
  7. // 正规化网速
  8. let normalized = this._normalizeSpeed(KBps);
  9. if (this._speedNormalized !== normalized) {
  10. this._speedNormalized = normalized;
  11. this._adjustStashSize(normalized);
  12. }
  13. }

其中的_normalizeSpeed方法是在给定的速度中二分查找最接近网速的大小。

  1. this._speedNormalizeList = [64, 128, 256, 384, 512, 768, 1024, 1536, 2048, 3072, 4096];
  2. _normalizeSpeed(input) {
  3. let list = this._speedNormalizeList;
  4. let last = list.length - 1;
  5. let mid = 0;
  6. let lbound = 0;
  7. let ubound = last;
  8. if (input < list[0]) {
  9. return list[0];
  10. }
  11. // binary search
  12. while (lbound <= ubound) {
  13. mid = lbound + Math.floor((ubound - lbound) / 2);
  14. if (mid === last || (input >= list[mid] && input < list[mid + 1])) {
  15. return list[mid];
  16. } else if (list[mid] < input) {
  17. lbound = mid + 1;
  18. } else {
  19. ubound = mid - 1;
  20. }
  21. }
  22. }

_adjustStashSize是调整this._stashSize的方法,当缓存的大小小于this._stashSize时,则进行扩展。

  1. _adjustStashSize(normalized) {
  2. let stashSizeKB = 0;
  3. // 如果是直播
  4. if (this._config.isLive) {
  5. // live stream: always use single normalized speed for size of stashSizeKB
  6. stashSizeKB = normalized;
  7. } else {
  8. if (normalized < 512) {
  9. stashSizeKB = normalized;
  10. } else if (normalized >= 512 && normalized <= 1024) {
  11. stashSizeKB = Math.floor(normalized * 1.5);
  12. } else {
  13. stashSizeKB = normalized * 2;
  14. }
  15. }
  16. // 最大是8K
  17. if (stashSizeKB > 8192) {
  18. stashSizeKB = 8192;
  19. }
  20. let bufferSize = stashSizeKB * 1024 + 1024 * 1024 * 1; // stashSize + 1MB
  21. // 如果缓存小则扩展缓存
  22. if (this._bufferSize < bufferSize) {
  23. this._expandBuffer(bufferSize);
  24. }
  25. this._stashSize = stashSizeKB * 1024;
  26. }

扩展缓存的_expandBuffer方法和我们写的demo很相似。

  1. _expandBuffer(expectedBytes) {
  2. let bufferNewSize = this._stashSize;
  3. // 每次*2 直到大于expectedBytes
  4. while (bufferNewSize + 1024 * 1024 * 1 < expectedBytes) {
  5. bufferNewSize *= 2;
  6. }
  7. bufferNewSize += 1024 * 1024 * 1; // bufferSize = stashSize + 1MB
  8. if (bufferNewSize === this._bufferSize) {
  9. return;
  10. }
  11. // 新的缓存区
  12. let newBuffer = new ArrayBuffer(bufferNewSize);
  13. // 旧缓存区有数据 则进行拷贝
  14. if (this._stashUsed > 0) { // copy existing data into new buffer
  15. let stashOldArray = new Uint8Array(this._stashBuffer, 0, this._stashUsed);
  16. let stashNewArray = new Uint8Array(newBuffer, 0, bufferNewSize);
  17. stashNewArray.set(stashOldArray, 0);
  18. }
  19. // 重设缓存区和缓存区大小
  20. this._stashBuffer = newBuffer;
  21. this._bufferSize = bufferNewSize;
  22. }

总结: 这篇文章的收获是网速计算的思路可以应用到类似场景中,比如限流。数据缓存中的二进制缓存区的操作方法。下篇文章中讲解FLV格式的解析和涉及到的位操作。

  • 如果觉得有用请帮忙点个赞。
  • 我正在参与掘金技术社区创作者签约计划招募活动,点击链接报名投稿

文章标签:

原文连接:https://juejin.cn/post/7115401019990736910

相关推荐

Jackson 解析 JSON 详细教程

Node.js精进(11)——Socket.IO

Js实现继承的6种方式

【跟着大佬学JavaScript】之数组去重(结果对比)

flv.js的追帧、断流重连及实时更新的直播优化方案

Node.js精进(10)——性能监控(下)

【js逆向爬虫】-有道翻译js逆向实战

性能吊打 Node.js 和 Deno 的新一代 javaScript 运行时-Bun.js

JsetPack组件App StartUp的使用示例

three.js 性能优化

JsonPath:针对json的强大的规则解析与参数查找工具

XJSON 是如何实现四则运算的?

js异步编程、Promise的应用以及在循环中、递归的时候使用Promise。

如何实现一个 JSON 解析库

Three.js系列: 在元宇宙看电影,享受 VR 视觉盛宴

Node.js精进(9)——性能监控(上)

autojs悬浮窗模拟toast气泡

【跟着大佬学JavaScript】之lodash防抖节流合并

本地使用 Docker Compose 与 Nestjs 快速构建基于 Dapr 的 Redis 发布/订阅分布式应用

【跟着大佬学JavaScript】之节流