// blockedsource.js (8.5 KB)
  1. import QuickLRU from 'quick-lru';
  2. import { BaseSource } from './basesource.js';
  3. import { AbortError, AggregateError, wait, zip } from '../utils.js';
  /**
   * A run of bytes identified by the byte offset it starts at.
   */
  class Block {
    /**
     * @param {number} offset byte offset at which the block starts
     * @param {number} length number of bytes covered by the block
     * @param {ArrayBuffer|null} [data] the bytes of the block; `null` when omitted
     */
    constructor(offset, length, data = null) {
      this.offset = offset;
      this.length = length;
      this.data = data;
    }

    /**
     * @returns {number} the top byte border, i.e. `offset + length` (exclusive end)
     */
    get top() {
      return this.offset + this.length;
    }
  }
  /**
   * A byte range together with the ids of the blocks it was built from.
   */
  class BlockGroup {
    /**
     * @param {number} offset byte offset at which the group starts
     * @param {number} length number of bytes covered by the group
     * @param {number[]} blockIds ids of the blocks that make up the group
     */
    constructor(offset, length, blockIds) {
      this.offset = offset;
      this.length = length;
      this.blockIds = blockIds;
    }
  }
  /**
   * Source that reads from an underlying source in blocks of `blockSize` bytes.
   * Loaded blocks are kept in an LRU cache and runs of consecutive missing
   * blocks are requested from the underlying source as one range each.
   */
  export class BlockedSource extends BaseSource {
    /**
     * @param {BaseSource} source The underlying source that shall be blocked and cached
     * @param {object} options
     * @param {number} [options.blockSize] size of one block in bytes (default 65536)
     * @param {number} [options.cacheSize] `maxSize` handed to the LRU cache (default 100)
     */
    constructor(source, { blockSize = 65536, cacheSize = 100 } = {}) {
      super();
      this.source = source;
      this.blockSize = blockSize;

      this.blockCache = new QuickLRU({
        maxSize: cacheSize,
        // evicted blocks are parked in `evictedBlocks`; `fetch` falls back to
        // that map when a block is no longer found in the cache
        onEviction: (blockId, block) => {
          this.evictedBlocks.set(blockId, block);
        },
      });

      /** @type {Map<number, Block>} */
      this.evictedBlocks = new Map();

      // mapping blockId -> promise of the request that is currently loading that block
      this.blockRequests = new Map();

      // set of blockIds missing for the current requests
      this.blockIdsToFetch = new Set();

      // ids of blocks whose last request ended with an AbortError
      this.abortedBlockIds = new Set();
    }

    /**
     * @returns {number|null} the file size as reported by the underlying source
     */
    get fileSize() {
      return this.source.fileSize;
    }

    /**
     * Load all blocks overlapping the requested slices and assemble one buffer
     * per slice from them.
     *
     * @param {import("./basesource").Slice[]} slices byte ranges to read
     * @param {AbortSignal} [signal] signal forwarded to the underlying source
     * @returns {Promise<ArrayBuffer[]>} one buffer per slice, in the order of `slices`
     * @throws {AbortError} when `signal` is aborted once the requests have settled
     * @throws {AggregateError} when a required block is neither cached nor evicted
     */
    async fetch(slices, signal) {
      const blockRequests = [];
      const missingBlockIds = [];
      const allBlockIds = [];

      // NOTE(review): `evictedBlocks` is shared by all calls and reset here, so
      // overlapping `fetch` calls may drop each other's evicted blocks — confirm.
      this.evictedBlocks.clear();

      for (const { offset, length } of slices) {
        let top = offset + length;

        const { fileSize } = this;
        if (fileSize !== null) {
          top = Math.min(top, fileSize);
        }

        const firstBlockOffset = Math.floor(offset / this.blockSize) * this.blockSize;

        // `top` is exclusive: a block starting exactly at `top` is not needed
        for (let current = firstBlockOffset; current < top; current += this.blockSize) {
          const blockId = Math.floor(current / this.blockSize);

          if (!this.blockCache.has(blockId) && !this.blockRequests.has(blockId)) {
            this.blockIdsToFetch.add(blockId);
            missingBlockIds.push(blockId);
          }
          if (this.blockRequests.has(blockId)) {
            blockRequests.push(this.blockRequests.get(blockId));
          }
          allBlockIds.push(blockId);
        }
      }

      // allow additional block requests to accumulate
      await wait();
      this.fetchBlocks(signal);

      // Gather all of the new requests that this fetch call is contributing to `fetch`.
      const missingRequests = [];
      for (const blockId of missingBlockIds) {
        // The requested missing block could already be in the cache
        // instead of having its request still be outstanding.
        if (this.blockRequests.has(blockId)) {
          missingRequests.push(this.blockRequests.get(blockId));
        }
      }

      // Actually await all pending requests that are needed for this `fetch`.
      await Promise.allSettled(blockRequests);
      await Promise.allSettled(missingRequests);

      // Perform retries if a block was interrupted by a previous signal
      const abortedBlockRequests = [];
      const abortedBlockIds = allBlockIds
        .filter((id) => this.abortedBlockIds.has(id) || !this.blockCache.has(id));
      abortedBlockIds.forEach((id) => this.blockIdsToFetch.add(id));

      // start the retry of some blocks if required
      // NOTE(review): the retry only runs when the caller passed a signal that
      // is not aborted; callers without a signal get no retry — confirm intent.
      if (abortedBlockIds.length > 0 && signal && !signal.aborted) {
        // the retry is deliberately started without a signal
        this.fetchBlocks(null);
        for (const blockId of abortedBlockIds) {
          const block = this.blockRequests.get(blockId);
          if (!block) {
            throw new Error(`Block ${blockId} is not in the block requests`);
          }
          abortedBlockRequests.push(block);
        }
        await Promise.allSettled(abortedBlockRequests);
      }

      // throw an abort error
      if (signal && signal.aborted) {
        throw new AbortError('Request was aborted');
      }

      const blocks = allBlockIds.map((id) => this.blockCache.get(id) || this.evictedBlocks.get(id));

      // report every block that could not be obtained with a real error object
      const failedBlockIds = allBlockIds.filter((id, index) => !blocks[index]);
      if (failedBlockIds.length) {
        throw new AggregateError(
          failedBlockIds.map((id) => new Error(`Block ${id} could not be loaded`)),
          'Request failed',
        );
      }

      // create a final Map, with all required blocks for this request to satisfy
      const requiredBlocks = new Map(zip(allBlockIds, blocks));

      // cut the requested slices out of the loaded blocks
      return this.readSliceData(slices, requiredBlocks);
    }

    /**
     * Request every block id queued in `blockIdsToFetch` from the underlying
     * source and register one pending promise per block in `blockRequests`.
     * The queue is emptied afterwards.
     *
     * @param {AbortSignal|null} [signal] signal forwarded to the underlying source
     */
    fetchBlocks(signal) {
      // check if we still need to
      if (this.blockIdsToFetch.size === 0) {
        return;
      }

      const groups = this.groupBlocks(this.blockIdsToFetch);

      // start requesting slices of data
      const groupRequests = this.source.fetch(groups, signal);

      for (let groupIndex = 0; groupIndex < groups.length; ++groupIndex) {
        const group = groups[groupIndex];

        for (const blockId of group.blockIds) {
          // make an async IIFE for each block
          this.blockRequests.set(blockId, (async () => {
            try {
              const response = (await groupRequests)[groupIndex];
              const blockOffset = blockId * this.blockSize;
              // position of this block inside the group's response
              const o = blockOffset - response.offset;
              // the response may hold fewer bytes than a full block
              const t = Math.min(o + this.blockSize, response.data.byteLength);
              const data = response.data.slice(o, t);
              const block = new Block(
                blockOffset,
                data.byteLength,
                data,
              );
              this.blockCache.set(blockId, block);
              this.abortedBlockIds.delete(blockId);
            } catch (err) {
              if (err.name === 'AbortError') {
                // store the signal here, we need it to determine later if an
                // error was caused by this signal
                err.signal = signal;
                this.blockCache.delete(blockId);
                this.abortedBlockIds.add(blockId);
              } else {
                throw err;
              }
            } finally {
              this.blockRequests.delete(blockId);
            }
          })());
        }
      }
      this.blockIdsToFetch.clear();
    }

    /**
     * Combine runs of consecutive block ids into groups, so that each run can
     * be requested as one byte range.
     *
     * @param {Set} blockIds ids of the blocks to group
     * @returns {BlockGroup[]} groups ordered by ascending offset
     */
    groupBlocks(blockIds) {
      const sortedBlockIds = Array.from(blockIds).sort((a, b) => a - b);
      if (sortedBlockIds.length === 0) {
        return [];
      }
      let current = [];
      let lastBlockId = null;
      const groups = [];

      for (const blockId of sortedBlockIds) {
        if (lastBlockId === null || lastBlockId + 1 === blockId) {
          current.push(blockId);
          lastBlockId = blockId;
        } else {
          // gap found: close the current run and start a new one
          groups.push(new BlockGroup(
            current[0] * this.blockSize,
            current.length * this.blockSize,
            current,
          ));
          current = [blockId];
          lastBlockId = blockId;
        }
      }

      // close the last run
      groups.push(new BlockGroup(
        current[0] * this.blockSize,
        current.length * this.blockSize,
        current,
      ));

      return groups;
    }

    /**
     * Copy the bytes of every slice out of the given blocks.
     *
     * @param {import("./basesource").Slice[]} slices byte ranges to assemble
     * @param {Map} blocks mapping blockId -> Block for every block overlapping a slice
     * @returns {ArrayBuffer[]} one buffer of `slice.length` bytes per slice
     */
    readSliceData(slices, blocks) {
      return slices.map((slice) => {
        let top = slice.offset + slice.length;
        if (this.fileSize !== null) {
          top = Math.min(this.fileSize, top);
        }
        const blockIdLow = Math.floor(slice.offset / this.blockSize);
        // `top` is exclusive, so the last block needed is the one holding byte
        // `top - 1`. Using `top` itself would reach for a block that `fetch`
        // never loads whenever the slice ends exactly on a block boundary.
        const blockIdHigh = Math.floor((top - 1) / this.blockSize);
        const sliceData = new ArrayBuffer(slice.length);
        const sliceView = new Uint8Array(sliceData);

        for (let blockId = blockIdLow; blockId <= blockIdHigh; ++blockId) {
          const block = blocks.get(blockId);
          const delta = block.offset - slice.offset;
          const topDelta = block.top - top;
          let blockInnerOffset = 0;
          let rangeInnerOffset = 0;
          let usedBlockLength;

          if (delta < 0) {
            // slice starts inside this block
            blockInnerOffset = -delta;
          } else if (delta > 0) {
            // block starts inside the slice
            rangeInnerOffset = delta;
          }

          if (topDelta < 0) {
            // block ends before the slice does: use it up to its end
            usedBlockLength = block.length - blockInnerOffset;
          } else {
            // slice ends inside this block
            usedBlockLength = top - block.offset - blockInnerOffset;
          }

          const blockView = new Uint8Array(block.data, blockInnerOffset, usedBlockLength);
          sliceView.set(blockView, rangeInnerOffset);
        }

        return sliceData;
      });
    }
  }