How to use the `waitForMessages` method in wpt

Best JavaScript code snippet using wpt

triggers-feature.js

Source: triggers-feature.js (GitHub)

copy

Full Screen

"use strict";

const {start, route, stop} = require("../..");
const {crd, reject} = require("../helpers/queue-helper");
const {setupWaiter} = require("../helpers/internal-helpers");
const jobStorage = require("../../lib/job-storage");

/**
 * Route handler shared by the recipes below.
 * @returns {{type: string, id: string}} a fixed payload, so the scenarios can
 *   recognise it in the `data` array of the processed message.
 */
function handler() {
  return {type: "i-was-here", id: "my-guid"};
}

// Scenarios covering how trigger messages start the "event.some-name" sequence.
Feature("Triggers", () => {
  afterEachScenario(stop);

  // Message body published on the trigger keys in every scenario.
  const source = {
    type: "order",
    id: "some-id",
    meta: {correlationId: "some-correlation-id"},
    attributes: {baz: true}
  };

  /** Returns a single trigger message for "event.some-name" carrying `source`. */
  function trigger() {
    return {
      type: "trigger",
      id: "event.some-name",
      source,
      meta: {
        correlationId: "some-correlation-id"
      }
    };
  }

  /**
   * Returns `incomingSource.numToTrigger` trigger messages; each one carries a
   * copy of the incoming source extended with its `index`.
   */
  function triggerMultiple(incomingSource) {
    const triggers = [];
    for (let i = 0; i < incomingSource.numToTrigger; i++) {
      triggers.push({
        type: "trigger",
        id: "event.some-name",
        source: {...incomingSource, index: i},
        meta: {
          correlationId: "some-correlation-id"
        }
      });
    }
    return triggers;
  }

  /** Trigger function that returns nothing (undefined). */
  function triggerNothing() {
    return;
  }

  /** Like `trigger`, but sets a top-level `correlationId` instead of `meta`. */
  function triggerWithCorrelationId() {
    return {
      type: "trigger",
      id: "event.some-name",
      source,
      correlationId: "some-other-correlation-id"
    };
  }

  /** Same message as `trigger`, delivered through an already-resolved promise. */
  function triggerAsync() {
    return Promise.resolve({
      type: "trigger",
      id: "event.some-name",
      source,
      meta: {
        correlationId: "some-correlation-id"
      }
    });
  }

  beforeEachScenario(() => {
    jobStorage.reset();
  });

  Scenario("Trigger a flow with a trigger message", () => {
    before(() => {
      crd.resetMock();
      start({
        triggers: {
          "trigger.some-generic-name": trigger
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-generic-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source,
        meta: {
          correlationId: "some-correlation-id"
        }
      });
    });
  });

  Scenario("Trigger a flow with a trigger message, async trigger", () => {
    before(() => {
      crd.resetMock();
      start({
        triggers: {
          "trigger.some-generic-name": triggerAsync
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-generic-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source,
        meta: {
          correlationId: "some-correlation-id"
        }
      });
    });
  });

  Scenario("Trigger a flow with a trigger message, spawn multiple sequences (2)", () => {
    before(() => {
      crd.resetMock();
      start({
        triggers: {
          "trigger.some-generic-name": triggerMultiple
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-generic-name", {...source, numToTrigger: 2});
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(4);
      const idxs = flowMessages
        .filter(({key}) => key === "event.some-name.processed")
        .map(({msg}) => msg.source.index);
      // Numeric comparator: the default sort compares elements as strings.
      idxs.sort((a, b) => a - b);
      idxs.should.eql([0, 1]);
    });
  });

  Scenario("Trigger a flow with a trigger message, spawn no sequences", () => {
    before(() => {
      crd.resetMock();
      reject.resetMock();
      start({
        triggers: {
          "trigger.some-generic-name": triggerMultiple
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let rejectMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      rejectMessages = reject.subscribe("#");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-generic-name", {...source, numToTrigger: 0});
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(0);
    });

    And("nothing should be rejected", () => {
      rejectMessages.length.should.eql(0);
    });
  });

  Scenario("Trigger a flow with a trigger message, spawn no sequences #2", () => {
    before(() => {
      crd.resetMock();
      reject.resetMock();
      start({
        triggers: {
          "trigger.some-generic-name": triggerNothing
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let rejectMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      rejectMessages = reject.subscribe("#");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-generic-name", {...source, numToTrigger: 0});
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(0);
    });

    And("nothing should be rejected", () => {
      rejectMessages.length.should.eql(0);
    });
  });

  Scenario("Trigger a flow with a parent correlation id", () => {
    before(() => {
      crd.resetMock();
      start({
        triggers: {
          "trigger.some-other-generic-name": triggerWithCorrelationId
        },
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.some-other-generic-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source,
        meta: {
          correlationId: "some-other-correlation-id",
          parentCorrelationId: "some-correlation-id"
        }
      });
    });
  });

  Scenario("Trigger a flow with a sequence generic parent correlation id and notifyProcessed", () => {
    before(() => {
      crd.resetMock();
      start({
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)],
            useParentCorrelationId: true
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishWithMeta("trigger.event.some-name", source, {
        headers: {"x-notify-processed": true}
      });
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      // The asserted correlation id is "<parent>:<uuid>"; check the uuid part.
      const newCorrId = msg.meta.correlationId.split(":")[1];
      newCorrId.should.be.a.uuid("v4");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: `some-correlation-id:${newCorrId}`,
          parentCorrelationId: "some-correlation-id",
          notifyProcessed: true
        }
      });
    });
  });

  Scenario("Trigger a flow with a global use generic parent correlation id", () => {
    before(() => {
      crd.resetMock();
      start({
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ],
        useParentCorrelationId: true
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.event.some-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      const newCorrId = msg.meta.correlationId.split(":")[1];
      newCorrId.should.be.a.uuid("v4");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: `some-correlation-id:${newCorrId}`,
          parentCorrelationId: "some-correlation-id"
        }
      });
    });
  });

  Scenario("Trigger a flow without a generic parent correlation id", () => {
    before(() => {
      crd.resetMock();
      start({
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishMessage("trigger.event.some-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: "some-correlation-id"
        }
      });
    });
  });

  Scenario("Trigger a flow without a generic parent correlation id but parent correlation id in header", () => {
    before(() => {
      crd.resetMock();
      start({
        recipes: [
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.#");
      waitForMessages = setupWaiter("event.some-name.processed");
    });

    When("we publish an order on a trigger key", async () => {
      await crd.publishWithMeta("trigger.event.some-name", source, {
        headers: {"x-parent-correlation-id": "this-is-parent"}
      });
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: "some-correlation-id",
          parentCorrelationId: "this-is-parent"
        }
      });
    });
  });

  Scenario("Trigger a flow by returning a trigger message from handler", () => {
    before(() => {
      crd.resetMock();
      start({
        recipes: [
          {
            namespace: "event",
            name: "some-other-name",
            // The handler of this step is the `trigger` helper, so the step
            // result is itself a trigger message for "event.some-name".
            sequence: [route(".perform.one", trigger)]
          },
          {
            namespace: "event",
            name: "some-name",
            sequence: [route(".perform.one", handler)]
          }
        ]
      });
    });

    let flowMessages;
    let secondFlowMessages;
    let triggerMessages;
    let waitForMessages;
    Given("we are listening for messages on the event namespace", () => {
      flowMessages = crd.subscribe("event.some-other-name.#");
      secondFlowMessages = crd.subscribe("event.some-name.#");
      waitForMessages = setupWaiter("event.some-other-name.processed");
    });

    When("we publish an order on the other events a trigger key", async () => {
      triggerMessages = crd.subscribe("trigger.#");
      await crd.publishMessage("trigger.event.some-other-name", source);
      await waitForMessages;
    });

    And("the flow should be completed", () => {
      flowMessages.length.should.eql(2);
      const {msg, key} = flowMessages[1];
      key.should.eql("event.some-other-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "trigger",
            id: "event.some-name",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-other-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: "some-correlation-id"
        }
      });
    });

    And("there should be an internal message", () => {
      triggerMessages.length.should.eql(2);
      const {msg, key} = triggerMessages.pop();
      key.should.eql("trigger.event.some-name");
      msg.should.eql({
        ...source
      });
    });

    And("the other flow should triggered and be completed", () => {
      secondFlowMessages.length.should.eql(2);
      const {msg, key} = secondFlowMessages.pop();
      key.should.eql("event.some-name.processed");
      msg.should.eql({
        type: "event",
        id: msg.id,
        data: [
          {
            type: "i-was-here",
            id: "my-guid",
            occurredAt: msg.data[0].occurredAt,
            key: "event.some-name.perform.one"
          }
        ],
        source: {id: source.id, type: source.type, attributes: source.attributes, meta: source.meta},
        meta: {
          correlationId: "some-correlation-id:0",
          notifyProcessed: "event.some-other-name.perform.one:some-correlation-id",
          parentCorrelationId: "some-correlation-id"
        }
      });
    });
  });

  // ... (the listing is truncated at this point; any further scenarios of the
  // original file are not shown here)
});

Full Screen

Full Screen

channel_api.js

Source: channel_api.js (GitHub)

copy

Full Screen

...151 return ready.promise;152}153// Return a promise that resolves when the queue has at least `num`154// messages. If num is not supplied its assumed to be 1.155function waitForMessages(q, num) {156 var min = (num === undefined) ? 1 : num;157 return waitForQueue(q, function(qok) {158 return qok.messageCount >= min;159 });160}161suite("sendMessage", function() {162// publish different size messages163chtest("send to queue and get from queue", function(ch) {164 var q = 'test.send-to-q';165 var msg = randomString();166 return doAll(167 ch.assertQueue(q, QUEUE_OPTS),168 ch.purgeQueue(q))169 .then(function() {170 ch.sendToQueue(q, new Buffer(msg));171 return waitForMessages(q);})172 .then(function() {173 return ch.get(q, {noAck: true});})174 .then(function(m) {175 assert(m);176 assert.equal(msg, m.content.toString());177 });178});179chtest("send (and get) zero content to queue", function(ch) {180 var q = 'test.send-to-q';181 var msg = new Buffer(0);182 return doAll(183 ch.assertQueue(q, QUEUE_OPTS),184 ch.purgeQueue(q))185 .then(function() {186 ch.sendToQueue(q, msg);187 return waitForMessages(q);})188 .then(function() {189 return ch.get(q, {noAck: true});})190 .then(function(m) {191 assert(m);192 assert.deepEqual(msg, m.content);193 });194});195});196suite("binding, consuming", function() {197// bind, publish, get198chtest("route message", function(ch) {199 var ex = 'test.route-message';200 var q = 'test.route-message-q';201 var msg = randomString();202 return doAll(203 ch.assertExchange(ex, 'fanout', EX_OPTS),204 ch.assertQueue(q, QUEUE_OPTS),205 ch.purgeQueue(q),206 ch.bindQueue(q, ex, '', {}))207 .then(function() {208 ch.publish(ex, '', new Buffer(msg));209 return waitForMessages(q);})210 .then(function() {211 return ch.get(q, {noAck: true});})212 .then(function(m) {213 assert(m);214 assert.equal(msg, m.content.toString());215 });216});217// send to queue, purge, get-empty218chtest("purge queue", function(ch) {219 var q = 'test.purge-queue';220 return 
ch.assertQueue(q, {durable: false})221 .then(function() {222 ch.sendToQueue(q, new Buffer('foobar'));223 return waitForMessages(q);})224 .then(function() {225 ch.purgeQueue(q);226 return ch.get(q, {noAck: true});})227 .then(function(m) {228 assert(!m); // get-empty229 });230});231// bind again, unbind, publish, get-empty232chtest("unbind queue", function(ch) {233 var ex = 'test.unbind-queue-ex';234 var q = 'test.unbind-queue';235 var viabinding = randomString();236 var direct = randomString();237 return doAll(238 ch.assertExchange(ex, 'fanout', EX_OPTS),239 ch.assertQueue(q, QUEUE_OPTS),240 ch.purgeQueue(q),241 ch.bindQueue(q, ex, '', {}))242 .then(function() {243 ch.publish(ex, '', new Buffer('foobar'));244 return waitForMessages(q);})245 .then(function() { // message got through!246 return ch.get(q, {noAck:true})247 .then(function(m) {assert(m);});})248 .then(function() {249 return ch.unbindQueue(q, ex, '', {});})250 .then(function() {251 // via the no-longer-existing binding252 ch.publish(ex, '', new Buffer(viabinding));253 // direct to the queue254 ch.sendToQueue(q, new Buffer(direct));255 return waitForMessages(q);})256 .then(function() {return ch.get(q)})257 .then(function(m) {258 // the direct to queue message got through, the via-binding259 // message (sent first) did not260 assert.equal(direct, m.content.toString());261 });262});263// To some extent this is now just testing semantics of the server,264// but we can at least try out a few settings, and consume.265chtest("consume via exchange-exchange binding", function(ch) {266 var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';267 var q = 'test.ex-ex-binding-q';268 var rk = 'test.routing.key', msg = randomString();269 return doAll(270 ch.assertExchange(ex1, 'direct', EX_OPTS),271 ch.assertExchange(ex2, 'fanout',272 {durable: false, internal: true}),273 ch.assertQueue(q, QUEUE_OPTS),274 ch.purgeQueue(q),275 ch.bindExchange(ex2, ex1, rk, {}),276 ch.bindQueue(q, ex2, '', {}))277 .then(function() {278 
var arrived = defer();279 function delivery(m) {280 if (m.content.toString() === msg) arrived.resolve();281 else arrived.reject(new Error("Wrong message"));282 }283 ch.consume(q, delivery, {noAck: true})284 .then(function() {285 ch.publish(ex1, rk, new Buffer(msg));286 });287 return arrived.promise;288 });289});290// bind again, unbind, publish, get-empty291chtest("unbind exchange", function(ch) {292 var source = 'test.unbind-ex-source';293 var dest = 'test.unbind-ex-dest';294 var q = 'test.unbind-ex-queue';295 var viabinding = randomString();296 var direct = randomString();297 return doAll(298 ch.assertExchange(source, 'fanout', EX_OPTS),299 ch.assertExchange(dest, 'fanout', EX_OPTS),300 ch.assertQueue(q, QUEUE_OPTS),301 ch.purgeQueue(q),302 ch.bindExchange(dest, source, '', {}),303 ch.bindQueue(q, dest, '', {}))304 .then(function() {305 ch.publish(source, '', new Buffer('foobar'));306 return waitForMessages(q);})307 .then(function() { // message got through!308 return ch.get(q, {noAck:true})309 .then(function(m) {assert(m);});})310 .then(function() {311 return ch.unbindExchange(dest, source, '', {});})312 .then(function() {313 // via the no-longer-existing binding314 ch.publish(source, '', new Buffer(viabinding));315 // direct to the queue316 ch.sendToQueue(q, new Buffer(direct));317 return waitForMessages(q);})318 .then(function() {return ch.get(q)})319 .then(function(m) {320 // the direct to queue message got through, the via-binding321 // message (sent first) did not322 assert.equal(direct, m.content.toString());323 });324});325// This is a bit convoluted. 
Sorry.326chtest("cancel consumer", function(ch) {327 var q = 'test.consumer-cancel';328 var recv1 = defer();329 var ctag;330 doAll(331 ch.assertQueue(q, QUEUE_OPTS),332 ch.purgeQueue(q),333 // My callback is 'resolve the promise in `arrived`'334 ch.consume(q, function() { recv1.resolve(); }, {noAck:true})335 .then(function(ok) {336 ctag = ok.consumerTag;337 ch.sendToQueue(q, new Buffer('foo'));338 }));339 // A message should arrive because of the consume340 return recv1.promise.then(function() {341 // replace the promise resolved by the consume callback342 recv1 = defer();343 344 return doAll(345 ch.cancel(ctag).then(function() {346 ch.sendToQueue(q, new Buffer('bar'));347 }),348 // but check a message did arrive in the queue349 waitForMessages(q))350 .then(function() {351 ch.get(q, {noAck:true})352 .then(function(m) {353 // I'm going to reject it, because I flip succeed/fail354 // just below355 if (m.content.toString() === 'bar')356 recv1.reject();357 });358 return expectFail(recv1.promise);359 // i.e., fail on delivery, succeed on get-ok360 });361 });362});363chtest("cancelled consumer", function(ch) {364 var q = 'test.cancelled-consumer';365 var nullRecv = defer();366 367 doAll(368 ch.assertQueue(q),369 ch.purgeQueue(q),370 ch.consume(q, function(msg) {371 if (msg === null) nullRecv.resolve();372 else nullRecv.reject(new Error('Message not expected'));373 }))374 .then(function() {375 ch.deleteQueue(q);376 });377 return nullRecv.promise;378});379// ack, by default, removes a single message from the queue380chtest("ack", function(ch) {381 var q = 'test.ack';382 var msg1 = randomString(), msg2 = randomString();383 return doAll(384 ch.assertQueue(q, QUEUE_OPTS),385 ch.purgeQueue(q))386 .then(function() {387 ch.sendToQueue(q, new Buffer(msg1));388 ch.sendToQueue(q, new Buffer(msg2));389 return waitForMessages(q, 2);390 })391 .then(function() {392 return ch.get(q, {noAck: false})393 })394 .then(function(m) {395 assert.equal(msg1, m.content.toString());396 
ch.ack(m);397 // %%% is there a race here? may depend on398 // rabbitmq-sepcific semantics399 return ch.get(q);400 })401 .then(function(m) {402 assert(m);403 assert.equal(msg2, m.content.toString());404 });405});406// Nack, by default, puts a message back on the queue (where in the407// queue is up to the server)408chtest("nack", function(ch) {409 var q = 'test.nack';410 var msg1 = randomString();411 return doAll(412 ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))413 .then(function() {414 ch.sendToQueue(q, new Buffer(msg1));415 return waitForMessages(q);})416 .then(function() {417 return ch.get(q, {noAck: false})})418 .then(function(m) {419 assert.equal(msg1, m.content.toString());420 ch.nack(m);421 return waitForMessages(q);})422 .then(function() {423 return ch.get(q);})424 .then(function(m) {425 assert(m);426 assert.equal(msg1, m.content.toString());427 });428});429// reject is a near-synonym for nack, the latter of which is not430// available in earlier RabbitMQ (or in AMQP proper).431chtest("reject", function(ch) {432 var q = 'test.reject';433 var msg1 = randomString();434 return doAll(435 ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q))436 .then(function() {437 ch.sendToQueue(q, new Buffer(msg1));438 return waitForMessages(q);})439 .then(function() {440 return ch.get(q, {noAck: false})})441 .then(function(m) {442 assert.equal(msg1, m.content.toString());443 ch.reject(m);444 return waitForMessages(q);})445 .then(function() {446 return ch.get(q);})447 .then(function(m) {448 assert(m);449 assert.equal(msg1, m.content.toString());450 });451});452chtest("prefetch", function(ch) {453 var q = 'test.prefetch';454 return doAll(455 ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),456 ch.prefetch(1))457 .then(function() {458 ch.sendToQueue(q, new Buffer('foobar'));459 ch.sendToQueue(q, new Buffer('foobar'));460 return waitForMessages(q, 2);461 })462 .then(function() {463 var first = defer();464 return doAll(465 ch.consume(q, function(m) {466 first.resolve(m);467 }, 
{noAck: false}),468 first.promise.then(function(m) {469 first = defer();470 ch.ack(m);471 return first.promise.then(function(m) {472 ch.ack(m);473 })474 }));...

Full Screen

Full Screen

ws.test.ts

Source:ws.test.ts Github

copy

Full Screen

...62 while (client.readyState !== state) {63 await sleep(10);64 }65}66async function waitForMessages(messages: any[], cnt: number): Promise<void> {67 while (messages.length < cnt) {68 await sleep(10);69 }70}71async function createSocketClient(): Promise<[WebSocket, any[]]> {72 const client = new WebSocket(`ws://localhost:${port}/ws`);73 await waitForSocketState(client, client.OPEN);74 const messages: any[] = [];75 client.on("message", (data) => {76 messages.push(JSON.parse(data.toString()));77 });78 return [client, messages];79}80beforeAll(async () => {81 priceMetadata = dummyPriceMetadata(0, 0, 0);82 priceInfos = [83 dummyPriceInfo(expandTo64Len("abcd"), "a1b2c3d4", priceMetadata),84 dummyPriceInfo(expandTo64Len("ef01"), "a1b2c3d4", priceMetadata),85 dummyPriceInfo(expandTo64Len("2345"), "bad01bad", priceMetadata),86 dummyPriceInfo(expandTo64Len("6789"), "bidbidbid", priceMetadata),87 ];88 let priceInfo: PriceStore = {89 getLatestPriceInfo: (_priceFeedId: string) => undefined,90 addUpdateListener: (_callback: (priceInfo: PriceInfo) => any) => undefined,91 getPriceIds: () =>92 new Set(priceInfos.map((priceInfo) => priceInfo.priceFeed.id)),93 };94 api = new WebSocketAPI(priceInfo);95 server = new Server();96 server.listen(port);97 wss = api.run(server);98});99afterAll(async () => {100 wss.close();101 server.close();102});103describe("Client receives data", () => {104 test("When subscribes with valid ids without verbose flag, returns correct price feed", async () => {105 let [client, serverMessages] = await createSocketClient();106 let message: ClientMessage = {107 ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],108 type: "subscribe",109 };110 client.send(JSON.stringify(message));111 await waitForMessages(serverMessages, 1);112 expect(serverMessages[0]).toStrictEqual({113 type: "response",114 status: "success",115 });116 api.dispatchPriceFeedUpdate(priceInfos[0]);117 await waitForMessages(serverMessages, 2);118 expect(serverMessages[1]).toEqual({119 
type: "price_update",120 price_feed: priceInfos[0].priceFeed.toJson(),121 });122 api.dispatchPriceFeedUpdate(priceInfos[1]);123 await waitForMessages(serverMessages, 3);124 expect(serverMessages[2]).toEqual({125 type: "price_update",126 price_feed: priceInfos[1].priceFeed.toJson(),127 });128 client.close();129 await waitForSocketState(client, client.CLOSED);130 });131 test("When subscribes with valid ids and verbose flag set to true, returns correct price feed with metadata", async () => {132 let [client, serverMessages] = await createSocketClient();133 let message: ClientMessage = {134 ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],135 type: "subscribe",136 verbose: true,137 };138 client.send(JSON.stringify(message));139 await waitForMessages(serverMessages, 1);140 expect(serverMessages[0]).toStrictEqual({141 type: "response",142 status: "success",143 });144 api.dispatchPriceFeedUpdate(priceInfos[0]);145 await waitForMessages(serverMessages, 2);146 expect(serverMessages[1]).toEqual({147 type: "price_update",148 price_feed: {149 ...priceInfos[0].priceFeed.toJson(),150 metadata: priceMetadata,151 },152 });153 api.dispatchPriceFeedUpdate(priceInfos[1]);154 await waitForMessages(serverMessages, 3);155 expect(serverMessages[2]).toEqual({156 type: "price_update",157 price_feed: {158 ...priceInfos[1].priceFeed.toJson(),159 metadata: priceMetadata,160 },161 });162 client.close();163 await waitForSocketState(client, client.CLOSED);164 });165 test("When subscribes with valid ids and verbose flag set to false, returns correct price feed without metadata", async () => {166 let [client, serverMessages] = await createSocketClient();167 let message: ClientMessage = {168 ids: [priceInfos[0].priceFeed.id, priceInfos[1].priceFeed.id],169 type: "subscribe",170 verbose: false,171 };172 client.send(JSON.stringify(message));173 await waitForMessages(serverMessages, 1);174 expect(serverMessages[0]).toStrictEqual({175 type: "response",176 status: "success",177 });178 
api.dispatchPriceFeedUpdate(priceInfos[0]);179 await waitForMessages(serverMessages, 2);180 expect(serverMessages[1]).toEqual({181 type: "price_update",182 price_feed: priceInfos[0].priceFeed.toJson(),183 });184 api.dispatchPriceFeedUpdate(priceInfos[1]);185 await waitForMessages(serverMessages, 3);186 expect(serverMessages[2]).toEqual({187 type: "price_update",188 price_feed: priceInfos[1].priceFeed.toJson(),189 });190 client.close();191 await waitForSocketState(client, client.CLOSED);192 });193 test("When subscribes with invalid ids, returns error", async () => {194 let [client, serverMessages] = await createSocketClient();195 let message: ClientMessage = {196 ids: [expandTo64Len("aaaa")],197 type: "subscribe",198 };199 client.send(JSON.stringify(message));200 await waitForMessages(serverMessages, 1);201 expect(serverMessages.length).toBe(1);202 expect(serverMessages[0].type).toBe("response");203 expect(serverMessages[0].status).toBe("error");204 client.close();205 await waitForSocketState(client, client.CLOSED);206 });207 test("When subscribes for Price Feed A, doesn't receive updates for Price Feed B", async () => {208 let [client, serverMessages] = await createSocketClient();209 let message: ClientMessage = {210 ids: [priceInfos[0].priceFeed.id],211 type: "subscribe",212 };213 client.send(JSON.stringify(message));214 await waitForMessages(serverMessages, 1);215 expect(serverMessages[0]).toStrictEqual({216 type: "response",217 status: "success",218 });219 api.dispatchPriceFeedUpdate(priceInfos[1]);220 await sleep(100);221 api.dispatchPriceFeedUpdate(priceInfos[0]);222 await waitForMessages(serverMessages, 2);223 expect(serverMessages[1]).toEqual({224 type: "price_update",225 price_feed: priceInfos[0].priceFeed.toJson(),226 });227 await sleep(100);228 expect(serverMessages.length).toBe(2);229 client.close();230 await waitForSocketState(client, client.CLOSED);231 });232 test("When subscribes for Price Feed A, receives updated and when unsubscribes stops 
receiving", async () => {233 let [client, serverMessages] = await createSocketClient();234 let message: ClientMessage = {235 ids: [priceInfos[0].priceFeed.id],236 type: "subscribe",237 };238 client.send(JSON.stringify(message));239 await waitForMessages(serverMessages, 1);240 expect(serverMessages[0]).toStrictEqual({241 type: "response",242 status: "success",243 });244 api.dispatchPriceFeedUpdate(priceInfos[0]);245 await waitForMessages(serverMessages, 2);246 expect(serverMessages[1]).toEqual({247 type: "price_update",248 price_feed: priceInfos[0].priceFeed.toJson(),249 });250 message = {251 ids: [priceInfos[0].priceFeed.id],252 type: "unsubscribe",253 };254 client.send(JSON.stringify(message));255 await waitForMessages(serverMessages, 3);256 expect(serverMessages[2]).toStrictEqual({257 type: "response",258 status: "success",259 });260 api.dispatchPriceFeedUpdate(priceInfos[0]);261 await sleep(100);262 expect(serverMessages.length).toBe(3);263 client.close();264 await waitForSocketState(client, client.CLOSED);265 });266 test("Unsubscribe on not subscribed price feed is ok", async () => {267 let [client, serverMessages] = await createSocketClient();268 let message: ClientMessage = {269 ids: [priceInfos[0].priceFeed.id],270 type: "unsubscribe",271 };272 client.send(JSON.stringify(message));273 await waitForMessages(serverMessages, 1);274 expect(serverMessages[0]).toStrictEqual({275 type: "response",276 status: "success",277 });278 client.close();279 await waitForSocketState(client, client.CLOSED);280 });281 test("Multiple clients with different price feed works", async () => {282 let [client1, serverMessages1] = await createSocketClient();283 let [client2, serverMessages2] = await createSocketClient();284 let message1: ClientMessage = {285 ids: [priceInfos[0].priceFeed.id],286 type: "subscribe",287 };288 client1.send(JSON.stringify(message1));289 let message2: ClientMessage = {290 ids: [priceInfos[1].priceFeed.id],291 type: "subscribe",292 };293 
client2.send(JSON.stringify(message2));294 await waitForMessages(serverMessages1, 1);295 await waitForMessages(serverMessages2, 1);296 expect(serverMessages1[0]).toStrictEqual({297 type: "response",298 status: "success",299 });300 expect(serverMessages2[0]).toStrictEqual({301 type: "response",302 status: "success",303 });304 api.dispatchPriceFeedUpdate(priceInfos[0]);305 api.dispatchPriceFeedUpdate(priceInfos[1]);306 await waitForMessages(serverMessages1, 2);307 await waitForMessages(serverMessages2, 2);308 expect(serverMessages1[1]).toEqual({309 type: "price_update",310 price_feed: priceInfos[0].priceFeed.toJson(),311 });312 expect(serverMessages2[1]).toEqual({313 type: "price_update",314 price_feed: priceInfos[1].priceFeed.toJson(),315 });316 client1.close();317 client2.close();318 await waitForSocketState(client1, client1.CLOSED);319 await waitForSocketState(client2, client2.CLOSED);320 });321});

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

