Best JavaScript code snippet using qawolf
blobAccess.js
Source:blobAccess.js
1/*2Class for searching and modifying blob files.3Blob files are used to store large amounts of immutable data, profile pictures in this case4File allocates space in 128 byte "chunks", data will usually require many chunks5File headers point to the next free chunk, and the first bytes of a free chunk in a contiguous area of free chunks will contain the size of the free area and address of the next free chunk6Headers:7Next free chunk (8 bytes)|Total chunks used (8 bytes)8- Total chunks used still includes chunks that have been deallocated9Free chunk format (only first free chunk in a free area):10Area is allocated?* (1 byte)|Number of free chunks in area (8 bytes)|First chunk address of next free area (8 bytes)11 *Contains 1 if deallocated12Entry format:13Area is allocated?* (1 byte)|Number of chunks allocated (8 bytes)|Actual length of data (8 bytes)|data (variable length)14 *Contains 2 if allocated15Blob files are not designed to be searchable, as they are just large pools of space where other classes can store their data- it is up to the classes to remember where in the blob file they put it16*/17const fs = require('fs');18const path = require('path');19const { PassThrough } = require('stream');20class blobAccess{21 static createBlob(filePath, overwrite=false){ // If there is already a blob file at the given path, and overwrite is true the file will be recreated22 // WARNING: This method is not asynchronous23 let fileMustBeCreated = overwrite;24 try{25 // Check that we have both read and write access to the file26 fs.accessSync(filePath, fs.constants.R_OK | fs.constants.W_OK);27 }28 catch{29 fileMustBeCreated = true;30 }31 if (fileMustBeCreated === true){32 // Create directory33 try{34 // If the path given directly in a root directory, this will throw an error. But the error won't matter as the directory does not need to be created anyway35 fs.mkdirSync(path.dirname(filePath), {recursive: true});36 }37 catch{}38 // Create the file with the headers containing 039 fs.writeFileSync(filePath, Buffer.alloc(16));40 }41 }42 static getReadableStream(filePath, position){43 // Return a stream to read the data in the entry at the given position44 return new Promise((resolve, reject) => {45 // First get the amount of data to read46 fs.open(filePath, "r", (err, descriptor) => {47 if (err) reject(err);48 else{49 fs.read(descriptor, {position: position + 9, length: 8, buffer: Buffer.alloc(8)}, (err, bytesRead, data) => {50 if (err){51 fs.close(descriptor, e => {52 if (e) reject(e);53 else reject(err);54 });55 }56 else{57 try{58 let dataLength = Number(data.readBigInt64BE());59 let stream = fs.createReadStream(filePath, {fd: descriptor, start: position + 17, end: position + 17 + dataLength - 1});60 // Add attribute to stream to show the amount of data to be read61 stream.maxAllowedReadLength = dataLength;62 resolve(stream);63 }64 catch(reason){65 fs.close(descriptor, e => {66 if (e) reject(e);67 else reject(reason);68 });69 };70 }71 });72 }73 });74 });75 }76 static getWritableStream(filePath, position){77 // Set up a writable stream to write to the file78 return new Promise((resolve, reject) => {79 // First get the maximum size of the allocated space so we can stop the stream if it tries to write more than allowed80 fs.open(filePath, "r+", (err, descriptor) => {81 if (err) reject(err);82 else{83 fs.read(descriptor, {position: position + 1, length: 8, buffer: Buffer.alloc(8)}, (err, bytesRead, data) => {84 if (err){85 fs.close(descriptor, e => {86 if (e) reject(e);87 else reject(err);88 });89 }90 else{91 try{92 let maxLength = Number(data.readBigInt64BE()) * 128;93 let stream = fs.createWriteStream(filePath, {start: position + 17, flags: "r+"});94 // Create a PassThrough stream to monitor the data to close the stream if too much data is sent95 let monitorStream = PassThrough();96 // Create totalLifetimeBytesWritten attribute of monitorStream to record total bytes sent through stream97 monitorStream.totalLifetimeBytesWritten = 0;98 // Create maxAllowedWriteLength attribute of monitorStream to show how much data can be written99 monitorStream.maxAllowedWriteLength = maxLength;100 monitorStream.on("data", chunk => {101 monitorStream.totalLifetimeBytesWritten += chunk.length;102 if (maxLength <= monitorStream.totalLifetimeBytesWritten){103 // Producer attempted to write too much data, so close the stream104 monitorStream.end();105 }106 });107 // Listen on stream close to update length field in blob108 monitorStream.on("close", () => {109 let lengthWritten = Buffer.alloc(8);110 lengthWritten.writeBigInt64BE(BigInt(monitorStream.totalLifetimeBytesWritten));111 fs.write(descriptor, lengthWritten, 0, 8, position + 9, (err) => {112 stream.end();113 fs.close(descriptor, e => {114 if (e) throw e;115 else if (err) throw err;116 });117 });118 });119 // Connect monitorStream to stream120 monitorStream.pipe(stream);121 resolve(monitorStream);122 }123 catch(reason){124 fs.close(descriptor, e => {125 if (e) reject(e);126 else reject(reason);127 });128 }129 }130 });131 }132 });133 });134 }135 static allocate(filePath, amount){136 // Create an entry with enough chunks to hold given amount, and return the start position of the entry (to create new entries this can be used followed by writeToEntry)137 return new Promise((resolve, reject) => {138 fs.open(filePath, "r+", (err, descriptor) => {139 if (err) reject(err);140 else{141 // Calculate the number of 128 byte chunks needed to store the data and its metadata142 let chunksNeeded = Math.ceil((amount + 17) / 128);143 // Read headers144 fs.read(descriptor, {position: 0, length: 16, buffer: Buffer.alloc(16)}, (err, bytesRead, data) => {145 if (err){146 fs.close(descriptor, e => {147 if (e) reject(e);148 else reject(err);149 });150 }151 else{152 // A free area contains one or more free chunks153 let freeAreaPointerAddr = 0; // The address of the nextFreeArea pointer itself154 let nextFreeArea = Number(data.readBigInt64BE(0));155 let chunksUsed = Number(data.readBigInt64BE(8));156 let freeAreaSize; // Size of the current free area in chunks157 // Search the chain of free areas until we either find one big enough or run out158 let findFreeArea = (err, bytesRead, data) => {159 if (err){160 fs.close(descriptor, e => {161 if (e) reject(e);162 else reject(err);163 });164 }165 else{166 freeAreaSize = Number(data.readBigInt64BE(1));167 let thisFreeArea = nextFreeArea;168 nextFreeArea = Number(data.readBigInt64BE(9));169 if (freeAreaSize == chunksNeeded){170 // We have found a free area of exactly the right size171 // Update nextFreeArea pointer to point to next free area172 let rawBytes = Buffer.alloc(17);173 rawBytes.writeBigInt64BE(BigInt(nextFreeArea), 1);174 rawBytes.writeBigInt64BE(0n, 9);175 fs.write(descriptor, rawBytes, 1, 8, freeAreaPointerAddr, err => {176 if (err){177 fs.close(descriptor, e => {178 if (e) reject(e);179 else reject(err);180 });181 }182 else{183 // Now update the newly allocated area with the correct details184 rawBytes.writeInt8(2); // Set first byte to 2 to show that this space is allocated185 rawBytes.writeBigInt64BE(BigInt(chunksNeeded), 1);186 rawBytes.writeBigInt64BE(0n, 9); // Set actual bytes used to 0 as the space has just been allocated but nothing has been written yet187 fs.write(descriptor, rawBytes, 0, 17, thisFreeArea, err => {188 if (err){189 fs.close(descriptor, e => {190 if (e) reject(e);191 else reject(err);192 });193 }194 else{195 // Return the allocated address196 fs.close(descriptor, e => {197 if (e) reject(e);198 else resolve(thisFreeArea);199 });200 }201 });202 }203 });204 }205 else if (chunksNeeded < freeAreaSize){206 // Separate the extra chunks in this free area into a new free area207 let newFreeArea = thisFreeArea + (chunksNeeded * 128);208 let newAreaDetails = Buffer.alloc(17);209 newAreaDetails.writeBigInt64BE(BigInt(freeAreaSize - chunksNeeded), 1);210 newAreaDetails.writeBigInt64BE(BigInt(nextFreeArea), 9);211 fs.write(descriptor, newAreaDetails, 0, 17, newFreeArea, err => {212 if (err){213 fs.close(descriptor, e => {214 if (e) reject(e);215 else reject(err);216 });217 }218 else{219 // Update nextFreeArea pointer to point to this new free area220 let newFreeAreaRaw = Buffer.alloc(8);221 newFreeAreaRaw.writeBigInt64BE(BigInt(newFreeArea));222 fs.write(descriptor, newFreeAreaRaw, 0, 8, freeAreaPointerAddr, err => {223 if (err){224 fs.close(descriptor, e => {225 if (e) reject(e);226 else reject(err);227 });228 }229 else{230 // Update the newly allocated area with the correct size231 let allocatedDetailsRaw = Buffer.alloc(17);232 allocatedDetailsRaw.writeInt8(2); // Set first byte to 2 to show that chunk is allocated233 allocatedDetailsRaw.writeBigInt64BE(BigInt(chunksNeeded), 1);234 allocatedDetailsRaw.writeBigInt64BE(0n, 9);235 fs.write(descriptor, allocatedDetailsRaw, 0, 17, thisFreeArea, err => {236 if (err){237 if (e) reject(e);238 else reject(err);239 }240 else{241 // Return the address of the newly allocated area242 fs.close(descriptor, e => {243 if (e) reject(e);244 else resolve(thisFreeArea);245 });246 }247 });248 }249 });250 }251 });252 }253 else{254 // Try the next free area (if there is one)255 if (nextFreeArea === 0){256 // There isn't another free area, so we need to write to the end of the file257 let newSpaceAddress = 16 + (chunksUsed * 128) // End of file = space used by headers + space used by all chunks258 // Write the size of the new free area259 let rawBytes = Buffer.alloc(17);260 rawBytes.writeInt8(2); // Set first byte to 2 to show that space is allocated261 rawBytes.writeBigInt64BE(BigInt(chunksNeeded), 1);262 rawBytes.writeBigInt64BE(0n, 9); // Set actual bytes used to 0 as the space has just been allocated but nothing has been written yet263 fs.write(descriptor, rawBytes, 0, 17, newSpaceAddress, err => {264 if (err){265 fs.close(descriptor, e => {266 if (e) reject(e);267 else reject(err);268 });269 }270 else{271 // Now update the chunksUsed header272 rawBytes.writeBigInt64BE(BigInt(chunksUsed + chunksNeeded));273 fs.write(descriptor, rawBytes, 0, 8, 8, err => {274 if (err){275 fs.close(descriptor, e => {276 if (e) reject(e);277 else reject(err);278 })279 }280 else{281 // Return the address of the newly allocated space282 fs.close(descriptor, e => {283 if (e) reject(e);284 else resolve(newSpaceAddress);285 });286 }287 });288 }289 });290 }291 else{292 // Try next free area293 freeAreaPointerAddr = thisFreeArea + 9;294 fs.read(descriptor, {position: nextFreeArea, length: 17, buffer: Buffer.alloc(17)}, findFreeArea);295 }296 }297 }298 };299 if (nextFreeArea === 0){300 // There isn't another free area, so we need to write to the end of the file301 let newSpaceAddress = 16 + (chunksUsed * 128) // End of file = space used by headers + space used by all chunks302 // Write the size of the new free area303 let rawBytes = Buffer.alloc(17);304 rawBytes.writeInt8(2);305 rawBytes.writeBigInt64BE(BigInt(chunksNeeded), 1);306 rawBytes.writeBigInt64BE(0n, 9);307 fs.write(descriptor, rawBytes, 0, 17, newSpaceAddress, err => {308 if (err){309 fs.close(descriptor, e => {310 if (e) reject(e);311 else reject(err);312 });313 }314 else{315 // Now update the chunksUsed header316 rawBytes.writeBigInt64BE(BigInt(chunksUsed + chunksNeeded));317 fs.write(descriptor, rawBytes, 0, 8, 8, err => {318 if (err){319 fs.close(descriptor, e => {320 if (e) reject(e);321 else reject(err);322 })323 }324 else{325 // Return the address of the newly allocated space326 fs.close(descriptor, e => {327 if (e) reject(e);328 else resolve(newSpaceAddress);329 });330 }331 });332 }333 });334 }335 else{336 fs.read(descriptor, {position: nextFreeArea, length: 17, buffer: Buffer.alloc(17)}, findFreeArea);337 }338 }339 340 });341 }342 });343 344 });345 }346 static deallocate(filePath, position){347 // Deallocate the entry at the given position348 return new Promise((resolve, reject) => {349 fs.open(filePath, "r+", (err, descriptor) => {350 if (err) reject(err);351 else{352 fs.read(descriptor, {position: 0, buffer: Buffer.alloc(8), length: 8}, (err, bytesRead, data) => {353 if (err){354 fs.close(descriptor, e => {355 if (e) reject(e);356 else reject(err);357 });358 }359 else{360 let nextFreeArea = data.readBigInt64BE();361 // Read details of the area to be deallocated362 fs.read(descriptor, {position: position, buffer: Buffer.alloc(17), length: 17}, (err, bytesRead, data) => {363 if (err){364 fs.close(descriptor, e => {365 if (e) reject(e);366 else reject(err);367 });368 }369 else{370 let freedAreaSize = Number(data.readBigInt64BE(1));371 let writeChunkDetails = () => {372 // Write the details of the new space to its first 17 bytes and finish373 let detailsRaw = Buffer.alloc(17);374 detailsRaw.writeInt8(1); // Set first byte to 1 to show space is deallocated375 detailsRaw.writeBigInt64BE(BigInt(freedAreaSize), 1); // Write size of newly freed area to bytes 1 - 9376 detailsRaw.writeBigInt64BE(BigInt(nextFreeArea), 9); // Write address of next free area to bytes 9 - 17377 fs.write(descriptor, detailsRaw, 0, 17, position, err => {378 if (err){379 fs.close(descriptor, e => {380 if (e) reject(e);381 else reject(err);382 });383 }384 else{385 fs.close(descriptor, e => {386 if (e) reject(e);387 else resolve(true);388 });389 }390 });391 };392 // Find out if next area is also free393 fs.read(descriptor, {position: position + (freedAreaSize * 128), length: 17, buffer: Buffer.alloc(17)}, (err, bytesRead, data) => {394 if (err){395 fs.close(descriptor, e => {396 if (e) reject(e);397 else reject(err);398 });399 }400 else{401 if (data.readInt8(0) == 1){402 // The next space is also deallocated so merge the newly freed one with it403 nextFreeArea = data.readBigInt64BE(9);404 let nextFreeAreaSize = Number(data.readBigInt64BE(1));405 // Find the nextFree pointer that points to this area and update it with the position of the newly freed block406 let pointerAddress = 0;407 let findPointer = (err, bytesRead, data) => {408 if (err){409 fs.close(descriptor, e => {410 if (e) reject(e);411 else reject(err);412 });413 }414 else{415 if (data.readBigInt64BE() == position + (freedAreaSize * 128)){416 // Update the pointer to point to newly freed space instead417 let rawBytes = Buffer.alloc(8);418 rawBytes.writeBigInt64BE(BigInt(position));419 fs.write(descriptor, rawBytes, 0, 8, pointerAddress, err => {420 if (err){421 fs.close(descriptor, e => {422 if (e) reject(e);423 else reject(err);424 });425 }426 else{427 freedAreaSize += nextFreeAreaSize; // Increase size field of newly freed area as it now also includes the free area following it428 writeChunkDetails();429 }430 });431 }432 else{433 pointerAddress = Number(data.readBigInt64BE()) + 1;434 fs.read(descriptor, {position: pointerAddress, length: 8, buffer: Buffer.alloc(8)}, findPointer);435 }436 }437 };438 fs.read(descriptor, {position: 0, length: 8, buffer: Buffer.alloc(8)}, findPointer);439 440 }441 else{442 // Write newly freed position to nextFree header443 let positionRaw = Buffer.alloc(8);444 positionRaw.writeBigInt64BE(BigInt(position));445 fs.write(descriptor, positionRaw, 0, 8, 0, err => {446 if (err){447 fs.close(descriptor, e => {448 if (e) reject(e);449 else reject(err);450 });451 }452 else{453 // Finally write the correct details to the newly deallocated space454 writeChunkDetails();455 }456 });457 }458 }459 });460 }461 });462 }463 });464 }465 });466 });467 }468}...
monitor_runner.ts
Source:monitor_runner.ts
1// -*- mode: typescript; indent-tabs-mode: nil; js-basic-offset: 4 -*-2//3// This file is part of Genie4//5// Copyright 2020 The Board of Trustees of the Leland Stanford Junior University6//7// Licensed under the Apache License, Version 2.0 (the "License");8// you may not use this file except in compliance with the License.9// You may obtain a copy of the License at10//11// http://www.apache.org/licenses/LICENSE-2.012//13// Unless required by applicable law or agreed to in writing, software14// distributed under the License is distributed on an "AS IS" BASIS,15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.16// See the License for the specific language governing permissions and17// limitations under the License.18//19// Author: Giovanni Campagna <gcampagn@cs.stanford.edu>20import * as Tp from 'thingpedia';21import { Runtime } from 'thingtalk';22import type * as stream from 'stream';23import AsyncQueue from 'consumer-queue';24import RateLimiter from '../util/rate_limiter';25import * as Protocol from '../sync/protocol';26import { ChannelStateBinder } from './channel_state_binder';27import type ExecWrapper from './exec_wrapper';28import type DeviceView from '../devices/device_view';29function extendParams(output : Record<string, unknown>, input : Record<string, unknown>) {30 for (const key in input) {31 if (Object.prototype.hasOwnProperty.call(output, key))32 continue;33 output[key] = input[key];34 }35}36type MonitorStream = stream.Readable & {37 destroy() : void;38 uniqueId ?: string;39 device ?: Tp.BaseDevice;40}41type SubscribeFunction = (params : Record<string, unknown>, state : ChannelStateBinder, hints : Runtime.CompiledQueryHints, env : ExecWrapper) => MonitorStream;42type MonitorEvent = Record<string, unknown> & { __timestamp : number };43export default class MonitorRunner {44 private _env : ExecWrapper;45 private _devices : DeviceView;46 private _channel : string;47 private _fn : string;48 private _params : Record<string, unknown>;49 private _hints : Runtime.CompiledQueryHints;50 private _rateLimiter : RateLimiter;51 private _streams : Map<Tp.BaseDevice, MonitorStream>;52 private _ended : Set<MonitorStream>;53 private _stopped : boolean;54 private _queue : AsyncQueue<IteratorResult<[string, MonitorEvent], void>>;55 private _dataListener : (this : MonitorStream, data : MonitorEvent) => void;56 private _endListener : (this : MonitorStream) => void;57 private _errorListener : (this : MonitorStream, error : Error) => void;58 constructor(env : ExecWrapper,59 devices : DeviceView,60 channel : string,61 params : Record<string, unknown>,62 hints : Runtime.CompiledQueryHints) {63 this._env = env;64 this._channel = channel;65 this._fn = 'subscribe_' + channel;66 this._params = params;67 this._hints = hints;68 // rate limit to 1 per second, with a burst of 30069 this._rateLimiter = new RateLimiter(300, 300 * 1000);70 const self = this;71 this._dataListener = function(data) {72 const from = this;73 self._onTriggerData(from, data);74 };75 this._endListener = function() {76 const from = this;77 self._onTriggerEnd(from);78 };79 this._errorListener = function(error) {80 const from = this;81 self._onTriggerError(from, error);82 };83 this._devices = devices;84 this._streams = new Map; // from device to the corresponding stream85 this._ended = new Set;86 this._stopped = false;87 this._queue = new AsyncQueue();88 }89 next() {90 return this._queue.pop();91 }92 private _onTriggerError(from : MonitorStream, error : Error) {93 this._env.reportError('Trigger ' + from.uniqueId + ' reported an error', error);94 }95 private _onTriggerData(from : MonitorStream, data : MonitorEvent) {96 if (this._stopped)97 return;98 if (!this._rateLimiter.hit())99 return;100 console.log('Handling incoming data on ' + from.uniqueId);101 const outputType = from.device!.kind + ':' + this._channel;102 if (data.__timestamp === undefined) {103 console.log('WARNING: missing timestamp on data from ' + from.uniqueId);104 const now = Date.now();105 data.__timestamp = now;106 }107 if (from.device!.uniqueId !== from.device!.kind)108 data.__device = new Tp.Value.Entity(from.device!.uniqueId!, from.device!.name);109 extendParams(data, this._params);110 this._queue.push({ done: false, value: [outputType, data] });111 }112 private _onTriggerEnd(from : MonitorStream) {113 console.log('Handling trigger end from ' + from.uniqueId);114 if (this._stopped)115 return;116 this._ended.add(from);117 if (this._ended.size === this._streams.size) {118 this._stopped = true;119 this._queue.push({ done: true, value: undefined });120 }121 }122 private _onDeviceAdded(device : Tp.BaseDevice) {123 const uniqueId = device.uniqueId + ':' + this._channel + ':' + Protocol.params.makeString(this._params);124 Promise.resolve().then(() => {125 const state = new ChannelStateBinder(this._env.engine.db, uniqueId);126 // TODO deduplicate subscriptions globally127 // (this needs to be done at a different level because we need to128 // do global common subexpression elimination to save history)129 return state.open().then(() => {130 if (this._stopped)131 return;132 const stream = (device as unknown as Record<string, SubscribeFunction>)[this._fn](this._params, state, this._hints, this._env);133 this._streams.set(device, stream);134 stream.uniqueId = uniqueId; // for debugging only135 stream.device = device;136 stream.on('error', this._errorListener);137 stream.on('end', this._endListener);138 stream.on('data', this._dataListener);139 });140 }).catch((e) => {141 this._env.reportError('Failed to initialize trigger ' + uniqueId, e);142 });143 }144 private _onDeviceRemoved(device : Tp.BaseDevice) {145 const stream = this._streams.get(device);146 if (!stream)147 return;148 this._streams.delete(device);149 stream.destroy();150 }151 end() {152 if (this._stopped)153 return;154 this._stopped = true;155 this._queue.push({ done: true, value: undefined });156 }157 stop() {158 this._stopped = true;159 this._devices.stop();160 for (const stream of this._streams.values())161 stream.destroy();162 }163 start() {164 this._devices.on('object-added', this._onDeviceAdded.bind(this));165 this._devices.on('object-removed', this._onDeviceRemoved.bind(this));166 this._devices.start();167 }...
monitoring.ts
Source:monitoring.ts
1/********************************************************************************2 Copyright (c) 2022 EclipseSource and others.3 This program and the accompanying materials are made available under the4 terms of the Eclipse Public License v. 2.0 which is available at5 http://www.eclipse.org/legal/epl-2.0.6 This Source Code may also be made available under the following Secondary7 Licenses when the conditions for such availability set forth in the Eclipse8 Public License v. 2.0 are satisfied: GNU General Public License, version 29 with the GNU Classpath Exception which is available at10 https://www.gnu.org/software/classpath/license.html.11 SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.012********************************************************************************/13import type { Port__Output as Port } from 'arduino-cli_proto_ts/common/cc/arduino/cli/commands/v1/Port'14import type { Device } from 'deployment-server'15import { ClientDuplexStream } from '@grpc/grpc-js'16import { Duplex, pipeline, Transform } from 'stream'17import { ConnectedDevices } from './store'18import { DeviceStatus } from '../util/common'19import { setTimeout } from 'timers/promises'20import { logger } from '../util/logger'21import { openDeployStream } from '../deployment-server/connection'22import { StreamingOpenRequest } from 'arduino-cli_proto_ts/common/cc/arduino/cli/monitor/v1/StreamingOpenRequest'23import { StreamingOpenResponse__Output as StreamingOpenResponse } from 'arduino-cli_proto_ts/common/cc/arduino/cli/monitor/v1/StreamingOpenResponse'24import { monitor } from '../arduino-cli/monitor'25import { once } from 'events'26export interface MonitorData {27 device: Device28 timeout?: number29}30export class DeviceMonitor {31 #isConnected: boolean = false32 #monitorStream: ClientDuplexStream<StreamingOpenRequest, StreamingOpenResponse> | undefined33 #serverStream: Duplex | undefined34 #deploymentId: string | undefined35 readonly port: Port36 constructor (port: Port) {37 this.port = port38 }39 async start (deploymentId: string, sec: number): Promise<void> {40 if (this.isPaused() && this.#deploymentId === deploymentId) {41 return this.resume()42 }43 // Check if deploymentId is different to last monitor request44 if (this.#deploymentId !== deploymentId) {45 this.#deploymentId = deploymentId46 // Destroy outdated stream to server47 if (this.#serverStream != null) {48 this.#serverStream.end()49 this.#serverStream.unpipe()50 await setTimeout(200)51 this.#serverStream.destroy()52 await once(this.#serverStream, 'close')53 this.#serverStream = undefined54 }55 }56 // Open deployment stream to server57 if (this.#serverStream == null || !this.#serverStream.writable) {58 this.#serverStream = await openDeployStream(this.#deploymentId)59 }60 // Create monitor stream to device61 this.#monitorStream = await monitor(this.port)62 try {63 await this.pipeToSocket()64 } catch (e) {65 logger.error(e)66 await this.stop()67 }68 this.#isConnected = true69 await setTimeout(sec * 1000)70 await this.stop()71 }72 async stop (): Promise<void> {73 if (!this.#isConnected) {74 return75 }76 // End monitor stream and unpipe77 if (this.#monitorStream != null) {78 this.#monitorStream.end()79 this.#monitorStream.unpipe()80 await setTimeout(200)81 this.#monitorStream.destroy()82 // Set monitor stream back to undefined83 this.#monitorStream = undefined84 }85 this.#isConnected = false86 // End server stream and unpipe87 if (this.#serverStream != null) {88 this.#serverStream.end()89 this.#serverStream.unpipe()90 monitorResponseTransform.unpipe(this.#serverStream)91 await setTimeout(200)92 this.#serverStream.destroy()93 await once(this.#serverStream, 'close')94 // Set server stream back to undefined95 this.#serverStream = undefined96 }97 // Wait 5 seconds to ensure that the device also closed the stream98 await setTimeout(5000)99 try {100 const device = ConnectedDevices.onPort(this.port.address, this.port.protocol)101 await device.updateStatus(DeviceStatus.RUNNING)102 } catch (e) {103 logger.error(e)104 }105 }106 pause (): void {107 this.#monitorStream?.pause()108 }109 resume (): void {110 this.#monitorStream?.resume()111 }112 isPaused (): boolean {113 if (this.#monitorStream != null) {114 return this.#monitorStream.isPaused()115 }116 return false117 }118 isMonitoring (): boolean {119 return this.#isConnected && !this.isPaused()120 }121 async pipeToSocket (): Promise<Duplex> {122 if (this.#monitorStream == null) {123 throw new Error(`No valid data stream to pipe from device on port ${this.port.address} (${this.port.protocol})`)124 }125 if (this.#serverStream == null) {126 throw new Error(`No valid deployment stream to server for device on port ${this.port.address} (${this.port.protocol})`)127 }128 return pipeline(this.#monitorStream, monitorResponseTransform, this.#serverStream, (error) => {129 if (error != null) {130 logger.error(error)131 }132 })133 }134 unpipe (): void {135 if (this.#monitorStream != null) {136 this.#monitorStream.unpipe()137 }138 }139 get isConnected (): boolean {140 return this.#isConnected141 }142}143const monitorResponseTransform = new Transform({144 writableObjectMode: true,145 transform (chunk: StreamingOpenResponse, encoding, callback) {146 const { data, dropped } = chunk147 if (dropped > 0) {148 logger.debug(`Dropped ${dropped} bytes during monitoring`)149 }150 callback(null, data)151 }...
91-osmonitor.js
Source:91-osmonitor.js
1module.exports = function (RED) {2 "use strict";3 var request = require('request');4 var moment = require('moment');5 var url = require("url");6 function testing(node, n) {7 const API_URL = `${process.env.MOCK_DATA_SOURCE}/reading/latest`;8 const options = {9 method: 'post',10 body: { sensor_id: n.subtype },11 json: true,12 url: API_URL,13 }14 const periodic = setInterval(() => {15 request(options, (err, res, body) => {16 if (err) {17 console.log(err, 'error posting json')18 } else {19 if (body.length > 0) {20 const result = body[0];21 if (result.length > 0) {22 const { time, value } = result[0];23 node.send({24 name: n.name || "osmonitor",25 id: n.id,26 subtype: n.subtype,27 type: "osmonitor",28 payload: {29 ts: moment.utc(time).unix(),30 value: Number(value),31 },32 });33 }34 }35 }36 });37 }, 3000);38 node.on("close", () => {39 console.log(`${node.id} stopping requests`);40 clearInterval(periodic);41 });42 }43 function OSMonitor(n) {44 console.log("creating new os monitor node");45 RED.nodes.createNode(this, n);46 if (process.env.TESTING) {47 console.log("running in test mode!");48 return testing(this, n);49 }50 const databox = require('node-databox');51 var periodic;52 this.name = n.name;53 const node = this;54 55 const cb = (data) => {56 console.log("nice, seen data");57 console.log(JSON.parse(data.data).data);58 const tosend = {59 name: n.name || "osmonitor",60 id: n.id,61 subtype: n.subtype,62 type: "osmonitor",63 payload: {64 ts: Date.now(),65 value: JSON.parse(data.data).data,66 }67 }68 node.send(tosend);69 }70 71 const hcatobj = JSON.parse(process.env[`DATASOURCE_${n.id}`]);72 console.log("getting hypercat to source data metadata", hcatobj);73 const monitorStream = databox.HypercatToSourceDataMetadata(hcatobj);74 console.log("have monitorstream", monitorStream);75 console.log("store url is ", hcatobj.href);76 console.log("arbiter endpoint",process.env['DATABOX_ARBITER_ENDPOINT']);77 const store = databox.NewStoreClient(hcatobj.href, process.env['DATABOX_ARBITER_ENDPOINT'], true);78 console.log("registering with", monitorStream.DataSourceID);79 store.TSBlob.Observe(monitorStream.DataSourceID).then((emitter) => {80 console.log("now have emitter!");81 this.emitter = emitter;82 emitter.on('data', cb);83 emitter.on('error', (err) => {84 console.warn(err);85 });86 }).catch((err) => {87 console.log(err);88 console.warn("Error Observing ", monitorStream.DataSourceID, " ", err);89 });90 this.on("close", () => {91 console.log(`${node.id} stopping requests`);92 this.emitter.removeListener("data", cb);93 clearInterval(periodic);94 });95 }96 // Register the node by name. This must be called beforeoverriding any of the97 // Node functions.98 RED.nodes.registerType("osmonitor", OSMonitor);...
index.js
Source:index.js
1import WebpackerReact from 'webpacker-react'2import TweetEmbed from 'react-tweet-embed'3// React components4import { QSContainer } from './qs/QSContainer';5import { MturkQSContainer } from './qs/MturkQSContainer';6import { LocalBatchQSContainer } from './qs/LocalBatchQSContainer';7import { SentimentTextBox } from './sent_textbox/SentimentTextBox';8import { MonitorStream } from './monitor_stream/MonitorStream';9import { Leadline } from './frontpage/Leadline';10import { UserActivity } from './user_activity/UserActivity';11import { EditQuestionSequence } from './edit_question_sequence/EditQuestionSequence';12import { Assignment } from './mturk_worker/Assignment';13import { StreamGraph } from './stream_graph/StreamGraph';14import { StreamGraphKeywords } from './stream_graph_keywords/StreamGraphKeywords';15import { MlModels } from './ml_models/MlModels';16import { MlPlayground } from './ml_playground/MlPlayground';17import { PredictViz } from './predict_viz/PredictViz';18import { DownloadResource } from './helpers/DownloadResource';19// Register components using Webpacker-react20WebpackerReact.setup({21 QSContainer,22 MturkQSContainer,23 SentimentTextBox,24 MonitorStream,25 Leadline,26 UserActivity,27 EditQuestionSequence,28 LocalBatchQSContainer,29 TweetEmbed,30 Assignment,31 StreamGraph,32 StreamGraphKeywords,33 MlModels,34 MlPlayground,35 PredictViz,36 DownloadResource37})38// This is needed for components to properly unmount and not being cached...
Using AI Code Generation
1const qawolf = require("qawolf");2const { monitorStream } = require("qawolf");3const { chromium } = require("playwright");4(async () => {5 const browser = await chromium.launch();6 const context = await browser.newContext();7 const page = await context.newPage();8 await page.fill("input[name=q]", "hello world");9 await page.press("input[name=q]", "Enter");10 await page.waitForLoadState("load");11 await page.waitForSelector(".g");12 await qawolf.stopVideos();13 await browser.close();14})();15const qawolf = require("qawolf");16const { monitorStream } = require("qawolf");17const { chromium } = require("playwright");18(async () => {19 const browser = await chromium.launch();20 const context = await browser.newContext();21 const page = await context.newPage();22 await page.fill("input[name=q]", "hello world");23 await page.press("input[name=q]", "Enter");24 await page.waitForLoadState("load");25 await page.waitForSelector(".g");26 await qawolf.stopVideos();27 await browser.close();28})();29const qawolf = require("qawolf");30const { monitorStream } = require("qawolf");31const { chromium } = require("playwright");32(async () => {33 const browser = await chromium.launch();34 const context = await browser.newContext();35 const page = await context.newPage();36 await page.fill("input[name=q]", "hello world");37 await page.press("input[name=q]", "Enter");38 await page.waitForLoadState("load");39 await page.waitForSelector(".g");40 await qawolf.stopVideos();41 await browser.close();42})();43const qawolf = require("qawolf");44const { monitorStream } = require("qawolf");45const { chromium } = require("playwright");46(async () => {47 const browser = await chromium.launch();48 const context = await browser.newContext();49 const page = await context.newPage();
Using AI Code Generation
1const { monitorStream } = require('qawolf');2const fs = require('fs');3(async () => {4 const browser = await qawolf.launch();5 const context = await browser.newContext();6 const page = await context.newPage();7 await page.fill('input[name="q"]', 'Hello World!');8 await page.press('input[name="q"]', 'Enter');9 await browser.close();10})();11I believe the issue is that you are missing the import for qawolf. You can add it with:12const qawolf = require('qawolf');
Using AI Code Generation
1const qawolf = require("qawolf");2const { launch } = require("qawolf");3const { devices } = require("playwright");4const iPhone = devices["iPhone 11 Pro"];5async function test() {6 const context = await browser.newContext({ ...iPhone });7 const page = await context.newPage();8 await qawolf.monitorStream(page);9 await page.click("input[name=q]");10 await page.fill("input[name=q]", "hello world");11 await page.press("input[name=q]", "Enter");12 await qawolf.stopVideos();13 await browser.close();14}15test();16{17 "scripts": {18 },19 "devDependencies": {20 }21}22await page.click("input[name=q]", { device: iPhone });23await page.click("input[aria-label='Search']");24await page.click("input[name=q]", { device: iPhone });
Using AI Code Generation
1const qawolf = require('qawolf');2const { monitorStream } = require('qawolf');3const browser = await qawolf.launch();4const context = await browser.newContext();5const page = await context.newPage();6await page.fill('[name="q"]', 'qawolf');7await page.press('[name="q"]', 'Enter');8await page.waitForLoadState();9await monitorStream(page, (stream) => {10stream.on('data', (data) => {11console.log(data);12});13});14await qawolf.stopVideos();
Using AI Code Generation
1const { monitorStream } = require("qawolf");2(async () => {3 const stream = await monitorStream();4 await stream.stop();5})();6const { monitorStream } = require("qawolf");7(async () => {8 const stream = await monitorStream();9 await stream.stop();10})();11const { monitorStream } = require("qawolf");12(async () => {13 const stream = await monitorStream();14 await stream.stop();15})();16const { monitorStream } = require("qawolf");17(async () => {18 const stream = await monitorStream();19 await stream.stop();20})();21const { monitorStream } = require("qawolf");22(async () => {23 const stream = await monitorStream();24 await stream.stop();25})();26const { monitorStream } = require("qawolf");27(async () => {28 const stream = await monitorStream();29 await stream.stop();30})();31const { monitorStream } = require("qawolf");32(async () => {33 const stream = await monitorStream();34 await stream.stop();35})();36const { monitorStream } = require("qawolf");37(async () => {38 const stream = await monitorStream();39 await stream.stop();40})();41const { monitorStream } = require("qawolf");42(async () => {43 const stream = await monitorStream();44 await stream.stop();45})();46const { monitorStream } = require("qawolf");
Using AI Code Generation
1const qawolf = require("qawolf");2const { monitorStream } = qawolf;3const fs = require("fs");4const readableStream = fs.createReadStream("test.mp4");5const writableStream = fs.createWriteStream("test1.mp4");6monitorStream(readableStream, writableStream, {7 onProgress: (bytes, totalBytes, percentage) => {8 console.log("bytes", bytes);9 console.log("totalBytes", totalBytes);10 console.log("percentage", percentage);11 },12 onError: (err) => {13 console.log("err", err);14 },15 onFinish: () => {16 console.log("done");17 },18});19const qawolf = require("qawolf");20const { monitorStream } = qawolf;21const fs = require("fs");22const readableStream = fs.createReadStream("test.mp4");23const writableStream = fs.createWriteStream("test1.mp4");24monitorStream(readableStream, writableStream, {25 onProgress: (bytes, totalBytes, percentage) => {26 console.log("bytes", bytes);27 console.log("totalBytes", totalBytes);28 console.log("percentage", percentage);29 },30 onError: (err) => {31 console.log("err", err);32 },33 onFinish: () => {34 console.log("done");35 },36});37const qawolf = require("qawolf");38const { monitorStream } = qawolf;39const fs = require("fs");40const readableStream = fs.createReadStream("test.mp4");41const writableStream = fs.createWriteStream("test1.mp4");42monitorStream(readableStream, writableStream, {43 onProgress: (bytes, totalBytes, percentage) => {44 console.log("bytes", bytes);45 console.log("totalBytes", totalBytes);46 console.log("percentage", percentage);47 },48 onError: (err) =>
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!