Best JavaScript code snippet using mocha
index.spec.js
Source:index.spec.js
...44 producer && (await producer.disconnect())45 consumer && (await consumer.disconnect())46 })47 test('throws an error if the topic is invalid', async () => {48 producer = createProducer({ cluster: createCluster(), logger: newLogger() })49 await expect(producer.send({ acks: 1, topic: null })).rejects.toHaveProperty(50 'message',51 'Invalid topic'52 )53 })54 test('throws an error if messages is invalid', async () => {55 producer = createProducer({ cluster: createCluster(), logger: newLogger() })56 await expect(57 producer.send({ acks: 1, topic: topicName, messages: null })58 ).rejects.toHaveProperty('message', `Invalid messages array [null] for topic "${topicName}"`)59 })60 test('throws an error for messages with a value of undefined', async () => {61 producer = createProducer({ cluster: createCluster(), logger: newLogger() })62 await expect(63 producer.send({ acks: 1, topic: topicName, messages: [{ foo: 'bar' }] })64 ).rejects.toHaveProperty(65 'message',66 `Invalid message without value for topic "${topicName}": {"foo":"bar"}`67 )68 })69 test('throws an error if the producer is not connected', async () => {70 producer = createProducer({ cluster: createCluster(), logger: newLogger() })71 await expect(72 producer.send({73 topic: topicName,74 messages: [{ key: 'key', value: 'value' }],75 })76 ).rejects.toThrow(/The producer is disconnected/)77 })78 test('throws an error if the producer is disconnecting', async () => {79 const cluster = createCluster()80 const originalDisconnect = cluster.disconnect81 cluster.disconnect = async () => {82 await sleep(10)83 return originalDisconnect.apply(cluster)84 }85 producer = createProducer({ cluster, logger: newLogger() })86 await producer.connect()87 producer.disconnect() // slow disconnect should give a disconnecting status88 await expect(89 producer.send({90 topic: topicName,91 messages: [{ key: 'key', value: 'value' }],92 })93 ).rejects.toThrow(/The producer is disconnecting/)94 cluster.disconnect = originalDisconnect95 })96 test('allows messages with a null value to support tombstones', async () => {97 producer = createProducer({ cluster: createCluster(), logger: newLogger() })98 await producer.connect()99 await producer.send({ acks: 1, topic: topicName, messages: [{ foo: 'bar', value: null }] })100 })101 test('support SSL connections', async () => {102 const cluster = createCluster(sslConnectionOpts(), sslBrokers())103 producer = createProducer({ cluster, logger: newLogger() })104 await producer.connect()105 })106 for (const e of saslEntries) {107 test(`support SASL ${e.name} connections`, async () => {108 const cluster = createCluster(e.opts(), saslBrokers())109 producer = createProducer({ cluster, logger: newLogger() })110 await producer.connect()111 })112 if (e.wrongOpts) {113 test(`throws an error if SASL ${e.name} fails to authenticate`, async () => {114 const cluster = createCluster(e.wrongOpts(), saslBrokers())115 producer = createProducer({ cluster, logger: newLogger() })116 await expect(producer.connect()).rejects.toThrow(e.expectedErr)117 })118 }119 }120 test('reconnects the cluster if disconnected', async () => {121 const cluster = createCluster({122 createPartitioner: createModPartitioner,123 })124 await createTopic({ topic: topicName })125 producer = createProducer({ cluster, logger: newLogger() })126 await producer.connect()127 await producer.send({128 acks: 1,129 topic: topicName,130 messages: [{ key: '1', value: '1' }],131 })132 expect(cluster.isConnected()).toEqual(true)133 await cluster.disconnect()134 expect(cluster.isConnected()).toEqual(false)135 await producer.send({136 acks: 1,137 topic: topicName,138 messages: [{ key: '2', value: '2' }],139 })140 expect(cluster.isConnected()).toEqual(true)141 })142 test('gives access to its logger', () => {143 producer = createProducer({ cluster: createCluster(), logger: newLogger() })144 expect(producer.logger()).toMatchSnapshot()145 })146 test('on throws an error when provided with an invalid event name', () => {147 producer = createProducer({ cluster: createCluster(), logger: newLogger() })148 expect(() => producer.on('NON_EXISTENT_EVENT', () => {})).toThrow(149 /Event name should be one of producer.events./150 )151 })152 test('emits connection events', async () => {153 producer = createProducer({ cluster: createCluster(), logger: newLogger() })154 const connectListener = jest.fn().mockName('connect')155 const disconnectListener = jest.fn().mockName('disconnect')156 producer.on(producer.events.CONNECT, connectListener)157 producer.on(producer.events.DISCONNECT, disconnectListener)158 await producer.connect()159 expect(connectListener).toHaveBeenCalled()160 await producer.disconnect()161 expect(disconnectListener).toHaveBeenCalled()162 })163 test('emits the request event', async () => {164 const emitter = new InstrumentationEventEmitter()165 producer = createProducer({166 logger: newLogger(),167 cluster: createCluster({ instrumentationEmitter: emitter }),168 instrumentationEmitter: emitter,169 })170 const requestListener = jest.fn().mockName('request')171 producer.on(producer.events.REQUEST, requestListener)172 await producer.connect()173 expect(requestListener).toHaveBeenCalledWith({174 id: expect.any(Number),175 timestamp: expect.any(Number),176 type: 'producer.network.request',177 payload: {178 apiKey: expect.any(Number),179 apiName: 'ApiVersions',180 apiVersion: expect.any(Number),181 broker: expect.any(String),182 clientId: expect.any(String),183 correlationId: expect.any(Number),184 createdAt: expect.any(Number),185 duration: expect.any(Number),186 pendingDuration: expect.any(Number),187 sentAt: expect.any(Number),188 size: expect.any(Number),189 },190 })191 })192 test('emits the request timeout event', async () => {193 const emitter = new InstrumentationEventEmitter()194 const cluster = createCluster({195 requestTimeout: 1,196 enforceRequestTimeout: true,197 instrumentationEmitter: emitter,198 })199 producer = createProducer({200 cluster,201 logger: newLogger(),202 instrumentationEmitter: emitter,203 })204 const requestListener = jest.fn().mockName('request_timeout')205 producer.on(producer.events.REQUEST_TIMEOUT, requestListener)206 await producer207 .connect()208 .then(() =>209 producer.send({210 acks: -1,211 topic: topicName,212 messages: [{ key: 'key-0', value: 'value-0' }],213 })214 )215 .catch(e => e)216 expect(requestListener).toHaveBeenCalledWith({217 id: expect.any(Number),218 timestamp: expect.any(Number),219 type: 'producer.network.request_timeout',220 payload: {221 apiKey: expect.any(Number),222 apiName: expect.any(String),223 apiVersion: expect.any(Number),224 broker: expect.any(String),225 clientId: expect.any(String),226 correlationId: expect.any(Number),227 createdAt: expect.any(Number),228 pendingDuration: expect.any(Number),229 sentAt: expect.any(Number),230 },231 })232 })233 test('emits the request queue size event', async () => {234 await createTopic({ topic: topicName, partitions: 8 })235 const emitter = new InstrumentationEventEmitter()236 const cluster = createCluster({237 instrumentationEmitter: emitter,238 maxInFlightRequests: 1,239 clientId: 'test-client-id11111',240 })241 producer = createProducer({242 cluster,243 logger: newLogger(),244 instrumentationEmitter: emitter,245 })246 const requestListener = jest.fn().mockName('request_queue_size')247 producer.on(producer.events.REQUEST_QUEUE_SIZE, requestListener)248 await producer.connect()249 await Promise.all([250 producer.send({251 acks: -1,252 topic: topicName,253 messages: [254 { partition: 0, value: 'value-0' },255 { partition: 1, value: 'value-1' },256 { partition: 2, value: 'value-2' },257 ],258 }),259 producer.send({260 acks: -1,261 topic: topicName,262 messages: [263 { partition: 0, value: 'value-0' },264 { partition: 1, value: 'value-1' },265 { partition: 2, value: 'value-2' },266 ],267 }),268 ])269 expect(requestListener).toHaveBeenCalledWith({270 id: expect.any(Number),271 timestamp: expect.any(Number),272 type: 'producer.network.request_queue_size',273 payload: {274 broker: expect.any(String),275 clientId: expect.any(String),276 queueSize: expect.any(Number),277 },278 })279 })280 describe('when acks=0', () => {281 it('returns immediately', async () => {282 const cluster = createCluster({283 ...connectionOpts(),284 createPartitioner: createModPartitioner,285 })286 await createTopic({ topic: topicName })287 producer = createProducer({ cluster, logger: newLogger() })288 await producer.connect()289 const sendMessages = async () =>290 await producer.send({291 acks: 0,292 topic: topicName,293 messages: new Array(10).fill().map((_, i) => ({294 key: `key-${i}`,295 value: `value-${i}`,296 })),297 })298 expect(await sendMessages()).toEqual([])299 })300 })301 function testProduceMessages(idempotent = false) {302 const acks = idempotent ? -1 : 1303 test('produce messages', async () => {304 const cluster = createCluster({305 createPartitioner: createModPartitioner,306 })307 await createTopic({ topic: topicName })308 producer = createProducer({ cluster, logger: newLogger(), idempotent })309 await producer.connect()310 const sendMessages = async () =>311 await producer.send({312 acks,313 topic: topicName,314 messages: new Array(10).fill().map((_, i) => ({315 key: `key-${i}`,316 value: `value-${i}`,317 })),318 })319 expect(await sendMessages()).toEqual([320 {321 baseOffset: '0',322 topicName,323 errorCode: 0,324 partition: 0,325 logAppendTime: '-1',326 logStartOffset: '0',327 },328 ])329 expect(await sendMessages()).toEqual([330 {331 baseOffset: '10',332 topicName,333 errorCode: 0,334 partition: 0,335 logAppendTime: '-1',336 logStartOffset: '0',337 },338 ])339 })340 test('it should allow sending an empty list of messages', async () => {341 const cluster = createCluster({342 createPartitioner: createModPartitioner,343 })344 await createTopic({ topic: topicName })345 producer = createProducer({ cluster, logger: newLogger(), idempotent })346 await producer.connect()347 await expect(producer.send({ acks, topic: topicName, messages: [] })).toResolve()348 })349 test('produce messages to multiple topics', async () => {350 const topics = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`]351 await createTopic({ topic: topics[0] })352 await createTopic({ topic: topics[1] })353 const cluster = createCluster({354 ...connectionOpts(),355 createPartitioner: createModPartitioner,356 })357 const byTopicName = (a, b) => a.topicName.localeCompare(b.topicName)358 producer = createProducer({ cluster, logger: newLogger(), idempotent })359 await producer.connect()360 const sendBatch = async topics => {361 const topicMessages = topics.map(topic => ({362 acks,363 topic,364 messages: new Array(10).fill().map((_, i) => ({365 key: `key-${i}`,366 value: `value-${i}`,367 })),368 }))369 return producer.sendBatch({370 acks,371 topicMessages,372 })373 }374 let result = await sendBatch(topics)375 expect(result.sort(byTopicName)).toEqual(376 [377 {378 baseOffset: '0',379 topicName: topics[0],380 errorCode: 0,381 partition: 0,382 logStartOffset: '0',383 logAppendTime: '-1',384 },385 {386 topicName: topics[1],387 errorCode: 0,388 baseOffset: '0',389 partition: 0,390 logStartOffset: '0',391 logAppendTime: '-1',392 },393 ].sort(byTopicName)394 )395 result = await sendBatch(topics)396 expect(result.sort(byTopicName)).toEqual(397 [398 {399 topicName: topics[0],400 errorCode: 0,401 baseOffset: '10',402 partition: 0,403 logAppendTime: '-1',404 logStartOffset: '0',405 },406 {407 topicName: topics[1],408 errorCode: 0,409 baseOffset: '10',410 partition: 0,411 logAppendTime: '-1',412 logStartOffset: '0',413 },414 ].sort(byTopicName)415 )416 })417 test('sendBatch should allow sending an empty list of topicMessages', async () => {418 const cluster = createCluster({419 createPartitioner: createModPartitioner,420 })421 await createTopic({ topic: topicName })422 producer = createProducer({ cluster, logger: newLogger(), idempotent })423 await producer.connect()424 await expect(producer.sendBatch({ acks, topicMessages: [] })).toResolve()425 })426 test('sendBatch should consolidate topicMessages by topic', async () => {427 const cluster = createCluster({428 createPartitioner: createModPartitioner,429 })430 await createTopic({ topic: topicName, partitions: 1 })431 const messagesConsumed = []432 consumer = createConsumer({433 groupId: `test-consumer-${uuid()}`,434 cluster: createCluster(),435 logger: newLogger(),436 })437 await consumer.connect()438 await consumer.subscribe({ topic: topicName, fromBeginning: true })439 await consumer.run({440 eachMessage: async event => {441 messagesConsumed.push(event)442 },443 })444 producer = createProducer({ cluster, logger: newLogger(), idempotent })445 await producer.connect()446 const topicMessages = [447 {448 topic: topicName,449 messages: [450 { key: 'key-1', value: 'value-1' },451 { key: 'key-2', value: 'value-2' },452 ],453 },454 {455 topic: topicName,456 messages: [{ key: 'key-3', value: 'value-3' }],457 },458 ]459 const result = await producer.sendBatch({460 acks,461 topicMessages,462 })463 expect(result).toEqual([464 {465 topicName,466 errorCode: 0,467 baseOffset: '0',468 partition: 0,469 logAppendTime: '-1',470 logStartOffset: '0',471 },472 ])473 await waitForMessages(messagesConsumed, { number: 3 })474 await expect(waitForMessages(messagesConsumed, { number: 3 })).resolves.toEqual([475 expect.objectContaining({476 topic: topicName,477 partition: 0,478 message: expect.objectContaining({479 key: Buffer.from('key-1'),480 value: Buffer.from('value-1'),481 offset: '0',482 }),483 }),484 expect.objectContaining({485 topic: topicName,486 partition: 0,487 message: expect.objectContaining({488 key: Buffer.from('key-2'),489 value: Buffer.from('value-2'),490 offset: '1',491 }),492 }),493 expect.objectContaining({494 topic: topicName,495 partition: 0,496 message: expect.objectContaining({497 key: Buffer.from('key-3'),498 value: Buffer.from('value-3'),499 offset: '2',500 }),501 }),502 ])503 })504 testIfKafkaAtLeast_0_11('produce messages for Kafka 0.11', async () => {505 const cluster = createCluster({506 createPartitioner: createModPartitioner,507 })508 await createTopic({ topic: topicName })509 producer = createProducer({ cluster, logger: newLogger(), idempotent })510 await producer.connect()511 const sendMessages = async () =>512 await producer.send({513 acks,514 topic: topicName,515 messages: new Array(10).fill().map((_, i) => ({516 key: `key-${i}`,517 value: `value-${i}`,518 })),519 })520 expect(await sendMessages()).toEqual([521 {522 topicName,523 baseOffset: '0',524 errorCode: 0,525 logAppendTime: '-1',526 logStartOffset: '0',527 partition: 0,528 },529 ])530 expect(await sendMessages()).toEqual([531 {532 topicName,533 baseOffset: '10',534 errorCode: 0,535 logAppendTime: '-1',536 logStartOffset: '0',537 partition: 0,538 },539 ])540 })541 testIfKafkaAtLeast_0_11(542 'produce messages for Kafka 0.11 without specifying message key',543 async () => {544 const cluster = createCluster({545 createPartitioner: createModPartitioner,546 })547 await createTopic({ topic: topicName })548 producer = createProducer({ cluster, logger: newLogger(), idempotent })549 await producer.connect()550 await expect(551 producer.send({552 acks,553 topic: topicName,554 messages: [555 {556 value: 'test-value',557 },558 ],559 })560 ).toResolve()561 }562 )563 testIfKafkaAtLeast_0_11('produce messages for Kafka 0.11 with headers', async () => {564 const cluster = createCluster({565 createPartitioner: createModPartitioner,566 })567 await createTopic({ topic: topicName })568 producer = createProducer({ cluster, logger: newLogger(), idempotent })569 await producer.connect()570 const sendMessages = async () =>571 await producer.send({572 acks,573 topic: topicName,574 messages: new Array(10).fill().map((_, i) => ({575 key: `key-${i}`,576 value: `value-${i}`,577 headers: {578 [`header-a${i}`]: `header-value-a${i}`,579 [`header-b${i}`]: `header-value-b${i}`,580 [`header-c${i}`]: `header-value-c${i}`,581 },582 })),583 })584 expect(await sendMessages()).toEqual([585 {586 topicName,587 baseOffset: '0',588 errorCode: 0,589 logAppendTime: '-1',590 logStartOffset: '0',591 partition: 0,592 },593 ])594 expect(await sendMessages()).toEqual([595 {596 topicName,597 baseOffset: '10',598 errorCode: 0,599 logAppendTime: '-1',600 logStartOffset: '0',601 partition: 0,602 },603 ])604 })605 }606 testProduceMessages(false)607 describe('when idempotent=true', () => {608 testProduceMessages(true)609 test('throws an error if sending a message with acks != -1', async () => {610 const cluster = createCluster({611 createPartitioner: createModPartitioner,612 })613 producer = createProducer({ cluster, logger: newLogger(), idempotent: true })614 await producer.connect()615 await expect(616 producer.send({617 acks: 1,618 topic: topicName,619 messages: new Array(10).fill().map((_, i) => ({620 key: `key-${i}`,621 value: `value-${i}`,622 })),623 })624 ).rejects.toEqual(625 new KafkaJSNonRetriableError(626 "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees"627 )628 )629 await expect(630 producer.send({631 acks: 0,632 topic: topicName,633 messages: new Array(10).fill().map((_, i) => ({634 key: `key-${i}`,635 value: `value-${i}`,636 })),637 })638 ).rejects.toEqual(639 new KafkaJSNonRetriableError(640 "Not requiring ack for all messages invalidates the idempotent producer's EoS guarantees"641 )642 )643 })644 test('sets the default retry value to MAX_SAFE_INTEGER', async () => {645 const cluster = createCluster({646 createPartitioner: createModPartitioner,647 })648 producer = createProducer({ cluster, logger: newLogger(), idempotent: true })649 expect(retrySpy).toHaveBeenCalledWith({ retries: Number.MAX_SAFE_INTEGER })650 })651 test('throws an error if retries < 1', async () => {652 expect(() =>653 createProducer({654 cluster: {},655 logger: newLogger(),656 idempotent: true,657 retry: { retries: 0 },658 })659 ).toThrowError(660 new KafkaJSNonRetriableError(661 'Idempotent producer must allow retries to protect against transient errors'662 )663 )664 })665 test('only calls initProducerId if unitialized', async () => {666 const cluster = createCluster({667 createPartitioner: createModPartitioner,668 })669 producer = createProducer({ cluster, logger: newLogger(), idempotent: true })670 await producer.connect()671 expect(initProducerIdSpy).toHaveBeenCalledTimes(1)672 initProducerIdSpy.mockClear()673 await producer.connect()674 expect(initProducerIdSpy).toHaveBeenCalledTimes(0)675 })676 })677 describe('transactions', () => {678 let transactionalId679 beforeEach(() => {680 transactionalId = `transactional-id-${secureRandom()}`681 })682 const testTransactionEnd = (shouldCommit = true) => {683 const endFn = shouldCommit ? 'commit' : 'abort'684 testIfKafkaAtLeast_0_11(`transaction flow ${endFn}`, async () => {685 const cluster = createCluster({686 createPartitioner: createModPartitioner,687 })688 await createTopic({ topic: topicName })689 producer = createProducer({690 cluster,691 logger: newLogger(),692 transactionalId,693 })694 await producer.connect()695 const txn = await producer.transaction()696 await expect(producer.transaction()).rejects.toEqual(697 new KafkaJSNonRetriableError(698 'There is already an ongoing transaction for this producer. Please end the transaction before beginning another.'699 )700 )701 await txn.send({702 topic: topicName,703 messages: [{ key: '2', value: '2' }],704 })705 await txn.sendBatch({706 topicMessages: [707 {708 topic: topicName,709 messages: [{ key: '2', value: '2' }],710 },711 ],712 })713 await txn[endFn]() // Dynamic714 await expect(txn.send()).rejects.toEqual(715 new KafkaJSNonRetriableError('Cannot continue to use transaction once ended')716 )717 await expect(txn.sendBatch()).rejects.toEqual(718 new KafkaJSNonRetriableError('Cannot continue to use transaction once ended')719 )720 await expect(txn.commit()).rejects.toEqual(721 new KafkaJSNonRetriableError('Cannot continue to use transaction once ended')722 )723 await expect(txn.abort()).rejects.toEqual(724 new KafkaJSNonRetriableError('Cannot continue to use transaction once ended')725 )726 expect(await producer.transaction()).toBeTruthy() // Can create another transaction727 })728 }729 testTransactionEnd(true)730 testTransactionEnd(false)731 testIfKafkaAtLeast_0_11('allows sending messages outside a transaction', async () => {732 const cluster = createCluster({733 createPartitioner: createModPartitioner,734 })735 await createTopic({ topic: topicName })736 producer = createProducer({737 cluster,738 logger: newLogger(),739 transactionalId,740 })741 await producer.connect()742 await producer.transaction()743 await producer.send({744 topic: topicName,745 messages: [746 {747 key: 'key',748 value: 'value',749 },750 ],751 })752 await producer.sendBatch({753 topicMessages: [754 {755 topic: topicName,756 messages: [757 {758 key: 'key',759 value: 'value',760 },761 ],762 },763 ],764 })765 })766 testIfKafkaAtLeast_0_11('supports sending offsets', async () => {767 const cluster = createCluster()768 const markOffsetAsCommittedSpy = jest.spyOn(cluster, 'markOffsetAsCommitted')769 await createTopic({ topic: topicName, partitions: 2 })770 producer = createProducer({771 cluster,772 logger: newLogger(),773 transactionalId,774 })775 await producer.connect()776 const consumerGroupId = `consumer-group-id-${secureRandom()}`777 const topics = [778 {779 topic: topicName,780 partitions: [781 {782 partition: 0,783 offset: '5',784 },...
createProducer-test.js
Source:createProducer-test.js
...21 sandbox.restore()22})23test('should throw an error if now queueName is provided', async (t) => {24 try {25 await createProducer({26 logger: console,27 amqUrl: TEST_AMQ_URL28 })29 t.fail()30 } catch (err) {31 t.truthy(err)32 }33})34test('should throw an error if a connection cannot be made', async (t) => {35 const createConnection = Promise.reject36 const createProducer = proxyquire('~/queue/util/createProducer', {37 './createConnection': createConnection38 })39 try {40 await createProducer({41 logger: console,42 amqUrl: TEST_AMQ_URL,43 producerOptions: {44 queueName: TEST_QUEUE_NAME45 }46 })47 } catch (err) {48 t.truthy(err)49 }50})51test('should throw an error if a producer cannot start', async (t) => {52 const producerStartError = new Error('producer start err')53 class MockQueueProducer {54 start () {55 return Promise.reject(producerStartError)56 }57 }58 const createConnection = () => new MockConnection(new MockChannel())59 const createProducer = proxyquire('~/queue/util/createProducer', {60 './createConnection': createConnection,61 '../QueueProducer': MockQueueProducer62 })63 try {64 await createProducer({65 logger: console,66 amqUrl: TEST_AMQ_URL,67 producerOptions: {68 queueName: TEST_QUEUE_NAME69 }70 })71 t.fail()72 } catch (err) {73 t.is(err, producerStartError)74 }75})76test('should throw an error if a producer cannot start', async (t) => {77 class MockQueueProducer {78 start () {79 return Promise.resolve()80 }81 }82 const createConnection = () => new MockConnection(new MockChannel())83 const createProducer = proxyquire('~/queue/util/createProducer', {84 './createConnection': createConnection,85 '../QueueProducer': MockQueueProducer86 })87 await createProducer({88 logger: console,89 amqUrl: TEST_AMQ_URL,90 producerOptions: {91 queueName: TEST_QUEUE_NAME92 }93 })94 t.pass()95})96test('should emit an error if connection emits an error', async (t) => {97 const { connection } = t.context98 const producer = await createProducer({99 logger: console,100 amqUrl: TEST_AMQ_URL,101 producerOptions: {102 queueName: TEST_QUEUE_NAME,103 connection104 }105 })106 const errorEventsPromise = Promise.all([107 waitForEvent(producer, 'error', (err) => {108 return err.message === 'test error'109 }),110 waitForEvent(connection, 'error', (err) => {111 return err.message === 'test error'112 })...
myproducer.js
Source:myproducer.js
...28 return reject(err);29 });30 });31}32//function createProducer(config, onDeliveryReport) {33function createProducer( onDeliveryReport) {34 const producer = new Kafka.Producer({35 'bootstrap.servers': bootstrapServers,36 //'bootstrap.servers': config['bootstrap.servers'],37 'dr_msg_cb': true38 });39 return new Promise((resolve, reject) => {40 producer41 .on('ready', () => resolve(producer))42 .on('delivery-report', onDeliveryReport)43 .on('event.error', (err) => {44 console.warn('event.error', err);45 reject(err);46 });47 producer.connect();48 });49}50async function produceExample() {51 /*const config = await configFromCli();52 if (config.usage) {53 return console.log(config.usage);54 }55 await ensureTopicExists(config);56 */57 //const producer = await createProducer(config, (err, report) => {58 const producer = await createProducer((err, report) => {59 if (err) {60 console.warn('Error producing', err)61 } else {62 const {topic, partition, value} = report;63 console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);64 }65 });66 let response = ''67 for (let idx = 0; idx < 10; ++idx) {68 const key = 'john';69 const value = Buffer.from(JSON.stringify({ count: idx }));70 response += `Producing record ${key}\t${value}`71 producer.produce(mytopic.topic, -1, value, key);72 }...
index.js
Source:index.js
1import Switch from "../../../../utils/functors/Switch";2import INITIAL_STATE from "../state";3import Types from "../actions/actionTypes";4import * as loadProducers from "./operations/loadProducers";5import * as loadProducerDetails from "./operations/loadProducerDetails";6import * as createProducer from "./operations/createProducer";7import * as updateProducer from "./operations/updateProducer";8import * as deleteProducer from "./operations/deleteProducer";9const reducer = (state = INITIAL_STATE, { type, payload }) => {10 return Switch.on(type, state, payload, INITIAL_STATE)11 .case(Types.LOAD_PRODUCERS_REQUEST, loadProducers.request)12 .case(Types.LOAD_PRODUCERS_SUCCESS, loadProducers.success)13 .case(Types.LOAD_PRODUCERS_FAILURE, loadProducers.failure)14 .case(Types.LOAD_PRODUCER_DETAILS_REQUEST, loadProducerDetails.request)15 .case(Types.LOAD_PRODUCER_DETAILS_SUCCESS, loadProducerDetails.success)16 .case(Types.LOAD_PRODUCER_DETAILS_FAILURE, loadProducerDetails.failure)17 .case(Types.CREATE_PRODUCER_REQUEST, createProducer.request)18 .case(Types.CREATE_PRODUCER_SUCCESS, createProducer.success)19 .case(Types.CREATE_PRODUCER_FAILURE, createProducer.failure)20 .case(Types.UPDATE_PRODUCER_REQUEST, updateProducer.request)21 .case(Types.UPDATE_PRODUCER_SUCCESS, updateProducer.success)22 .case(Types.UPDATE_PRODUCER_FAILURE, updateProducer.failure)23 .case(Types.DELETE_PRODUCER_REQUEST, deleteProducer.request)24 .case(Types.DELETE_PRODUCER_SUCCESS, deleteProducer.success)25 .case(Types.DELETE_PRODUCER_FAILURE, deleteProducer.failure)26 .default(state);27};...
sensorController.js
Source:sensorController.js
1const createProducer = require('../kafka/producer');2exports.postAirQualityLog = async (req, res) => {3 try {4 const data = req.body;5 await createProducer('airQualityLog', data);6 res.json({data: true, code: "SUCCESS", timestamp: Date.now()}).status(200);7 } catch (err) {8 res.json({9 error: error.message,10 code: error.__proto__.name,11 timestamp: Date.now()12 }).status(500)13 }14}15exports.postTemperatureLog = async (req, res) => {16 try {17 const data = req.body;18 await createProducer('temperatureLog', data);19 res.json({data: true, code: "SUCCESS", timestamp: Date.now()}).status(200);20 } catch (err) {21 res.json({22 error: error.message,23 code: error.__proto__.name,24 timestamp: Date.now()25 }).status(500)26 }27}28exports.postElectricConsumptionLog = async (req, res) => {29 try {30 const data = req.body;31 await createProducer('electricConsumptionLog', data);32 res.json({data: true, code: "SUCCESS", timestamp: Date.now()}).status(200);33 } catch (err) {34 res.json({35 error: error.message,36 code: error.__proto__.name,37 timestamp: Date.now()38 }).status(500)39 }...
Create.js
Source:Create.js
1import React from 'react';2import { Card ,Message} from 'antd';3import { connect } from 'dva/index';4import ProducerForm from './Form.js'56@connect(({producer})=>({producer}))7class CreateProducer extends React.Component{8 constructor(props){9 super(props);10 this.state={11 record:{}12 };13 }1415 submitForm = values => {16 this.props.dispatch({17 type: 'producer/createProducer',18 payload: values,19 callback: () => {20 Message.success("æ·»å æå")21 this.props.history.push('list');22 },23 });24 };25 render(){26 return (27 <div>28 <Card title={<b>æ·»å ç产åä¿¡æ¯</b>}>29 <ProducerForm currentProducer={{}} submitForm={this.submitForm}></ProducerForm>30 </Card>31 </div>32 )33 }34}
...
producer.js
Source:producer.js
1import React from "react";2import PrivateRoute from "../PrivateRoute";3// components4import Producers from "../../pages/producer/producers/Producers";5import Producer from "../../pages/producer/details/Producer";6import CreateProducer from "../../pages/producer/form/CreateProducer";7import EditProducer from "../../pages/producer/form/EditProducer";8const ProducerRoutes = [9 <PrivateRoute10 key="producers/search"11 path="/producers/search"12 component={Producers}13 exact14 />,15 <PrivateRoute16 key="producer-create"17 path="/producers/create"18 component={CreateProducer}19 exact20 />,21 <PrivateRoute22 key="producer-edit"23 path="/producers/edit/:producerId"24 component={EditProducer}25 exact26 />,27 <PrivateRoute28 key="producer"29 path="/producers/:producerId"30 component={Producer}31 exact32 />33];...
kafkaHelper.js
Source:kafkaHelper.js
1'use strict'2const Producer = require('@mojaloop/central-services-stream').Kafka.Producer3const Logger = require('@mojaloop/central-services-logger')4const createProducer = async (config) => {5 Logger.debug('createProducer::start')6 // set the logger7 config.logger = Logger8 var p = new Producer(config)9 Logger.info('createProducer::- Connecting...')10 var connectionResult = await p.connect()11 Logger.info(`createProducer::- Connected result=${connectionResult}`)12 Logger.debug('createProducer::end')13 return p14}...
Using AI Code Generation
1var Mocha = require('mocha');2var mocha = new Mocha();3mocha.addFile('test.js');4mocha.run(function(failures) {5 process.on('exit', function() {6 process.exit(failures);7 });8});9var Mocha = require('mocha');10var mocha = new Mocha();11mocha.addFile('test.js');12mocha.run(function(failures) {13 process.on('exit', function() {14 process.exit(failures);15 });16});17var Mocha = require('mocha');18var mocha = new Mocha();19mocha.addFile('test.js');20mocha.run(function(failures) {21 process.on('exit', function() {22 process.exit(failures);23 });24});25var Mocha = require('mocha');26var mocha = new Mocha();27mocha.addFile('test.js');28mocha.run(function(failures) {29 process.on('exit', function() {30 process.exit(failures);31 });32});33var Mocha = require('mocha');34var mocha = new Mocha();35mocha.addFile('test.js');36mocha.run(function(failures) {37 process.on('exit', function() {38 process.exit(failures);39 });40});41var Mocha = require('mocha');42var mocha = new Mocha();43mocha.addFile('test.js');44mocha.run(function(failures) {45 process.on('exit', function() {46 process.exit(failures);47 });48});49var Mocha = require('mocha');50var mocha = new Mocha();51mocha.addFile('test.js');52mocha.run(function(failures) {53 process.on('exit', function() {54 process.exit(failures);55 });56});57var Mocha = require('mocha
Using AI Code Generation
1describe('Math', function() {2 it('add', function() {3 assert.equal(1 + 2, 3);4 });5});6{7 "scripts": {8 },9 "dependencies": {10 }11}12{13 "scripts": {14 },15 "dependencies": {16 }17}18describe('Math', function() {19 it('add', function() {20 assert.equal(1 + 2, 3);21 });22});23{24 "scripts": {25 },26 "dependencies": {
Using AI Code Generation
1var Mocha = require('mocha');2var mocha = new Mocha({3 reporterOptions: {4 }5});6var path = require('path');7mocha.addFile(path.join(__dirname, 'test.js'));8mocha.run(function(failures){9 process.on('exit', function () {10 });11});12var Mocha = require('mocha');13var mocha = new Mocha({14 reporterOptions: {15 }16});17var path = require('path');18mocha.addFile(path.join(__dirname, 'test.js'));19mocha.run(function(failures){20 process.on('exit', function () {21 });22});23var Mocha = require('mocha');24var mocha = new Mocha({25 reporterOptions: {26 }27});28var path = require('path');29mocha.addFile(path.join(__dirname, 'test.js'));30mocha.run(function(failures){31 process.on('exit', function () {32 });33});34var Mocha = require('mocha');35var mocha = new Mocha({36 reporterOptions: {37 }38});39var path = require('path');40mocha.addFile(path.join(__dirname, 'test.js'));41mocha.run(function(failures){42 process.on('exit', function () {
Using AI Code Generation
1let producer = new createProducer();2let consumer = new createConsumer();3let producer = new createProducer();4let consumer = new createConsumer();5let producer = new createProducer();6let consumer = new createConsumer();7let producer = new createProducer();8let consumer = new createConsumer();9let producer = new createProducer();10let consumer = new createConsumer();11let producer = new createProducer();12let consumer = new createConsumer();13let producer = new createProducer();14let consumer = new createConsumer();15let producer = new createProducer();16let consumer = new createConsumer();17let producer = new createProducer();18let consumer = new createConsumer();19let producer = new createProducer();20let consumer = new createConsumer();21let producer = new createProducer();22let consumer = new createConsumer();23let producer = new createProducer();24let consumer = new createConsumer();
Using AI Code Generation
1var mocha = require('mocha');2var chai = require('chai');3var expect = chai.expect;4describe('Create Producer', function() {5 it('should create a producer', function() {6 var producer = mocha.createProducer();7 expect(producer).to.be.an('object');8 });9});
Using AI Code Generation
1const producer = new Mocha.createProducer();2const consumer = new Mocha.createConsumer();3const topic = new Mocha.createTopic();4const subscription = new Mocha.createSubscription();5const message = new Mocha.createMessage();6const batchMessage = new Mocha.createBatchMessage();7const queue = new Mocha.createQueue();8const transaction = new Mocha.createTransaction();9const transactionContext = new Mocha.createTransactionContext();10const transactionProducer = new Mocha.createTransactionProducer();11const transactionConsumer = new Mocha.createTransactionConsumer();12const transactionQueue = new Mocha.createTransactionQueue();13const transactionTopic = new Mocha.createTransactionTopic();14const transactionSubscription = new Mocha.createTransactionSubscription();15const transactionMessage = new Mocha.createTransactionMessage();16const transactionBatchMessage = new Mocha.createTransactionBatchMessage();17const transactionContext = new Mocha.createTransactionContext();
Using AI Code Generation
1const Mocha = require('./mocha');2const mocha = new Mocha();3mocha.createProducer();4class Mocha {5 createConsumer() {6 this.consumer = this.kafka.consumer({ groupId: this.groupId });7 this.consumer.connect();8 this.consumer.subscribe({ topic: this.topic, fromBeginning: true });9 this.consumer.run({10 eachMessage: async ({ topic, partition, message }) => {11 console.log({12 value: message.value.toString(),13 });14 },15 });16 }17}18const Mocha = require('./mocha');19const mocha = new Mocha();20mocha.createConsumer();21class Mocha {22 createProducerAndConsumer() {23 this.createProducer();24 this.createConsumer();25 }26}27const Mocha = require('./mocha');28const mocha = new Mocha();29mocha.createProducerAndConsumer();30class Mocha {31 async sendMessage() {32 await this.producer.send({33 { value: 'Hello KafkaJS user!' },34 { value: 'This is KafkaJS' },35 });36 }37}38const Mocha = require('./mocha');39const mocha = new Mocha();40mocha.createProducer();41mocha.sendMessage();42class Mocha {43 createConsumer() {44 this.consumer = this.kafka.consumer({ groupId: this.groupId });45 this.consumer.connect();46 this.consumer.subscribe({ topic: this.topic, fromBeginning: true });47 this.consumer.run({48 eachMessage: async ({ topic, partition, message }) => {49 console.log({
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!