// Seven identical generated snippets, restored to one statement per line
// (the listing's line-number gutter had been fused into the code, which made
// it unparseable). `var` is kept on purpose: the names are re-declared by each
// repetition, which `const`/`let` would reject in a single script.
// NOTE(review): confirm that the installed `wptools` package really exposes
// `get` and `waitForMessages` with these callback signatures.

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

var wptools = require('wptools');
var wp = wptools('Albert Einstein');
wp.get(function(err, resp) {
  console.log(resp);
  wp.waitForMessages(function(err, resp) {
    console.log(resp);
  });
});

Full Screen

Using AI Code Generation

copy

Full Screen

// Six identical generated snippets, restored to one statement per line (the
// listing's line-number gutter had been fused into the code, which made it
// unparseable). `var` is kept on purpose: the names are re-declared by each
// repetition, which `const`/`let` would reject in a single script.
// NOTE(review): confirm that the installed `wptools` package is constructible
// with `new` and really exposes `waitForMessages(callback)`.

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

var wptools = require('wptools');
var wp = new wptools('Barack Obama');
wp.waitForMessages(function(err, messages) {
  if (err) {
    console.log('Error: ' + err);
  } else {
    console.log('Messages: ' + messages);
  }
});

Full Screen

Using AI Code Generation

copy

