Best JavaScript code snippet using wpt
readable-internals.ts
Source:readable-internals.ts
1/**2 * streams/readable-internals - internal types and functions for readable streams3 * Part of Stardazed4 * (c) 2018-Present by @zenmumbler5 * https://github.com/stardazed/sd-streams6 */7import * as ws from "./writable-internals";8import * as shared from "./shared-internals";9import * as q from "./queue-mixin";10// ReadableStreamDefaultController11export const controlledReadableStream_ = Symbol("controlledReadableStream_");12export const pullAlgorithm_ = Symbol("pullAlgorithm_");13export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");14export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");15export const strategyHWM_ = Symbol("strategyHWM_");16export const started_ = Symbol("started_");17export const closeRequested_ = Symbol("closeRequested_");18export const pullAgain_ = Symbol("pullAgain_");19export const pulling_ = Symbol("pulling_");20export const cancelSteps_ = Symbol("cancelSteps_");21export const pullSteps_ = Symbol("pullSteps_");22// ReadableByteStreamController23export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");24export const byobRequest_ = Symbol("byobRequest_");25export const controlledReadableByteStream_ = Symbol("controlledReadableByteStream_");26export const pendingPullIntos_ = Symbol("pendingPullIntos_");27// ReadableStreamDefaultReader28export const closedPromise_ = Symbol("closedPromise_");29export const ownerReadableStream_ = Symbol("ownerReadableStream_");30export const readRequests_ = Symbol("readRequests_");31export const readIntoRequests_ = Symbol("readIntoRequests_");32// ReadableStreamBYOBRequest33export const associatedReadableByteStreamController_ = Symbol("associatedReadableByteStreamController_");34export const view_ = Symbol("view_");35// ReadableStreamBYOBReader36// ReadableStream37export const reader_ = Symbol("reader_");38export const readableStreamController_ = Symbol("readableStreamController_");39export type StartFunction<OutputType> = (controller: SDReadableStreamControllerBase<OutputType>) => void | PromiseLike<void>;40export type StartAlgorithm = () => Promise<void> | void;41export type PullFunction<OutputType> = (controller: SDReadableStreamControllerBase<OutputType>) => void | PromiseLike<void>;42export type PullAlgorithm<OutputType> = (controller: SDReadableStreamControllerBase<OutputType>) => PromiseLike<void>;43export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;44// ----45export interface SDReadableStreamControllerBase<OutputType> {46 readonly desiredSize: number | null;47 close(): void;48 error(e?: shared.ErrorResult): void;49 [cancelSteps_](reason: shared.ErrorResult): Promise<void>;50 [pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;51}52export interface SDReadableStreamBYOBRequest {53 readonly view: ArrayBufferView;54 respond(bytesWritten: number): void;55 respondWithNewView(view: ArrayBufferView): void;56 [associatedReadableByteStreamController_]: SDReadableByteStreamController | undefined;57 [view_]: ArrayBufferView | undefined;58}59interface ArrayBufferViewCtor {60 new(buffer: ArrayBufferLike, byteOffset?: number, byteLength?: number): ArrayBufferView;61}62export interface PullIntoDescriptor {63 readerType: "default" | "byob";64 ctor: ArrayBufferViewCtor;65 buffer: ArrayBufferLike;66 bufferByteLength: number;67 byteOffset: number;68 byteLength: number;69 bytesFilled: number;70 elementSize: number;71}72export interface SDReadableByteStreamController extends SDReadableStreamControllerBase<ArrayBufferView>, q.ByteQueueContainer {73 readonly byobRequest: SDReadableStreamBYOBRequest | undefined;74 enqueue(chunk: ArrayBufferView): void;75 [autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.76 [byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request77 [cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source78 [closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read79 [controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled80 [pullAgain_]: boolean; // A boolean flag set to true if the streamâs mechanisms requested a call to the underlying byte sourceâs pull() method to pull more data, but the pull could not yet be done since a previous call is still executing81 [pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source82 [pulling_]: boolean; // A boolean flag set to true while the underlying byte sourceâs pull() method is executing and has not yet fulfilled, used to prevent reentrant calls83 [pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests84 [started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting85 [strategyHWM_]: number; // A number supplied to the constructor as part of the streamâs queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source86}87export interface SDReadableStreamDefaultController<OutputType> extends SDReadableStreamControllerBase<OutputType>, q.QueueContainer<OutputType> {88 enqueue(chunk?: OutputType): void;89 [controlledReadableStream_]: SDReadableStream<OutputType>;90 [pullAlgorithm_]: PullAlgorithm<OutputType>;91 [cancelAlgorithm_]: CancelAlgorithm;92 [strategySizeAlgorithm_]: QueuingStrategySize<OutputType>;93 [strategyHWM_]: number;94 [started_]: boolean;95 [closeRequested_]: boolean;96 [pullAgain_]: boolean;97 [pulling_]: boolean;98}99// ----100export interface SDReadableStreamReader<OutputType> {101 readonly closed: Promise<void>;102 cancel(reason: shared.ErrorResult): Promise<void>;103 releaseLock(): void;104 [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;105 [closedPromise_]: shared.ControlledPromise<void>;106}107export interface ReadRequest<V> extends shared.ControlledPromise<V> {108 forAuthorCode: boolean;109}110export declare class SDReadableStreamDefaultReader<OutputType> implements SDReadableStreamReader<OutputType> {111 constructor(stream: SDReadableStream<OutputType>);112 readonly closed: Promise<void>;113 cancel(reason: shared.ErrorResult): Promise<void>;114 releaseLock(): void;115 read(): Promise<IteratorResult<OutputType | undefined>>;116 [ownerReadableStream_]: SDReadableStream<OutputType> | undefined;117 [closedPromise_]: shared.ControlledPromise<void>;118 [readRequests_]: ReadRequest<IteratorResult<OutputType>>[];119}120export declare class SDReadableStreamBYOBReader implements SDReadableStreamReader<ArrayBufferView> {121 constructor(stream: SDReadableStream<ArrayBufferView>);122 readonly closed: Promise<void>;123 cancel(reason: shared.ErrorResult): Promise<void>;124 releaseLock(): void;125 read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;126 [ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;127 [closedPromise_]: shared.ControlledPromise<void>;128 [readIntoRequests_]: ReadRequest<IteratorResult<ArrayBufferView>>[];129}130// ----131export interface GenericTransformStream<InputType, OutputType> {132 readable: SDReadableStream<OutputType>;133 writable: ws.WritableStream<InputType>;134}135export type ReadableStreamState = "readable" | "closed" | "errored";136export declare class SDReadableStream<OutputType> {137 constructor(underlyingSource?: UnderlyingSource<OutputType>, strategy?: QueuingStrategy<OutputType>);138 readonly locked: boolean;139 cancel(reason?: shared.ErrorResult): Promise<void>;140 getReader(): SDReadableStreamReader<OutputType>;141 getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;142 tee(): SDReadableStream<OutputType>[];143 pipeThrough<ResultType>(transform: GenericTransformStream<OutputType, ResultType>, options?: StreamPipeOptions): SDReadableStream<ResultType>;144 pipeTo(dest: ws.WritableStream<OutputType>, options?: StreamPipeOptions): Promise<void>;145 [shared.state_]: ReadableStreamState;146 [shared.storedError_]: shared.ErrorResult;147 [reader_]: SDReadableStreamReader<OutputType> | undefined;148 [readableStreamController_]: SDReadableStreamControllerBase<OutputType>;149}150// ---- Stream151export function initializeReadableStream<OutputType>(stream: SDReadableStream<OutputType>) {152 stream[shared.state_] = "readable";153 stream[reader_] = undefined;154 stream[shared.storedError_] = undefined;155 stream[readableStreamController_] = undefined!; // mark slot as used for brand check156}157export function isReadableStream(value: unknown): value is SDReadableStream<any> {158 if (typeof value !== "object" || value === null) {159 return false;160 }161 return readableStreamController_ in value;162}163export function isReadableStreamLocked<OutputType>(stream: SDReadableStream<OutputType>) {164 return stream[reader_] !== undefined;165}166export function readableStreamGetNumReadIntoRequests<OutputType>(stream: SDReadableStream<OutputType>) {167 const reader = stream[reader_] as SDReadableStreamBYOBReader;168 if (reader === undefined) {169 return 0;170 }171 return reader[readIntoRequests_].length;172}173export function readableStreamGetNumReadRequests<OutputType>(stream: SDReadableStream<OutputType>) {174 const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;175 if (reader === undefined) {176 return 0;177 }178 return reader[readRequests_].length;179}180export function readableStreamCreateReadResult<T>(value: T, done: boolean, forAuthorCode: boolean): IteratorResult<T> {181 const prototype = forAuthorCode ? Object.prototype : null;182 const result = Object.create(prototype);183 result.value = value;184 result.done = done;185 return result;186}187export function readableStreamAddReadIntoRequest(stream: SDReadableStream<ArrayBufferView>, forAuthorCode: boolean) {188 // Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.189 // Assert: stream.[[state]] is "readable" or "closed".190 const reader = stream[reader_] as SDReadableStreamBYOBReader;191 const conProm = shared.createControlledPromise<IteratorResult<ArrayBufferView>>() as ReadRequest<IteratorResult<ArrayBufferView>>;192 conProm.forAuthorCode = forAuthorCode;193 reader[readIntoRequests_].push(conProm);194 return conProm.promise;195}196export function readableStreamAddReadRequest<OutputType>(stream: SDReadableStream<OutputType>, forAuthorCode: boolean) {197 // Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.198 // Assert: stream.[[state]] is "readable".199 const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;200 const conProm = shared.createControlledPromise<IteratorResult<OutputType>>() as ReadRequest<IteratorResult<OutputType>>;201 conProm.forAuthorCode = forAuthorCode;202 reader[readRequests_].push(conProm);203 return conProm.promise;204}205export function readableStreamHasBYOBReader<OutputType>(stream: SDReadableStream<OutputType>) {206 const reader = stream[reader_];207 return isReadableStreamBYOBReader(reader);208}209export function readableStreamHasDefaultReader<OutputType>(stream: SDReadableStream<OutputType>) {210 const reader = stream[reader_];211 return isReadableStreamDefaultReader(reader);212}213export function readableStreamCancel<OutputType>(stream: SDReadableStream<OutputType>, reason: shared.ErrorResult) {214 if (stream[shared.state_] === "closed") {215 return Promise.resolve(undefined);216 }217 if (stream[shared.state_] === "errored") {218 return Promise.reject(stream[shared.storedError_]);219 }220 readableStreamClose(stream);221 const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](reason);222 return sourceCancelPromise.then(_ => undefined);223}224export function readableStreamClose<OutputType>(stream: SDReadableStream<OutputType>) {225 // Assert: stream.[[state]] is "readable".226 stream[shared.state_] = "closed";227 const reader = stream[reader_];228 if (reader === undefined) {229 return;230 }231 reader[closedPromise_].resolve();232 reader[closedPromise_].promise.catch(() => {});233 if (isReadableStreamDefaultReader(reader)) {234 for (const readRequest of reader[readRequests_]) {235 readRequest.resolve(readableStreamCreateReadResult(undefined, true, readRequest.forAuthorCode));236 }237 reader[readRequests_] = [];238 }239}240export function readableStreamError<OutputType>(stream: SDReadableStream<OutputType>, error: shared.ErrorResult) {241 if (stream[shared.state_] !== "readable") {242 throw new RangeError("Stream is in an invalid state");243 }244 stream[shared.state_] = "errored";245 stream[shared.storedError_] = error;246 const reader = stream[reader_];247 if (reader === undefined) {248 return;249 }250 reader[closedPromise_].reject(error);251 if (isReadableStreamDefaultReader(reader)) {252 for (const readRequest of reader[readRequests_]) {253 readRequest.reject(error);254 }255 reader[readRequests_] = [];256 }257 else {258 // Assert: IsReadableStreamBYOBReader(reader).259 const readIntoRequests = (reader as SDReadableStreamBYOBReader)[readIntoRequests_];260 for (const readIntoRequest of readIntoRequests) {261 readIntoRequest.reject(error);262 }263 (reader as SDReadableStreamBYOBReader)[readIntoRequests_] = [];264 }265}266// ---- Readers267export function isReadableStreamDefaultReader(reader: unknown): reader is SDReadableStreamDefaultReader<any> {268 if (typeof reader !== "object" || reader === null) {269 return false;270 }271 return readRequests_ in reader;272}273export function isReadableStreamBYOBReader(reader: unknown): reader is SDReadableStreamBYOBReader {274 if (typeof reader !== "object" || reader === null) {275 return false;276 }277 return readIntoRequests_ in reader;278}279export function readableStreamReaderGenericInitialize<OutputType>(reader: SDReadableStreamReader<OutputType>, stream: SDReadableStream<OutputType>) {280 reader[ownerReadableStream_] = stream;281 stream[reader_] = reader;282 const streamState = stream[shared.state_];283 reader[closedPromise_] = shared.createControlledPromise<void>();284 if (streamState === "readable") {285 // leave as is286 }287 else if (streamState === "closed") {288 reader[closedPromise_].resolve(undefined);289 }290 else {291 reader[closedPromise_].reject(stream[shared.storedError_]);292 reader[closedPromise_].promise.catch(() => {});293 }294}295export function readableStreamReaderGenericRelease<OutputType>(reader: SDReadableStreamReader<OutputType>) {296 // Assert: reader.[[ownerReadableStream]] is not undefined.297 // Assert: reader.[[ownerReadableStream]].[[reader]] is reader.298 const stream = reader[ownerReadableStream_];299 if (stream === undefined) {300 throw new TypeError("Reader is in an inconsistent state");301 }302 if (stream[shared.state_] === "readable") {303 // code moved out304 }305 else {306 reader[closedPromise_] = shared.createControlledPromise<void>();307 }308 reader[closedPromise_].reject(new TypeError());309 reader[closedPromise_].promise.catch(() => {});310 stream[reader_] = undefined;311 reader[ownerReadableStream_] = undefined;312}313export function readableStreamBYOBReaderRead(reader: SDReadableStreamBYOBReader, view: ArrayBufferView, forAuthorCode = false) {314 const stream = reader[ownerReadableStream_]!;315 // Assert: stream is not undefined.316 317 if (stream[shared.state_] === "errored") {318 return Promise.reject(stream[shared.storedError_]);319 }320 return readableByteStreamControllerPullInto(stream[readableStreamController_] as SDReadableByteStreamController, view, forAuthorCode);321}322export function readableStreamDefaultReaderRead<OutputType>(reader: SDReadableStreamDefaultReader<OutputType>, forAuthorCode = false): Promise<IteratorResult<OutputType | undefined>> {323 const stream = reader[ownerReadableStream_]!;324 // Assert: stream is not undefined.325 if (stream[shared.state_] === "closed") {326 return Promise.resolve(readableStreamCreateReadResult(undefined, true, forAuthorCode));327 }328 if (stream[shared.state_] === "errored") {329 return Promise.reject(stream[shared.storedError_]);330 }331 // Assert: stream.[[state]] is "readable".332 return stream[readableStreamController_][pullSteps_](forAuthorCode);333}334export function readableStreamFulfillReadIntoRequest<OutputType>(stream: SDReadableStream<OutputType>, chunk: ArrayBufferView, done: boolean) {335 const reader = stream[reader_] as SDReadableStreamBYOBReader;336 const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller337 readIntoRequest.resolve(readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode));338}339export function readableStreamFulfillReadRequest<OutputType>(stream: SDReadableStream<OutputType>, chunk: OutputType, done: boolean) {340 const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;341 const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller342 readRequest.resolve(readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode));343}344// ---- DefaultController345export function setUpReadableStreamDefaultController<OutputType>(stream: SDReadableStream<OutputType>, controller: SDReadableStreamDefaultController<OutputType>, startAlgorithm: StartAlgorithm, pullAlgorithm: PullAlgorithm<OutputType>, cancelAlgorithm: CancelAlgorithm, highWaterMark: number, sizeAlgorithm: QueuingStrategySize<OutputType>) {346 // Assert: stream.[[readableStreamController]] is undefined.347 controller[controlledReadableStream_] = stream;348 q.resetQueue(controller);349 controller[started_] = false;350 controller[closeRequested_] = false;351 controller[pullAgain_] = false;352 controller[pulling_] = false;353 controller[strategySizeAlgorithm_] = sizeAlgorithm;354 controller[strategyHWM_] = highWaterMark;355 controller[pullAlgorithm_] = pullAlgorithm;356 controller[cancelAlgorithm_] = cancelAlgorithm;357 stream[readableStreamController_] = controller;358 const startResult = startAlgorithm();359 Promise.resolve(startResult).then(360 _ => {361 controller[started_] = true;362 // Assert: controller.[[pulling]] is false.363 // Assert: controller.[[pullAgain]] is false.364 readableStreamDefaultControllerCallPullIfNeeded(controller);365 },366 error => {367 readableStreamDefaultControllerError(controller, error);368 }369 );370}371export function isReadableStreamDefaultController(value: unknown): value is SDReadableStreamDefaultController<any> {372 if (typeof value !== "object" || value === null) {373 return false;374 }375 return controlledReadableStream_ in value;376}377export function readableStreamDefaultControllerHasBackpressure<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {378 return ! readableStreamDefaultControllerShouldCallPull(controller);379}380export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {381 const state = controller[controlledReadableStream_][shared.state_];382 return controller[closeRequested_] === false && state === "readable";383}384export function readableStreamDefaultControllerGetDesiredSize<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {385 const state = controller[controlledReadableStream_][shared.state_];386 if (state === "errored") {387 return null;388 }389 if (state === "closed") {390 return 0;391 }392 return controller[strategyHWM_] - controller[q.queueTotalSize_];393}394export function readableStreamDefaultControllerClose<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {395 // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.396 controller[closeRequested_] = true;397 const stream = controller[controlledReadableStream_];398 if (controller[q.queue_].length === 0) {399 readableStreamDefaultControllerClearAlgorithms(controller);400 readableStreamClose(stream);401 }402}403export function readableStreamDefaultControllerEnqueue<OutputType>(controller: SDReadableStreamDefaultController<OutputType>, chunk: OutputType) {404 const stream = controller[controlledReadableStream_];405 // Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.406 if (isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0) {407 readableStreamFulfillReadRequest(stream, chunk, false);408 }409 else {410 // Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,411 // and interpreting the result as an ECMAScript completion value.412 // impl note: assuming that in JS land this just means try/catch with rethrow413 let chunkSize: number;414 try {415 chunkSize = controller[strategySizeAlgorithm_](chunk);416 }417 catch (error) {418 readableStreamDefaultControllerError(controller, error);419 throw error;420 }421 try {422 q.enqueueValueWithSize(controller, chunk, chunkSize);423 }424 catch (error) {425 readableStreamDefaultControllerError(controller, error);426 throw error;427 }428 }429 readableStreamDefaultControllerCallPullIfNeeded(controller);430}431export function readableStreamDefaultControllerError<OutputType>(controller: SDReadableStreamDefaultController<OutputType>, error: shared.ErrorResult) {432 const stream = controller[controlledReadableStream_];433 if (stream[shared.state_] !== "readable") {434 return;435 }436 q.resetQueue(controller);437 readableStreamDefaultControllerClearAlgorithms(controller);438 readableStreamError(stream, error);439}440export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {441 if (! readableStreamDefaultControllerShouldCallPull(controller)) {442 return;443 }444 if (controller[pulling_]) {445 controller[pullAgain_] = true;446 return;447 }448 if (controller[pullAgain_]) {449 throw new RangeError("Stream controller is in an invalid state.");450 }451 controller[pulling_] = true;452 controller[pullAlgorithm_](controller).then(453 _ => {454 controller[pulling_] = false;455 if (controller[pullAgain_]) {456 controller[pullAgain_] = false;457 readableStreamDefaultControllerCallPullIfNeeded(controller);458 }459 },460 error => {461 readableStreamDefaultControllerError(controller, error);462 }463 );464}465export function readableStreamDefaultControllerShouldCallPull<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {466 const stream = controller[controlledReadableStream_];467 if (! readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {468 return false;469 }470 if (controller[started_] === false) {471 return false;472 }473 if (isReadableStreamLocked(stream) && readableStreamGetNumReadRequests(stream) > 0) {474 return true;475 }476 const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);477 if (desiredSize === null) {478 throw new RangeError("Stream is in an invalid state.");479 }480 return desiredSize > 0;481}482export function readableStreamDefaultControllerClearAlgorithms<OutputType>(controller: SDReadableStreamDefaultController<OutputType>) {483 controller[pullAlgorithm_] = undefined!;484 controller[cancelAlgorithm_] = undefined!;485 controller[strategySizeAlgorithm_] = undefined!;486}487// ---- BYOBController488export function setUpReadableByteStreamController(stream: SDReadableStream<ArrayBufferView>, controller: SDReadableByteStreamController, startAlgorithm: StartAlgorithm, pullAlgorithm: PullAlgorithm<ArrayBufferView>, cancelAlgorithm: CancelAlgorithm, highWaterMark: number, autoAllocateChunkSize: number | undefined) {489 // Assert: stream.[[readableStreamController]] is undefined.490 if (stream[readableStreamController_] !== undefined) {491 throw new TypeError("Cannot reuse streams");492 }493 if (autoAllocateChunkSize !== undefined) {494 if (! shared.isInteger(autoAllocateChunkSize) || autoAllocateChunkSize <= 0) {495 throw new RangeError("autoAllocateChunkSize must be a positive, finite integer");496 }497 }498 // Set controller.[[controlledReadableByteStream]] to stream.499 controller[controlledReadableByteStream_] = stream;500 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.501 controller[pullAgain_] = false;502 controller[pulling_] = false;503 readableByteStreamControllerClearPendingPullIntos(controller);504 q.resetQueue(controller);505 controller[closeRequested_] = false;506 controller[started_] = false;507 controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(highWaterMark);508 controller[pullAlgorithm_] = pullAlgorithm;509 controller[cancelAlgorithm_] = cancelAlgorithm;510 controller[autoAllocateChunkSize_] = autoAllocateChunkSize;511 controller[pendingPullIntos_] = [];512 stream[readableStreamController_] = controller;513 // Let startResult be the result of performing startAlgorithm.514 const startResult = startAlgorithm();515 Promise.resolve(startResult).then(516 _ => {517 controller[started_] = true;518 // Assert: controller.[[pulling]] is false.519 // Assert: controller.[[pullAgain]] is false.520 readableByteStreamControllerCallPullIfNeeded(controller);521 },522 error => {523 readableByteStreamControllerError(controller, error);524 }525 );526}527export function isReadableStreamBYOBRequest(value: unknown): value is SDReadableStreamBYOBRequest {528 if (typeof value !== "object" || value === null) {529 return false;530 }531 return associatedReadableByteStreamController_ in value;532}533export function isReadableByteStreamController(value: unknown): value is SDReadableByteStreamController {534 if (typeof value !== "object" || value === null) {535 return false;536 }537 return controlledReadableByteStream_ in value;538}539export function readableByteStreamControllerCallPullIfNeeded(controller: SDReadableByteStreamController) {540 if (! readableByteStreamControllerShouldCallPull(controller)) {541 return;542 }543 if (controller[pulling_]) {544 controller[pullAgain_] = true;545 return;546 }547 // Assert: controller.[[pullAgain]] is false.548 controller[pulling_] = true;549 controller[pullAlgorithm_](controller).then(550 _ => {551 controller[pulling_] = false;552 if (controller[pullAgain_]) {553 controller[pullAgain_] = false;554 readableByteStreamControllerCallPullIfNeeded(controller);555 }556 },557 error => {558 readableByteStreamControllerError(controller, error);559 }560 );561}562export function readableByteStreamControllerClearAlgorithms(controller: SDReadableByteStreamController) {563 controller[pullAlgorithm_] = undefined!;564 controller[cancelAlgorithm_] = undefined!;565}566export function readableByteStreamControllerClearPendingPullIntos(controller: SDReadableByteStreamController) {567 readableByteStreamControllerInvalidateBYOBRequest(controller);568 controller[pendingPullIntos_] = [];569}570export function readableByteStreamControllerClose(controller: SDReadableByteStreamController) {571 const stream = controller[controlledReadableByteStream_];572 // Assert: controller.[[closeRequested]] is false.573 // Assert: stream.[[state]] is "readable".574 if (controller[q.queueTotalSize_] > 0) {575 controller[closeRequested_] = true;576 return;577 }578 if (controller[pendingPullIntos_].length > 0) {579 const firstPendingPullInto = controller[pendingPullIntos_][0];580 if (firstPendingPullInto.bytesFilled > 0) {581 const error = new TypeError();582 readableByteStreamControllerError(controller, error);583 throw error;584 }585 }586 readableByteStreamControllerClearAlgorithms(controller);587 readableStreamClose(stream);588}589export function readableByteStreamControllerCommitPullIntoDescriptor(stream: SDReadableStream<ArrayBufferView>, pullIntoDescriptor: PullIntoDescriptor) {590 // Assert: stream.[[state]] is not "errored".591 let done = false;592 if (stream[shared.state_] === "closed") {593 // Assert: pullIntoDescriptor.[[bytesFilled]] is 0.594 done = true;595 }596 const filledView = readableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);597 if (pullIntoDescriptor.readerType === "default") {598 readableStreamFulfillReadRequest(stream, filledView, done);599 }600 else {601 // Assert: pullIntoDescriptor.[[readerType]] is "byob".602 readableStreamFulfillReadIntoRequest(stream, filledView, done);603 }604}605export function readableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor: PullIntoDescriptor) {606 const { bytesFilled, elementSize } = pullIntoDescriptor;607 // Assert: bytesFilled <= pullIntoDescriptor.byteLength608 // Assert: bytesFilled mod elementSize is 0609 const buffer = shared.transferArrayBuffer(pullIntoDescriptor.buffer);610 return new pullIntoDescriptor.ctor(buffer, pullIntoDescriptor.byteOffset, bytesFilled / elementSize);611}612export function readableByteStreamControllerEnqueue(controller: SDReadableByteStreamController, chunk: ArrayBufferView) {613 const stream = controller[controlledReadableByteStream_];614 // Assert: controller.[[closeRequested]] is false.615 // Assert: stream.[[state]] is "readable".616 const { buffer, byteOffset, byteLength } = chunk;617 618 // If buffer is detached, throw a TypeError619 const transferredBuffer = shared.transferArrayBuffer(buffer);620 if (controller[pendingPullIntos_].length > 0) {621 const firstPendingPullInto = controller[pendingPullIntos_][0];622 // If firstPendingPullInto.buffer is detached throw a TypeError623 firstPendingPullInto.buffer = shared.transferArrayBuffer(firstPendingPullInto.buffer);624 }625 readableByteStreamControllerInvalidateBYOBRequest(controller);626 if (readableStreamHasDefaultReader(stream)) {627 if (readableStreamGetNumReadRequests(stream) === 0) {628 readableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);629 }630 else {631 // Assert: controller.[[queue]] is empty.632 const transferredView = new Uint8Array(transferredBuffer, byteOffset, byteLength);633 readableStreamFulfillReadRequest(stream, transferredView, false);634 }635 }636 else if (readableStreamHasBYOBReader(stream)) {637 readableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);638 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);639 }640 else {641 // Assert: !IsReadableStreamLocked(stream) is false.642 readableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength);643 }644 readableByteStreamControllerCallPullIfNeeded(controller);645}646export function readableByteStreamControllerEnqueueChunkToQueue(controller: SDReadableByteStreamController, buffer: ArrayBufferLike, byteOffset: number, byteLength: number) {647 controller[q.queue_].push({ buffer, byteOffset, byteLength });648 controller[q.queueTotalSize_] += byteLength;649}650export function readableByteStreamControllerError(controller: SDReadableByteStreamController, error: shared.ErrorResult) {651 const stream = controller[controlledReadableByteStream_];652 if (stream[shared.state_] !== "readable") {653 return;654 }655 readableByteStreamControllerClearPendingPullIntos(controller);656 q.resetQueue(controller);657 readableByteStreamControllerClearAlgorithms(controller);658 readableStreamError(stream, error);659}660export function readableByteStreamControllerFillHeadPullIntoDescriptor(_controller: SDReadableByteStreamController, size: number, pullIntoDescriptor: PullIntoDescriptor) {661 // Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.662 // Assert: controller.[[byobRequest]] is null663 pullIntoDescriptor.bytesFilled += size;664}665export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(controller: SDReadableByteStreamController, pullIntoDescriptor: PullIntoDescriptor) {666 const elementSize = pullIntoDescriptor.elementSize;667 const currentAlignedBytes = pullIntoDescriptor.bytesFilled - (pullIntoDescriptor.bytesFilled % elementSize);668 const maxBytesToCopy = Math.min(controller[q.queueTotalSize_], pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled);669 const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;670 const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);671 let totalBytesToCopyRemaining = maxBytesToCopy;672 let ready = false;673 if (maxAlignedBytes > currentAlignedBytes) {674 totalBytesToCopyRemaining = maxAlignedBytes - pullIntoDescriptor.bytesFilled;675 ready = true;676 }677 const queue = controller[q.queue_];678 while (totalBytesToCopyRemaining > 0) {679 const headOfQueue = queue.front()!;680 const bytesToCopy = Math.min(totalBytesToCopyRemaining, headOfQueue.byteLength);681 const destStart = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;682 shared.copyDataBlockBytes(pullIntoDescriptor.buffer, destStart, headOfQueue.buffer, headOfQueue.byteOffset, bytesToCopy);683 if (headOfQueue.byteLength === bytesToCopy) {684 queue.shift();685 }686 else {687 headOfQueue.byteOffset += bytesToCopy;688 headOfQueue.byteLength -= bytesToCopy;689 }690 controller[q.queueTotalSize_] -= bytesToCopy;691 readableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor);692 totalBytesToCopyRemaining -= bytesToCopy;693 }694 if (! ready) {695 // Assert: controller[queueTotalSize_] === 0696 // Assert: pullIntoDescriptor.bytesFilled > 0697 // Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize698 }699 return ready;700}701export function readableByteStreamControllerGetDesiredSize(controller: SDReadableByteStreamController) {702 const stream = controller[controlledReadableByteStream_];703 const state = stream[shared.state_];704 if (state === "errored") {705 return null;706 }707 if (state === "closed") {708 return 0;709 }710 return controller[strategyHWM_] - controller[q.queueTotalSize_];711}712export function readableByteStreamControllerHandleQueueDrain(controller: SDReadableByteStreamController) {713 // Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".714 if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {715 readableByteStreamControllerClearAlgorithms(controller);716 readableStreamClose(controller[controlledReadableByteStream_]);717 }718 else {719 readableByteStreamControllerCallPullIfNeeded(controller);720 }721}722export function readableByteStreamControllerInvalidateBYOBRequest(controller: SDReadableByteStreamController) {723 const byobRequest = controller[byobRequest_];724 if (byobRequest === undefined) {725 return;726 }727 byobRequest[associatedReadableByteStreamController_] = undefined;728 byobRequest[view_] = undefined;729 controller[byobRequest_] = undefined;730}731export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller: SDReadableByteStreamController) {732 // Assert: controller.[[closeRequested]] is false.733 const pendingPullIntos = controller[pendingPullIntos_];734 while (pendingPullIntos.length > 0) {735 if (controller[q.queueTotalSize_] === 0) {736 return;737 }738 const pullIntoDescriptor = pendingPullIntos[0];739 if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor)) {740 readableByteStreamControllerShiftPendingPullInto(controller);741 readableByteStreamControllerCommitPullIntoDescriptor(controller[controlledReadableByteStream_], pullIntoDescriptor);742 }743 }744}745export function readableByteStreamControllerPullInto(controller: SDReadableByteStreamController, view: ArrayBufferView, forAuthorCode: boolean) {746 const stream = controller[controlledReadableByteStream_];747 const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink748 const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation749 const byteOffset = view.byteOffset;750 const byteLength = view.byteLength;751 // In Ref Impl: this can fail (detached) and needs to result in byobReader.read failure752 let buffer = shared.transferArrayBuffer(view.buffer);753 const pullIntoDescriptor: PullIntoDescriptor = { buffer, bufferByteLength: buffer.byteLength, byteOffset, byteLength, bytesFilled: 0, elementSize, ctor, readerType: "byob" };754 if (controller[pendingPullIntos_].length > 0) {755 controller[pendingPullIntos_].push(pullIntoDescriptor);756 return readableStreamAddReadIntoRequest(stream, forAuthorCode);757 }758 if (stream[shared.state_] === "closed") {759 const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);760 return Promise.resolve(readableStreamCreateReadResult(emptyView, true, forAuthorCode));761 }762 if (controller[q.queueTotalSize_] > 0) {763 if (readableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor)) {764 const filledView = readableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor);765 readableByteStreamControllerHandleQueueDrain(controller);766 return Promise.resolve(readableStreamCreateReadResult(filledView, false, forAuthorCode));767 }768 if (controller[closeRequested_]) {769 const error = new TypeError();770 readableByteStreamControllerError(controller, error);771 return Promise.reject(error);772 }773 }774 controller[pendingPullIntos_].push(pullIntoDescriptor);775 const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);776 readableByteStreamControllerCallPullIfNeeded(controller);777 return promise;778}779export function readableByteStreamControllerRespond(controller: SDReadableByteStreamController, bytesWritten: number) {780 bytesWritten = Number(bytesWritten);781 if (! shared.isFiniteNonNegativeNumber(bytesWritten)) {782 throw new RangeError("bytesWritten must be a finite, non-negative number");783 }784 // Assert: controller.[[pendingPullIntos]] is not empty.785 const firstDescriptor = controller[pendingPullIntos_][0];786 const state = controller[controlledReadableByteStream_][shared.state_];787 if (state === "closed") {788 if (bytesWritten !== 0) {789 throw new TypeError('bytesWritten must be 0 when calling respond() on a closed stream');790 }791 }792 else {793 // Assert: state === "readable"794 if (firstDescriptor.bytesFilled + bytesWritten > firstDescriptor.byteLength) {795 throw new RangeError("bytesWritten out of range");796 } 797 }798 firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);799 readableByteStreamControllerRespondInternal(controller, bytesWritten);800}801export function readableByteStreamControllerRespondInClosedState(controller: SDReadableByteStreamController, _firstDescriptor: PullIntoDescriptor) {802 // Assert: firstDescriptor.[[bytesFilled]] is 0.803 const stream = controller[controlledReadableByteStream_];804 if (readableStreamHasBYOBReader(stream)) {805 while (readableStreamGetNumReadIntoRequests(stream) > 0) {806 const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(controller)!;807 readableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor);808 }809 }810}811export function readableByteStreamControllerRespondInReadableState(controller: SDReadableByteStreamController, bytesWritten: number, pullIntoDescriptor: PullIntoDescriptor) {812 // Assert: pullIntoDescriptor.[[bytesFilled]] + bytesWritten <= pullIntoDescriptor.[[byteLength]]813 readableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);814 if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {815 return;816 }817 readableByteStreamControllerShiftPendingPullInto(controller);818 const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;819 if (remainderSize > 0) {820 const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;821 const remainder = shared.cloneArrayBuffer(pullIntoDescriptor.buffer, end - remainderSize, remainderSize, ArrayBuffer);822 readableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);823 }824 pullIntoDescriptor.bytesFilled -= remainderSize;825 readableByteStreamControllerCommitPullIntoDescriptor(controller[controlledReadableByteStream_], pullIntoDescriptor);826 readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);827}828export function readableByteStreamControllerRespondInternal(controller: SDReadableByteStreamController, bytesWritten: number) {829 const firstDescriptor = controller[pendingPullIntos_][0];830 // Assert: canTransferArrayBuffer(firstDescriptor.buffer)831 readableByteStreamControllerInvalidateBYOBRequest(controller);832 const stream = controller[controlledReadableByteStream_];833 if (stream[shared.state_] === "closed") {834 // Assert: bytesWritten === 0835 readableByteStreamControllerRespondInClosedState(controller, firstDescriptor);836 }837 else {838 // Assert: stream.[[state]] is "readable".839 // Assert: bytesWritten > 0840 readableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor);841 }842 readableByteStreamControllerCallPullIfNeeded(controller);843}844export function readableByteStreamControllerRespondWithNewView(controller: SDReadableByteStreamController, view: ArrayBufferView) {845 // Assert: controller.[[pendingPullIntos]] is not empty.846 // Assert: view.buffer is not detached847 const firstDescriptor = controller[pendingPullIntos_][0];848 const state = controller[controlledReadableByteStream_][shared.state_];849 if (state === "closed") {850 if (view.byteLength !== 0) {851 throw new TypeError("The view's length must be 0 when calling respondWithNewView() on a closed stream");852 }853 }854 else {855 // Assert: state === "readable"856 if (view.byteLength === 0) {857 throw new RangeError("The view's length must be greater than 0 when calling respondWithNewView() on a readable stream");858 } 859 }860 if (firstDescriptor.byteOffset + firstDescriptor.bytesFilled !== view.byteOffset) {861 throw new RangeError("The region specified by view does not match byobRequest");862 } 863 if (firstDescriptor.bufferByteLength !== view.buffer.byteLength) {864 throw new RangeError("The buffer of view has different capacity than byobRequest");865 }866 firstDescriptor.buffer = shared.transferArrayBuffer(view.buffer);867 readableByteStreamControllerRespondInternal(controller, view.byteLength);868}869export function readableByteStreamControllerShiftPendingPullInto(controller: SDReadableByteStreamController) {870 // Assert: controller.[[byobRequest]] is null871 const descriptor = controller[pendingPullIntos_].shift();872 return descriptor;873}874export function readableByteStreamControllerShouldCallPull(controller: SDReadableByteStreamController) {875 // Let stream be controller.[[controlledReadableByteStream]].876 const stream = controller[controlledReadableByteStream_];877 if (stream[shared.state_] !== "readable") {878 return false;879 }880 if (controller[closeRequested_]) {881 return false;882 }883 if (! controller[started_]) {884 return false;885 }886 if (readableStreamHasDefaultReader(stream) && readableStreamGetNumReadRequests(stream) > 0) {887 return true;888 }889 if (readableStreamHasBYOBReader(stream) && readableStreamGetNumReadIntoRequests(stream) > 0) {890 return true;891 }892 const desiredSize = readableByteStreamControllerGetDesiredSize(controller);893 // Assert: desiredSize is not null.894 return desiredSize! > 0;895}896export function setUpReadableStreamBYOBRequest(request: SDReadableStreamBYOBRequest, controller: SDReadableByteStreamController, view: ArrayBufferView) {897 if (! isReadableByteStreamController(controller)) {898 throw new TypeError();899 }900 if (! ArrayBuffer.isView(view)) {901 throw new TypeError();902 }903 // Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.904 request[associatedReadableByteStreamController_] = controller;905 request[view_] = view;...
byte-stream-controller.ts
Source:byte-stream-controller.ts
...430 headOfQueue.byteOffset += bytesToCopy;431 headOfQueue.byteLength -= bytesToCopy;432 }433 controller._queueTotalSize -= bytesToCopy;434 ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor);435 totalBytesToCopyRemaining -= bytesToCopy;436 }437 if (!ready) {438 assert(controller._queueTotalSize === 0);439 assert(pullIntoDescriptor.bytesFilled > 0);440 assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);441 }442 return ready;443}444function ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller: ReadableByteStreamController,445 size: number,446 pullIntoDescriptor: PullIntoDescriptor) {447 assert(controller._pendingPullIntos.length === 0 || controller._pendingPullIntos.peek() === pullIntoDescriptor);448 ReadableByteStreamControllerInvalidateBYOBRequest(controller);449 pullIntoDescriptor.bytesFilled += size;450}451function ReadableByteStreamControllerHandleQueueDrain(controller: ReadableByteStreamController) {452 assert(controller._controlledReadableByteStream._state === 'readable');453 if (controller._queueTotalSize === 0 && controller._closeRequested) {454 ReadableByteStreamControllerClearAlgorithms(controller);455 ReadableStreamClose(controller._controlledReadableByteStream);456 } else {457 ReadableByteStreamControllerCallPullIfNeeded(controller);458 }459}460function ReadableByteStreamControllerInvalidateBYOBRequest(controller: ReadableByteStreamController) {461 if (controller._byobRequest === null) {462 return;463 }464 controller._byobRequest._associatedReadableByteStreamController = undefined!;465 controller._byobRequest._view = null!;466 controller._byobRequest = null;467}468function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller: ReadableByteStreamController) {469 assert(!controller._closeRequested);470 while (controller._pendingPullIntos.length > 0) {471 if (controller._queueTotalSize === 0) {472 return;473 }474 const pullIntoDescriptor = controller._pendingPullIntos.peek();475 if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor)) {476 ReadableByteStreamControllerShiftPendingPullInto(controller);477 ReadableByteStreamControllerCommitPullIntoDescriptor(478 controller._controlledReadableByteStream,479 pullIntoDescriptor480 );481 }482 }483}484export function ReadableByteStreamControllerPullInto<T extends ArrayBufferView>(485 controller: ReadableByteStreamController,486 view: T,487 readIntoRequest: ReadIntoRequest<T>488): void {489 const stream = controller._controlledReadableByteStream;490 let elementSize = 1;491 if (view.constructor !== DataView) {492 elementSize = (view.constructor as ArrayBufferViewConstructor<T>).BYTES_PER_ELEMENT;493 }494 const ctor = view.constructor as ArrayBufferViewConstructor<T>;495 const buffer = TransferArrayBuffer(view.buffer);496 const pullIntoDescriptor: BYOBPullIntoDescriptor<T> = {497 buffer,498 byteOffset: view.byteOffset,499 byteLength: view.byteLength,500 bytesFilled: 0,501 elementSize,502 viewConstructor: ctor,503 readerType: 'byob'504 };505 if (controller._pendingPullIntos.length > 0) {506 controller._pendingPullIntos.push(pullIntoDescriptor);507 // No ReadableByteStreamControllerCallPullIfNeeded() call since:508 // - No change happens on desiredSize509 // - The source has already been notified of that there's at least 1 pending read(view)510 ReadableStreamAddReadIntoRequest(stream, readIntoRequest);511 return;512 }513 if (stream._state === 'closed') {514 const emptyView = new ctor(pullIntoDescriptor.buffer, pullIntoDescriptor.byteOffset, 0);515 readIntoRequest._closeSteps(emptyView);516 return;517 }518 if (controller._queueTotalSize > 0) {519 if (ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor)) {520 const filledView = ReadableByteStreamControllerConvertPullIntoDescriptor<T>(pullIntoDescriptor);521 ReadableByteStreamControllerHandleQueueDrain(controller);522 readIntoRequest._chunkSteps(filledView);523 return;524 }525 if (controller._closeRequested) {526 const e = new TypeError('Insufficient bytes to fill elements in the given buffer');527 ReadableByteStreamControllerError(controller, e);528 readIntoRequest._errorSteps(e);529 return;530 }531 }532 controller._pendingPullIntos.push(pullIntoDescriptor);533 ReadableStreamAddReadIntoRequest<T>(stream, readIntoRequest);534 ReadableByteStreamControllerCallPullIfNeeded(controller);535}536function ReadableByteStreamControllerRespondInClosedState(controller: ReadableByteStreamController,537 firstDescriptor: PullIntoDescriptor) {538 firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);539 assert(firstDescriptor.bytesFilled === 0);540 const stream = controller._controlledReadableByteStream;541 if (ReadableStreamHasBYOBReader(stream)) {542 while (ReadableStreamGetNumReadIntoRequests(stream) > 0) {543 const pullIntoDescriptor = ReadableByteStreamControllerShiftPendingPullInto(controller);544 ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor);545 }546 }547}548function ReadableByteStreamControllerRespondInReadableState(controller: ReadableByteStreamController,549 bytesWritten: number,550 pullIntoDescriptor: PullIntoDescriptor) {551 if (pullIntoDescriptor.bytesFilled + bytesWritten > pullIntoDescriptor.byteLength) {552 throw new RangeError('bytesWritten out of range');553 }554 ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor);555 if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {556 // TODO: Figure out whether we should detach the buffer or not here.557 return;558 }559 ReadableByteStreamControllerShiftPendingPullInto(controller);560 const remainderSize = pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;561 if (remainderSize > 0) {562 const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;563 const remainder = pullIntoDescriptor.buffer.slice(end - remainderSize, end);564 ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.byteLength);565 }566 pullIntoDescriptor.buffer = TransferArrayBuffer(pullIntoDescriptor.buffer);567 pullIntoDescriptor.bytesFilled -= remainderSize;568 ReadableByteStreamControllerCommitPullIntoDescriptor(controller._controlledReadableByteStream, pullIntoDescriptor);...
readable_byte_stream_controller.ts
Source:readable_byte_stream_controller.ts
...330 ResetQueue(controller);331 ReadableByteStreamControllerClearAlgorithms(controller);332 ReadableStreamError(controller.controlledReadableByteStream, e);333}334export function ReadableByteStreamControllerFillHeadPullIntoDescriptor(335 controller: ReadableByteStreamController,336 size: number,337 pullIntoDescriptor: PullIntoDescriptor338) {339 Assert(340 controller.pendingPullIntos.length === 0 ||341 controller.pendingPullIntos[0] === pullIntoDescriptor342 );343 ReadableByteStreamControllerInvalidateBYOBRequest(controller);344 pullIntoDescriptor.bytesFilled += size;345}346export function ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(347 controller: ReadableByteStreamController,348 pullIntoDescriptor: PullIntoDescriptor349): boolean {350 const { elementSize } = pullIntoDescriptor;351 const currentAlignedBytes =352 pullIntoDescriptor.bytesFilled -353 (pullIntoDescriptor.bytesFilled % elementSize);354 const maxBytesToCopy = Math.min(355 controller.queueTotalSize,356 pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled357 );358 const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;359 const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);360 let totalBytesToCopyRemaining = maxBytesToCopy;361 let ready = false;362 if (maxAlignedBytes > currentAlignedBytes) {363 totalBytesToCopyRemaining =364 maxAlignedBytes - pullIntoDescriptor.bytesFilled;365 ready = true;366 }367 const { queue } = controller;368 while (totalBytesToCopyRemaining > 0) {369 const headOfQueue = queue[0];370 const bytesToCopy = Math.min(371 totalBytesToCopyRemaining,372 headOfQueue.byteLength373 );374 const destStart =375 pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;376 const srcView = new Uint8Array(377 headOfQueue.buffer,378 headOfQueue.byteOffset,379 headOfQueue.byteLength380 );381 const destView = new Uint8Array(382 pullIntoDescriptor.buffer,383 destStart,384 bytesToCopy385 );386 for (let i = 0; i < bytesToCopy; i++) {387 destView[i] = srcView[i];388 }389 if (headOfQueue.byteLength === bytesToCopy) {390 queue.shift();391 } else {392 headOfQueue.byteOffset += bytesToCopy;393 headOfQueue.byteLength -= bytesToCopy;394 }395 controller.queueTotalSize -= bytesToCopy;396 ReadableByteStreamControllerFillHeadPullIntoDescriptor(397 controller,398 bytesToCopy,399 pullIntoDescriptor400 );401 totalBytesToCopyRemaining -= bytesToCopy;402 }403 if (ready === false) {404 Assert(controller.queueTotalSize === 0);405 Assert(pullIntoDescriptor.bytesFilled > 0);406 Assert(pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize);407 }408 return ready;409}410export function ReadableByteStreamControllerGetDesiredSize(411 controller: ReadableByteStreamController412): number | null {413 const stream = controller.controlledReadableByteStream;414 const { state } = stream;415 if (state === "errored") {416 return null;417 }418 if (state === "closed") {419 return 0;420 }421 return controller.strategyHWM - controller.queueTotalSize;422}423export function ReadableByteStreamControllerHandleQueueDrain(424 controller: ReadableByteStreamController425) {426 Assert(controller.controlledReadableByteStream.state === "readable");427 if (controller.queueTotalSize === 0 && controller.closeRequested) {428 ReadableByteStreamControllerClearAlgorithms(controller);429 ReadableStreamClose(controller.controlledReadableByteStream);430 } else {431 ReadableByteStreamControllerCallPullIfNeeded(controller);432 }433}434export function ReadableByteStreamControllerInvalidateBYOBRequest(435 controller: ReadableByteStreamController436) {437 if (controller._byobRequest === void 0) {438 return;439 }440 controller._byobRequest.associatedReadableByteStreamController = void 0;441 controller._byobRequest._view = void 0;442 controller._byobRequest = void 0;443}444export function ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(445 controller: ReadableByteStreamController446) {447 Assert(controller.closeRequested === false);448 while (controller.pendingPullIntos.length > 0) {449 if (controller.queueTotalSize === 0) {450 return;451 }452 const pullIntoDescriptor = controller.pendingPullIntos[0];453 if (454 ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(455 controller,456 pullIntoDescriptor457 ) === true458 ) {459 ReadableByteStreamControllerShiftPendingPullInto(controller);460 ReadableByteStreamControllerCommitPullIntoDescriptor(461 controller.controlledReadableByteStream,462 pullIntoDescriptor463 );464 }465 }466}467const TypedArraySizeMap = {468 Int8Array: [1, Int8Array],469 Uint8Array: [1, Uint8Array],470 Uint8ClampedArray: [1, Uint8ClampedArray],471 Int16Array: [2, Int16Array],472 Uint16Array: [2, Uint16Array],473 Int32Array: [4, Int32Array],474 Uint32Array: [4, Uint32Array],475 Float32Array: [4, Float32Array],476 Float64Array: [8, Float64Array]477};478export function ReadableByteStreamControllerPullInto(479 controller: ReadableByteStreamController,480 view: ArrayBufferView,481 forAuthorCode?: boolean482): Promise<any> {483 const stream = controller.controlledReadableByteStream;484 let elementSize = 1;485 let ctor = DataView;486 const ctorName = view.constructor.name;487 if (TypedArraySizeMap[ctorName]) {488 [elementSize, ctor] = TypedArraySizeMap[ctorName];489 }490 const { byteOffset, byteLength } = view;491 const buffer = TransferArrayBuffer(view.buffer);492 const pullIntoDescriptor: PullIntoDescriptor = {493 buffer,494 byteOffset,495 byteLength,496 bytesFilled: 0,497 elementSize,498 ctor,499 readerType: "byob"500 };501 if (controller.pendingPullIntos.length > 0) {502 controller.pendingPullIntos.push(pullIntoDescriptor);503 return ReadableStreamAddReadIntoRequest(stream, forAuthorCode);504 }505 if (stream.state === "closed") {506 const emptyView = new ctor(507 pullIntoDescriptor.buffer,508 pullIntoDescriptor.byteOffset,509 0510 );511 return Promise.resolve(512 ReadableStreamCreateReadResult(emptyView, true, forAuthorCode)513 );514 }515 if (controller.queueTotalSize > 0) {516 if (517 ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(518 controller,519 pullIntoDescriptor520 )521 ) {522 const filedView = ReadableByteStreamControllerConvertPullIntoDescriptor(523 pullIntoDescriptor524 );525 ReadableByteStreamControllerHandleQueueDrain(controller);526 return Promise.resolve(527 ReadableStreamCreateReadResult(filedView, false, forAuthorCode)528 );529 }530 if (controller.closeRequested) {531 const e = new TypeError();532 ReadableByteStreamControllerError(controller, e);533 return Promise.reject(e);534 }535 }536 controller.pendingPullIntos.push(pullIntoDescriptor);537 const promise = ReadableStreamAddReadIntoRequest(stream, forAuthorCode);538 ReadableByteStreamControllerCallPullIfNeeded(controller);539 return promise;540}541export function ReadableByteStreamControllerRespond(542 controller: ReadableByteStreamController,543 bytesWritten: number544): void {545 if (IsFiniteNonNegativeNumber(bytesWritten) === false) {546 throw new RangeError();547 }548 Assert(controller.pendingPullIntos.length > 0);549 ReadableByteStreamControllerRespondInternal(controller, bytesWritten);550}551export function ReadableByteStreamControllerRespondInClosedState(552 controller: ReadableByteStreamController,553 firstDescriptor: PullIntoDescriptor554) {555 firstDescriptor.buffer = TransferArrayBuffer(firstDescriptor.buffer);556 Assert(firstDescriptor.bytesFilled === 0);557 const stream = controller.controlledReadableByteStream;558 if (ReadableStreamHasBYOBReader(stream)) {559 while (ReadableStreamGetNumReadIntoRequests(stream) > 0) {560 const pullIntoDescriptor = ReadableByteStreamControllerShiftPendingPullInto(561 controller562 );563 ReadableByteStreamControllerCommitPullIntoDescriptor(564 stream,565 pullIntoDescriptor566 );567 }568 }569}570export function ReadableByteStreamControllerRespondInReadableState(571 controller: ReadableByteStreamController,572 bytesWritten: number,573 pullIntoDescriptor: PullIntoDescriptor574) {575 if (576 pullIntoDescriptor.bytesFilled + bytesWritten >577 pullIntoDescriptor.byteLength578 ) {579 throw new RangeError();580 }581 ReadableByteStreamControllerFillHeadPullIntoDescriptor(582 controller,583 bytesWritten,584 pullIntoDescriptor585 );586 if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {587 return;588 }589 ReadableByteStreamControllerShiftPendingPullInto(controller);590 const remainderSize =591 pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;592 if (remainderSize > 0) {593 const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;594 const remainder = CloneArrayBuffer(595 pullIntoDescriptor.buffer,...
Using AI Code Generation
1const { ReadableByteStreamController } = require('stream/web');2const { ReadableStream } = require('stream/web');3const { ReadableStreamBYOBReader } = require('stream/web');4const assert = require('assert');5const { test, assert_equals, assert_true } = require('testharness');6const { ReadableStreamDefaultController } = require('stream/web');7const { ReadableStreamBYOBRequest } = require('stream/web');8const { ReadableStreamDefaultReader } = require('stream/web');9const { ReadableStreamDefaultControllerClose } = require('stream/web');10const { ReadableStreamDefaultControllerEnqueue } = require('stream/web');11const { ReadableStreamDefaultControllerError } = require('stream/web');12const { ReadableStreamDefaultControllerGetDesiredSize } = require('stream/web');13const { ReadableStreamDefaultControllerHasBackpressure } = require('stream/web');14const { ReadableStreamBYOBRequestRespond } = require('stream/web');15const { ReadableStreamBYOBRequestRespondWithNewView } = require('stream/web');16const { ReadableByteStreamControllerClose } = require('stream/web');17const { ReadableByteStreamControllerEnqueue } = require('stream/web');18const { ReadableByteStreamControllerError } = require('stream/web');19const { ReadableByteStreamControllerFillHeadPullIntoDescriptor } = require('stream/web');20const { ReadableByteStreamControllerGetDesiredSize } = require('stream/web');21const { ReadableByteStreamControllerHandleQueueDrain } = require('stream/web');22const { ReadableByteStreamControllerInvalidateBYOBRequest } = require('stream/web');23const { ReadableByteStreamControllerShouldCallPull } = require('stream/web');24const { ReadableByteStreamControllerClearAlgorithms } = require('stream/web');25const { ReadableStreamTee } = require('stream/web');26const { ReadableStreamDefaultControllerCallPullIfNeeded } = require('stream/web');27const { ReadableStreamDefaultControllerShouldCallPull } = require('stream/web');28const { ReadableStreamDefaultControllerClearAlgorithms } = require('stream/web');29const { ReadableByteStreamControllerCallPullIfNeeded } = require('stream/web');30const { ReadableStreamBYOBRequestView } = require('stream/web');31const { ReadableStreamBYOBReaderRead } = require('stream/web');32const { ReadableStreamDefault
Using AI Code Generation
1var rs = new ReadableStream({2 start(c) {3 var desc = {4 buffer: new ArrayBuffer(16),5 };6 ReadableByteStreamControllerFillHeadPullIntoDescriptor(c, desc, 8);7 assert_equals(desc.bufferByteLength, 16);8 assert_equals(desc.bytesFilled, 8);9 assert_equals(desc.byteOffset, 0);10 assert_equals(desc.buffer.constructor, ArrayBuffer);11 assert_equals(desc.ctor, Uint8Array);12 assert_equals(desc.readerType, "default");13 }14});15rs.getReader().read().then(r => {16 assert_equals(r.value.constructor, Uint8Array);17 assert_equals(r.value.byteLength, 8);18});19var rs = new ReadableStream({20 start(c) {21 var desc1 = {22 buffer: new ArrayBuffer(16),23 };24 var desc2 = {25 buffer: new ArrayBuffer(16),26 };27 var desc3 = {28 buffer: new ArrayBuffer(16),29 };30 c._pendingPullIntos = [desc1, desc2, desc3];31 var desc = ReadableByteStreamControllerShiftPendingPullIntos(c);32 assert_equals(desc.bufferByteLength, 16);33 assert_equals(desc.bytesFilled, 0);34 assert_equals(desc.byteOffset, 0);35 assert_equals(desc.buffer.constructor, ArrayBuffer);36 assert_equals(desc.ctor, Uint8Array);37 assert_equals(desc.readerType, "default");
Using AI Code Generation
1test(() => {2 const rs = new ReadableStream({3 pull() {}4 });5 const reader = rs.getReader();6 const desc = {7 buffer: new ArrayBuffer(1),8 };9 const result = ReadableByteStreamControllerFillHeadPullIntoDescriptor(10 );11 assert_equals(result, false);12}, 'ReadableByteStreamControllerFillHeadPullIntoDescriptor should return false');13test(() => {14 const rs = new ReadableStream({15 pull() {}16 });17 const reader = rs.getReader();18 const desc = {19 buffer: new ArrayBuffer(1),20 };21 const result = ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(22 );23 assert_equals(result, 0);24}, 'ReadableByteStreamControllerFillPullIntoDescriptorFromQueue should return 0');25test(() => {26 const rs = new ReadableStream({27 pull() {}28 });29 const reader = rs.getReader();30 const result = ReadableByteStreamControllerHandleQueueDrain(31 );32 assert_equals(result, undefined);33}, 'ReadableByteStreamControllerHandleQueueDrain should return undefined');34test(() => {35 const rs = new ReadableStream({36 pull() {}37 });38 const reader = rs.getReader();39 const result = ReadableByteStreamControllerInvalidateBYOBRequest(
Using AI Code Generation
1var rs = new ReadableStream({2 pull: function(c) {3 var pullIntoDescriptor = c.byobRequest.desiredSize;4 var result = c.fillHeadPullIntoDescriptor(pullIntoDescriptor);5 }6});7var reader = rs.getReader({ mode: 'byob' });8var view = new Uint8Array(10);9reader.read(view).then(function(result) {10});11> + JS::Rooted<JSObject*> view(cx, byobRequest->view());12> + uint32_t viewByteLength = 0; 13> + uint32_t viewByteOffset = 0;14> + JS::Rooted<JSObject*> viewBuffer(cx, nullptr);
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!