Full Screen

1var wptoolkit = require('wptoolkit');2var wp = new wptoolkit();3wp.waitForMessages(1, function (messages) {4 console.log('Message: ' + messages[0].message);5 console.log('Sender: ' + messages[0].sender);6 console.log('Receiver: ' + messages[0].receiver);7 console.log('Type: ' + messages[0].type);8 console.log('Timestamp: ' + messages[0].timestamp);9});10var wptoolkit = require('wptoolkit');11var wp = new wptoolkit();12wp.sendMessage('Hello', 'receiver', 'message', function (status) {13 console.log('Status: ' + status);14});15var wptoolkit = require('wptoolkit');16var wp = new wptoolkit();17wp.getMessages(function (messages) {18 console.log('Message: ' + messages[0].message);19 console.log('Sender: ' + messages[0].sender);20 console.log('Receiver: ' + messages[0].receiver);21 console.log('Type: ' + messages[0].type);22 console.log('Timestamp: ' + messages[0].timestamp);23});24var wptoolkit = require('wptoolkit');25var wp = new wptoolkit();26wp.deleteMessage(messageId, function (status) {27 console.log('Status: ' + status);28});29var wptoolkit = require('wptoolkit');30var wp = new wptoolkit();31wp.deleteMessages(function (status) {32 console.log('Status: ' + status);33});34var wptoolkit = require('wptoolkit');35var wp = new wptoolkit();36wp.getPlugins(function (plugins) {37 console.log('Name: ' + plugins[0].name);38 console.log('Version: ' + plugins[0].version);39 console.log('Author

Full Screen

Using AI Code Generation

copy

Full Screen

1var wptoolkit = require('wptoolkit');2var wp = new wptoolkit();3 console.log(data);4});5var wptoolkit = require('wptoolkit');6var wp = new wptoolkit();7var wptoolkit = require('wptoolkit');8var wp = new wptoolkit();9var wptoolkit = require('wptoolkit');10var wp = new wptoolkit();11 console.log(data);12});13var wptoolkit = require('wptoolkit');14var wp = new wptoolkit();15 console.log(data);16});17var wptoolkit = require('wptoolkit');18var wp = new wptoolkit();19 console.log(data);20});21var wptoolkit = require('wptoolkit');22var wp = new wptoolkit();23 console.log(data);24});25var wptoolkit = require('wptoolkit');26var wp = new wptoolkit();27var wptoolkit = require('w

Full Screen

Using AI Code Generation

copy

Full Screen

1var wptoolkit = require('wptoolkit');2var config = {3};4wptoolkit.run(config, function (err, browser) {5 if (err) {6 console.log(err);7 return;8 }9 browser.waitForMessages(function (message) {10 console.log(message);11 }, function (err) {12 console.log(err);13 }, 10000);14});15var wptoolkit = require('wptoolkit');16var config = {17};18wptoolkit.run(config, function (err, browser) {19 if (err) {20 console.log(err);21 return;22 }23 browser.waitForMessages(function (message) {24 console.log(message);25 }, function (err) {26 console.log(err);27 }, 10000, ['Document loaded', 'Document complete']);28});29var wptoolkit = require('wptoolkit');30var config = {31};32wptoolkit.run(config, function (err, browser) {33 if (err) {34 console.log(err);35 return;36 }37 browser.waitForMessages(function (message) {38 console.log(message);39 }, function (err) {40 console.log(err);41 }, 10000, ['Document loaded',

Full Screen

Using AI Code Generation

copy

Full Screen

1var wptoolkit = require("wptoolkit");2wp.waitForMessages("test", 10, function(err, messages) {3if (err) {4console.log(err);5} else {6console.log(messages);7}8});9var wptoolkit = require("wptoolkit");10wp.waitForMessages("test", 10, function(err, messages) {11if (err) {12console.log(err);13} else {14console.log(messages);15}16});17var wptoolkit = require("wptoolkit");18wp.waitForMessages("test", 10, function(err, messages) {19if (err) {20console.log(err);21} else {22console.log(messages);23}24});25var wptoolkit = require("wptoolkit");26wp.waitForMessages("test", 10, function(err, messages) {27if (err) {28console.log(err);29} else {30console.log(messages);31}32});33var wptoolkit = require("wptoolkit");34wp.waitForMessages("test", 10, function(err, messages) {35if (err) {36console.log(err);37} else {38console.log(messages);39}40});41var wptoolkit = require("wptoolkit");42wp.waitForMessages("test", 10, function(err, messages) {43if (err) {44console.log(err);45} else {46console.log(messages);47}48});49var wptoolkit = require("wpt

Full Screen

Using AI Code Generation

copy

Full Screen

1var wptoolkit = require('wptoolkit');2var wpt = new wptoolkit();3 if (err) {4 console.log('Error: ' + err);5 } else {6 console.log('Success: ' + result);7 }8});9var wptoolkit = require('wptoolkit');10var wpt = new wptoolkit();11 if (err) {12 console.log('Error: ' + err);13 } else {14 console.log('Success: ' + result);15 }16});17var wptoolkit = require('wptoolkit');18var wpt = new wptoolkit();19 if (err) {20 console.log('Error: ' + err);21 } else {22 console.log('Success: ' + result);23 }24});25var wptoolkit = require('wptoolkit');26var wpt = new wptoolkit();27 if (err) {28 console.log('Error: ' + err);29 } else {30 console.log('Success: ' + result);31 }32});33var wptoolkit = require('wptoolkit');34var wpt = new wptoolkit();35 if (err) {36 console.log('Error: ' + err);37 }

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run wpt automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful