Best Venom code snippet using amqp.New
amqp_test.go
Source:amqp_test.go
1// Licensed to Elasticsearch B.V. under one or more contributor2// license agreements. See the NOTICE file distributed with3// this work for additional information regarding copyright4// ownership. Elasticsearch B.V. licenses this file to you under5// the Apache License, Version 2.0 (the "License"); you may6// not use this file except in compliance with the License.7// You may obtain a copy of the License at8//9// http://www.apache.org/licenses/LICENSE-2.010//11// Unless required by applicable law or agreed to in writing,12// software distributed under the License is distributed on an13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY14// KIND, either express or implied. See the License for the15// specific language governing permissions and limitations16// under the License.17package amqp18import (19 "encoding/hex"20 "net"21 "testing"22 "github.com/stretchr/testify/assert"23 "github.com/snappyflow/beats/v7/libbeat/beat"24 "github.com/snappyflow/beats/v7/libbeat/common"25 "github.com/snappyflow/beats/v7/libbeat/logp"26 "github.com/snappyflow/beats/v7/packetbeat/protos"27 "github.com/snappyflow/beats/v7/packetbeat/publish"28)29type eventStore struct {30 events []beat.Event31}32func (e *eventStore) publish(event beat.Event) {33 publish.MarshalPacketbeatFields(&event, nil)34 e.events = append(e.events, event)35}36func amqpModForTests() (*eventStore, *amqpPlugin) {37 var amqp amqpPlugin38 results := &eventStore{}39 config := defaultConfig40 amqp.init(results.publish, &config)41 return results, &amqp42}43func testTCPTuple() *common.TCPTuple {44 t := &common.TCPTuple{45 IPLength: 4,46 BaseTuple: common.BaseTuple{47 SrcIP: net.IPv4(192, 168, 0, 1), DstIP: net.IPv4(192, 168, 0, 2),48 SrcPort: 6512, DstPort: 3306,49 },50 }51 t.ComputeHashables()52 return t53}54func expectTransaction(t *testing.T, e *eventStore) common.MapStr {55 if len(e.events) == 0 {56 t.Error("No transaction")57 return nil58 }59 event := e.events[0]60 e.events = e.events[1:]61 return event.Fields62}63func TestAmqp_UnknownMethod(t *testing.T) {64 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))65 _, amqp := amqpModForTests()66 data, err := hex.DecodeString("0100010000000f006e000c0000075465737447657401ce")67 assert.NoError(t, err)68 stream := &amqpStream{data: data, message: new(amqpMessage)}69 ok, complete := amqp.amqpMessageParser(stream)70 if ok {71 t.Errorf("Parsing should return error")72 }73 if complete {74 t.Errorf("Message should not be complete")75 }76}77func TestAmqp_FrameSize(t *testing.T) {78 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))79 _, amqp := amqpModForTests()80 //incomplete frame81 data, err := hex.DecodeString("0100000000000c000a001fffff000200")82 assert.NoError(t, err)83 stream := &amqpStream{data: data, message: new(amqpMessage)}84 ok, complete := amqp.amqpMessageParser(stream)85 if !ok {86 t.Errorf("Parsing should not raise an error")87 }88 if complete {89 t.Errorf("message should not be complete")90 }91}92// Test that the parser doesn't panic on a partial message that includes93// a client header94func TestAmqp_PartialFrameSize(t *testing.T) {95 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))96 _, amqp := amqpModForTests()97 //incomplete frame98 data, err := hex.DecodeString("414d515000060606010000000000")99 assert.NoError(t, err)100 stream := &amqpStream{data: data, message: new(amqpMessage)}101 ok, complete := amqp.amqpMessageParser(stream)102 if !ok {103 t.Errorf("Parsing should not raise an error")104 }105 if complete {106 t.Errorf("message should not be complete")107 }108}109func TestAmqp_WrongShortStringSize(t *testing.T) {110 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))111 _, amqp := amqpModForTests()112 data, err := hex.DecodeString("02000100000019003c000000000000000000058000ac" +113 "746578742f706c61696ece")114 assert.NoError(t, err)115 stream := &amqpStream{data: data, message: new(amqpMessage)}116 ok, _ := amqp.amqpMessageParser(stream)117 if ok {118 t.Errorf("Parsing failed to detect error")119 }120}121func TestAmqp_QueueDeclaration(t *testing.T) {122 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))123 _, amqp := amqpModForTests()124 data, err := hex.DecodeString("0100010000001a0032000a00000e5468697320697" +125 "3206120544553541800000000ce")126 assert.NoError(t, err)127 stream := &amqpStream{data: data, message: new(amqpMessage)}128 m := stream.message129 ok, complete := amqp.amqpMessageParser(stream)130 if !ok {131 t.Errorf("Parsing returned error")132 }133 if !complete {134 t.Errorf("Message should be complete")135 }136 assert.Equal(t, "This is a TEST", m.fields["queue"])137 assert.Equal(t, false, m.fields["passive"])138 assert.Equal(t, false, m.fields["durable"])139 assert.Equal(t, false, m.fields["exclusive"])140 assert.Equal(t, true, m.fields["auto-delete"])141 assert.Equal(t, true, m.fields["no-wait"])142 _, exists := m.fields["arguments"].(common.MapStr)143 if exists {144 t.Errorf("Arguments field should not be present")145 }146}147func TestAmqp_ExchangeDeclaration(t *testing.T) {148 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))149 _, amqp := amqpModForTests()150 data, err := hex.DecodeString("0100010000001c0028000a00000a6c6f67735f746f7" +151 "0696305746f7069630200000000ce")152 assert.NoError(t, err)153 stream := &amqpStream{data: data, message: new(amqpMessage)}154 m := stream.message155 ok, complete := amqp.amqpMessageParser(stream)156 if !ok {157 t.Errorf("Parsing returned error")158 }159 if !complete {160 t.Errorf("Message should be complete")161 }162 assert.Equal(t, "exchange.declare", m.method)163 assert.Equal(t, "logs_topic", m.fields["exchange"])164 assert.Equal(t, "logs_topic", m.request)165 assert.Equal(t, true, m.fields["durable"])166 assert.Equal(t, false, m.fields["passive"])167 assert.Equal(t, false, m.fields["no-wait"])168 assert.Equal(t, "topic", m.fields["exchange-type"])169 _, exists := m.fields["arguments"].(common.MapStr)170 if exists {171 t.Errorf("Arguments field should not be present")172 }173}174func TestAmqp_BasicConsume(t *testing.T) {175 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))176 _, amqp := amqpModForTests()177 data, err := hex.DecodeString("01000100000028003c001400000e4957616e74" +178 "546f436f6e73756d650d6d6973746572436f6e73756d650300000000ce")179 assert.NoError(t, err)180 stream := &amqpStream{data: data, message: new(amqpMessage)}181 m := stream.message182 ok, complete := amqp.amqpMessageParser(stream)183 if !ok {184 t.Errorf("Parsing returned error")185 }186 if !complete {187 t.Errorf("Message should be complete")188 }189 assert.Equal(t, "basic.consume", m.method)190 assert.Equal(t, "IWantToConsume", m.fields["queue"])191 assert.Equal(t, "misterConsume", m.fields["consumer-tag"])192 assert.Equal(t, true, m.fields["no-ack"])193 assert.Equal(t, false, m.fields["exclusive"])194 assert.Equal(t, true, m.fields["no-local"])195 assert.Equal(t, false, m.fields["no-wait"])196 _, exists := m.fields["arguments"].(common.MapStr)197 if exists {198 t.Errorf("Arguments field should not be present")199 }200}201func TestAmqp_ExchangeDeletion(t *testing.T) {202 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))203 _, amqp := amqpModForTests()204 data, err := hex.DecodeString("010001000000100028001400000844656c65746" +205 "54d6501ce")206 assert.NoError(t, err)207 stream := &amqpStream{data: data, message: new(amqpMessage)}208 m := stream.message209 ok, complete := amqp.amqpMessageParser(stream)210 if !ok {211 t.Errorf("Parsing returned error")212 }213 if !complete {214 t.Errorf("Message should be complete")215 }216 assert.Equal(t, "exchange.delete", m.method)217 assert.Equal(t, "DeleteMe", m.fields["exchange"])218 assert.Equal(t, "DeleteMe", m.request)219 assert.Equal(t, true, m.fields["if-unused"])220 assert.Equal(t, false, m.fields["no-wait"])221}222//this method is exclusive to RabbitMQ223func TestAmqp_ExchangeBind(t *testing.T) {224 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))225 _, amqp := amqpModForTests()226 data, err := hex.DecodeString("0100010000001c0028001e0000057465737431" +227 "057465737432044d5346540000000000ce")228 assert.NoError(t, err)229 stream := &amqpStream{data: data, message: new(amqpMessage)}230 m := stream.message231 ok, complete := amqp.amqpMessageParser(stream)232 if !ok {233 t.Errorf("Parsing returned error")234 }235 if !complete {236 t.Errorf("Message should be complete")237 }238 assert.Equal(t, "exchange.bind", m.method)239 assert.Equal(t, "test1", m.fields["destination"])240 assert.Equal(t, "test2", m.fields["source"])241 assert.Equal(t, "MSFT", m.fields["routing-key"])242 assert.Equal(t, "test2 test1", m.request)243 assert.Equal(t, false, m.fields["no-wait"])244 _, exists := m.fields["arguments"].(common.MapStr)245 if exists {246 t.Errorf("Arguments field should not be present")247 }248}249//this method is exclusive to RabbitMQ250func TestAmqp_ExchangeUnbindTransaction(t *testing.T) {251 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))252 results, amqp := amqpModForTests()253 amqp.sendRequest = true254 data, err := hex.DecodeString("0100010000001c00280028000005746573743105" +255 "7465737432044d5346540000000000ce")256 assert.NoError(t, err)257 data2, err := hex.DecodeString("0100010000000400280033ce")258 assert.NoError(t, err)259 tcptuple := testTCPTuple()260 req := protos.Packet{Payload: data}261 private := protos.ProtocolData(new(amqpPrivateData))262 private = amqp.Parse(&req, tcptuple, 0, private)263 req = protos.Packet{Payload: data2}264 amqp.Parse(&req, tcptuple, 1, private)265 trans := expectTransaction(t, results)266 assert.Equal(t, "exchange.unbind", trans["method"])267 assert.Equal(t, "exchange.unbind test2 test1", trans["request"])268 assert.Equal(t, "amqp", trans["type"])269 assert.Equal(t, common.OK_STATUS, trans["status"])270 fields, ok := trans["amqp"].(common.MapStr)271 if !ok {272 t.Errorf("Field should be present")273 }274 assert.Equal(t, "test1", fields["destination"])275 assert.Equal(t, "test2", fields["source"])276 assert.Equal(t, "MSFT", fields["routing-key"])277 assert.Equal(t, false, fields["no-wait"])278}279func TestAmqp_PublishMessage(t *testing.T) {280 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))281 results, amqp := amqpModForTests()282 amqp.sendRequest = true283 data, err := hex.DecodeString("0100010000001b003c002800000a6c6f67735f746f70" +284 "696308414d51507465737400ce")285 assert.NoError(t, err)286 data2, err := hex.DecodeString("02000100000019003c0000000000000000001c800" +287 "00a746578742f706c61696ece")288 assert.NoError(t, err)289 data3, err := hex.DecodeString("0300010000001c48656c6c6f204461726c696e67" +290 "2049276d20686f6d6520616761696ece")291 assert.NoError(t, err)292 tcptuple := testTCPTuple()293 req := protos.Packet{Payload: data}294 private := protos.ProtocolData(new(amqpPrivateData))295 //method frame296 private = amqp.Parse(&req, tcptuple, 0, private)297 req = protos.Packet{Payload: data2}298 //header frame299 private = amqp.Parse(&req, tcptuple, 0, private)300 req = protos.Packet{Payload: data3}301 //body frame302 amqp.Parse(&req, tcptuple, 0, private)303 trans := expectTransaction(t, results)304 body := "Hello Darling I'm home again"305 assert.Equal(t, "basic.publish", trans["method"])306 assert.Equal(t, "amqp", trans["type"])307 assert.Equal(t, body, trans["request"])308 assert.Equal(t, common.OK_STATUS, trans["status"])309 fields, ok := trans["amqp"].(common.MapStr)310 if !ok {311 t.Errorf("Field should be present")312 }313 assert.Equal(t, "text/plain", fields["content-type"])314 assert.Equal(t, "logs_topic", fields["exchange"])315 assert.Equal(t, "AMQPtest", fields["routing-key"])316 assert.Equal(t, false, fields["immediate"])317 assert.Equal(t, false, fields["mandatory"])318}319func TestAmqp_DeliverMessage(t *testing.T) {320 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))321 results, amqp := amqpModForTests()322 amqp.sendResponse = true323 data, err := hex.DecodeString("01000100000034003c003c0d6d6973746572436f6e73" +324 "756d650000000000000002000c7465737445786368616e67650b7465737444656c697" +325 "66572ce")326 assert.NoError(t, err)327 data2, err := hex.DecodeString("02000100000019003c000000000000000000058" +328 "0000a746578742f706c61696ece")329 assert.NoError(t, err)330 data3, err := hex.DecodeString("030001000000056b696b6f6fce")331 assert.NoError(t, err)332 tcptuple := testTCPTuple()333 req := protos.Packet{Payload: data}334 private := protos.ProtocolData(new(amqpPrivateData))335 //method frame336 private = amqp.Parse(&req, tcptuple, 0, private)337 req = protos.Packet{Payload: data2}338 //header frame339 private = amqp.Parse(&req, tcptuple, 0, private)340 req = protos.Packet{Payload: data3}341 //body frame342 amqp.Parse(&req, tcptuple, 0, private)343 trans := expectTransaction(t, results)344 assert.Equal(t, "basic.deliver", trans["method"])345 assert.Equal(t, "amqp", trans["type"])346 assert.Equal(t, "kikoo", trans["response"])347 assert.Equal(t, common.OK_STATUS, trans["status"])348 fields, ok := trans["amqp"].(common.MapStr)349 if !ok {350 t.Errorf("Field should be present")351 }352 assert.Equal(t, "misterConsume", fields["consumer-tag"])353 assert.Equal(t, "text/plain", fields["content-type"])354 assert.Equal(t, "testDeliver", fields["routing-key"])355 assert.Equal(t, false, fields["redelivered"])356}357func TestAmqp_MessagePropertiesFields(t *testing.T) {358 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))359 _, amqp := amqpModForTests()360 amqp.sendResponse = true361 data, err := hex.DecodeString("01000100000013003c00280000000a546573744865" +362 "6164657200ce02000100000061003c0000000000000000001ab8e00a746578742f706c" +363 "61696e0000002203796f70530000000468696869036e696c56066e756d626572644044" +364 "40000000000002060a656c206d656e73616a650000000055f81dc00c6c6f7665206d65" +365 "7373616765ce0300010000001a5465737420686561646572206669656c647320666f72" +366 "65766572ce")367 assert.NoError(t, err)368 stream := &amqpStream{data: data, message: new(amqpMessage)}369 ok, complete := amqp.amqpMessageParser(stream)370 m := stream.message371 if !ok {372 t.Errorf("Parsing returned error")373 }374 if !complete {375 t.Errorf("Message should be complete")376 }377 assert.Equal(t, "basic.publish", m.method)378 assert.Equal(t, "persistent", m.fields["delivery-mode"])379 assert.Equal(t, "el mensaje", m.fields["message-id"])380 assert.Equal(t, "love message", m.fields["type"])381 assert.Equal(t, "text/plain", m.fields["content-type"])382 //assert.Equal(t, "September 15 15:31:44 2015", m.Fields["timestamp"])383 priority, ok := m.fields["priority"].(uint8)384 if !ok {385 t.Errorf("Field should be present")386 } else if ok && priority != 6 {387 t.Errorf("Wrong argument")388 }389 headers, ok := m.fields["headers"].(common.MapStr)390 if !ok {391 t.Errorf("Headers should be present")392 }393 assert.Equal(t, "hihi", headers["yop"])394 assert.Equal(t, nil, headers["nil"])395 assert.Equal(t, 40.5, headers["number"])396}397func TestAmqp_ChannelError(t *testing.T) {398 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))399 _, amqp := amqpModForTests()400 data1, err := hex.DecodeString("0100010000009000140028019685505245434f4e444" +401 "954494f4e5f4641494c4544202d20696e6571756976616c656e74206172672027617574" +402 "6f5f64656c6574652720666f722065786368616e676520277465737445786368616e676" +403 "52720696e2076686f737420272f273a207265636569766564202774727565272062757" +404 "42063757272656e74206973202766616c7365270028000ace")405 assert.NoError(t, err)406 stream := &amqpStream{data: data1, message: new(amqpMessage)}407 ok, complete := amqp.amqpMessageParser(stream)408 m := stream.message409 if !ok {410 t.Errorf("Parsing returned error")411 }412 if !complete {413 t.Errorf("Message should be complete")414 }415 assert.Equal(t, "channel.close", m.method)416 class, ok := m.fields["class-id"].(uint16)417 if !ok {418 t.Errorf("Field should be present")419 } else if ok && class != 40 {420 t.Errorf("Wrong argument")421 }422 method, ok := m.fields["method-id"].(uint16)423 if !ok {424 t.Errorf("Field should be present")425 } else if ok && method != 10 {426 t.Errorf("Wrong argument")427 }428 code, ok := m.fields["reply-code"].(uint16)429 if !ok {430 t.Errorf("Field should be present")431 } else if ok && code != 406 {432 t.Errorf("Wrong argument")433 }434 text := "PRECONDITION_FAILED - inequivalent arg 'auto_delete' for" +435 " exchange 'testExchange' in vhost '/': received 'true' but current is " +436 "'false'"437 assert.Equal(t, text, m.fields["reply-text"])438}439func TestAmqp_NoWaitQueueDeleteMethod(t *testing.T) {440 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))441 results, amqp := amqpModForTests()442 amqp.sendRequest = true443 data, err := hex.DecodeString("010001000000120032002800000a546573745468" +444 "6f6d617304ce")445 assert.NoError(t, err)446 tcptuple := testTCPTuple()447 req := protos.Packet{Payload: data}448 private := protos.ProtocolData(new(amqpPrivateData))449 amqp.Parse(&req, tcptuple, 0, private)450 trans := expectTransaction(t, results)451 assert.Equal(t, "queue.delete", trans["method"])452 assert.Equal(t, "queue.delete TestThomas", trans["request"])453 assert.Equal(t, "amqp", trans["type"])454 fields, ok := trans["amqp"].(common.MapStr)455 if !ok {456 t.Errorf("Field should be present")457 }458 assert.Equal(t, true, fields["no-wait"])459 assert.Equal(t, false, fields["if-empty"])460 assert.Equal(t, false, fields["if-unused"])461 assert.Equal(t, "TestThomas", fields["queue"])462}463func TestAmqp_RejectMessage(t *testing.T) {464 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))465 results, amqp := amqpModForTests()466 amqp.sendRequest = true467 data, err := hex.DecodeString("0100010000000d003c005a000000000000000101ce")468 assert.NoError(t, err)469 tcptuple := testTCPTuple()470 req := protos.Packet{Payload: data}471 private := protos.ProtocolData(new(amqpPrivateData))472 //method frame473 amqp.Parse(&req, tcptuple, 0, private)474 trans := expectTransaction(t, results)475 assert.Equal(t, "basic.reject", trans["method"])476 assert.Equal(t, "basic.reject 1", trans["request"])477 assert.Equal(t, "amqp", trans["type"])478 assert.Equal(t, common.ERROR_STATUS, trans["status"])479 fields, ok := trans["amqp"].(common.MapStr)480 if !ok {481 t.Errorf("Field should be present")482 }483 assert.Equal(t, true, fields["multiple"])484}485func TestAmqp_GetEmptyMethod(t *testing.T) {486 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))487 results, amqp := amqpModForTests()488 amqp.sendRequest = true489 data, err := hex.DecodeString("01000100000013003c004600000b526f626269" +490 "654b65616e6501ce")491 assert.NoError(t, err)492 data2, err := hex.DecodeString("01000100000005003c004800ce")493 assert.NoError(t, err)494 tcptuple := testTCPTuple()495 req := protos.Packet{Payload: data}496 private := protos.ProtocolData(new(amqpPrivateData))497 private = amqp.Parse(&req, tcptuple, 0, private)498 req = protos.Packet{Payload: data2}499 amqp.Parse(&req, tcptuple, 1, private)500 trans := expectTransaction(t, results)501 assert.Equal(t, "basic.get-empty", trans["method"])502 assert.Equal(t, "basic.get RobbieKeane", trans["request"])503 assert.Equal(t, "amqp", trans["type"])504 assert.Equal(t, common.OK_STATUS, trans["status"])505}506func TestAmqp_GetMethod(t *testing.T) {507 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))508 results, amqp := amqpModForTests()509 amqp.sendRequest = true510 amqp.sendResponse = true511 data, err := hex.DecodeString("0100010000000f003c0046000007546573744" +512 "7657401ce")513 assert.NoError(t, err)514 data2, err := hex.DecodeString("0100010000001a003c00470000000000000001" +515 "0000075465737447657400000001ce02000100000019003c000000000000000000" +516 "1280000a746578742f706c61696ece03000100000012476574206d6520696620796" +517 "f752064617265ce")518 assert.NoError(t, err)519 tcptuple := testTCPTuple()520 req := protos.Packet{Payload: data}521 private := protos.ProtocolData(new(amqpPrivateData))522 private = amqp.Parse(&req, tcptuple, 0, private)523 req = protos.Packet{Payload: data2}524 amqp.Parse(&req, tcptuple, 1, private)525 trans := expectTransaction(t, results)526 assert.Equal(t, "basic.get", trans["method"])527 assert.Equal(t, "basic.get TestGet", trans["request"])528 assert.Equal(t, "amqp", trans["type"])529 assert.Equal(t, common.OK_STATUS, trans["status"])530 assert.Equal(t, "Get me if you dare", trans["response"])531}532func TestAmqp_MaxBodyLength(t *testing.T) {533 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))534 results, amqp := amqpModForTests()535 amqp.maxBodyLength = 10536 amqp.sendRequest = true537 data, err := hex.DecodeString("01000100000010003c002800000007546573744d617" +538 "800ce02000100000019003c0000000000000000001680000a746578742f706c61696ece" +539 "0300010000001649276d2061207665727920626967206d657373616765ce")540 assert.NoError(t, err)541 tcptuple := testTCPTuple()542 req := protos.Packet{Payload: data}543 private := protos.ProtocolData(new(amqpPrivateData))544 //method frame545 private = amqp.Parse(&req, tcptuple, 0, private)546 trans := expectTransaction(t, results)547 assert.Equal(t, "basic.publish", trans["method"])548 assert.Equal(t, "amqp", trans["type"])549 assert.Equal(t, "I'm a very [...]", trans["request"])550 assert.Equal(t, common.OK_STATUS, trans["status"])551 fields, ok := trans["amqp"].(common.MapStr)552 if !ok {553 t.Errorf("Field should be present")554 }555 assert.Equal(t, "text/plain", fields["content-type"])556 assert.Equal(t, "TestMax", fields["routing-key"])557 assert.Equal(t, false, fields["immediate"])558 assert.Equal(t, false, fields["mandatory"])559 _, exists := fields["exchange"]560 assert.False(t, exists)561 data, err = hex.DecodeString("01000100000010003c002800000007546573744d6" +562 "17800ce02000100000018003c0000000000000000003a800009696d6167652f676966" +563 "ce0300010000003a41414141414141414141414141414141414141414141414141414141" +564 "414141414141414141414141414141414141414141414141414141414141ce")565 assert.NoError(t, err)566 tcptuple = testTCPTuple()567 req = protos.Packet{Payload: data}568 private = protos.ProtocolData(new(amqpPrivateData))569 //method frame570 amqp.Parse(&req, tcptuple, 0, private)571 trans = expectTransaction(t, results)572 assert.Equal(t, "basic.publish", trans["method"])573 assert.Equal(t, "amqp", trans["type"])574 assert.Equal(t, "65 65 65 65 65 65 65 65 65 65 [...]", trans["request"])575 assert.Equal(t, common.OK_STATUS, trans["status"])576 fields, ok = trans["amqp"].(common.MapStr)577 if !ok {578 t.Errorf("Field should be present")579 }580 assert.Equal(t, "image/gif", fields["content-type"])581 assert.Equal(t, "TestMax", fields["routing-key"])582 assert.Equal(t, false, fields["immediate"])583 assert.Equal(t, false, fields["mandatory"])584 _, exists = fields["exchange"]585 assert.False(t, exists)586}587func TestAmqp_HideArguments(t *testing.T) {588 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))589 results, amqp := amqpModForTests()590 amqp.sendRequest = true591 amqp.parseHeaders = false592 amqp.parseArguments = false593 //parse args594 data, err := hex.DecodeString("0100010000004d0032000a00000a5465737448656164" +595 "6572180000003704626f6f6c74010362697462050568656c6c6f530000001f4869206461" +596 "726c696e6720c3aac3aac3aac3aac3aac3aac3aae697a5e69cacce")597 assert.NoError(t, err)598 tcptuple := testTCPTuple()599 req := protos.Packet{Payload: data}600 private := protos.ProtocolData(new(amqpPrivateData))601 private = amqp.Parse(&req, tcptuple, 0, private)602 trans := expectTransaction(t, results)603 assert.Equal(t, "queue.declare", trans["method"])604 assert.Equal(t, "amqp", trans["type"])605 assert.Equal(t, "queue.declare TestHeader", trans["request"])606 fields, ok := trans["amqp"].(common.MapStr)607 if !ok {608 t.Errorf("Field should be present")609 }610 assert.Equal(t, false, fields["durable"])611 assert.Equal(t, true, fields["auto-delete"])612 _, exists := fields["arguments"].(common.MapStr)613 if exists {614 t.Errorf("Arguments field should not be present")615 }616 //parse headers617 data, err = hex.DecodeString("01000100000013003c00280000000a546573744865616" +618 "4657200ce02000100000026003c0000000000000000001a98800a746578742f706c61696" +619 "e02060a656c206d656e73616a65ce0300010000001a54657374206865616465722066696" +620 "56c647320666f7265766572ce")621 assert.NoError(t, err)622 tcptuple = testTCPTuple()623 req = protos.Packet{Payload: data}624 private = protos.ProtocolData(new(amqpPrivateData))625 amqp.Parse(&req, tcptuple, 0, private)626 trans = expectTransaction(t, results)627 assert.Equal(t, "basic.publish", trans["method"])628 assert.Equal(t, "amqp", trans["type"])629 fields, ok = trans["amqp"].(common.MapStr)630 if !ok {631 t.Errorf("Field should be present")632 }633 assert.Equal(t, "TestHeader", fields["routing-key"])634 _, exists = fields["exchange"]635 assert.False(t, exists)636 assert.Equal(t, false, fields["mandatory"])637 assert.Equal(t, false, fields["immediate"])638 assert.Equal(t, nil, fields["message-id"])639 assert.Equal(t, nil, fields["content-type"])640 assert.Equal(t, nil, fields["delivery-mode"])641 assert.Equal(t, nil, fields["priority"])642}643func TestAmqp_RecoverMethod(t *testing.T) {644 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))645 results, amqp := amqpModForTests()646 amqp.sendRequest = true647 data, err := hex.DecodeString("01000100000005003c006e01ce")648 assert.NoError(t, err)649 data2, err := hex.DecodeString("01000100000004003c006fce")650 assert.NoError(t, err)651 tcptuple := testTCPTuple()652 req := protos.Packet{Payload: data}653 private := protos.ProtocolData(new(amqpPrivateData))654 private = amqp.Parse(&req, tcptuple, 0, private)655 req = protos.Packet{Payload: data2}656 amqp.Parse(&req, tcptuple, 1, private)657 trans := expectTransaction(t, results)658 assert.Equal(t, "basic.recover", trans["method"])659 assert.Equal(t, "basic.recover", trans["request"])660 assert.Equal(t, "amqp", trans["type"])661 assert.Equal(t, common.OK_STATUS, trans["status"])662 assert.Equal(t, common.MapStr{"requeue": true}, trans["amqp"])663}664//this is a specific rabbitMQ method665func TestAmqp_BasicNack(t *testing.T) {666 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))667 _, amqp := amqpModForTests()668 data1, err := hex.DecodeString("0100010000000d003c0078000000000000000102ce")669 assert.NoError(t, err)670 stream := &amqpStream{data: data1, message: new(amqpMessage)}671 ok, complete := amqp.amqpMessageParser(stream)672 m := stream.message673 if !ok {674 t.Errorf("Parsing returned error")675 }676 if !complete {677 t.Errorf("Message should be complete")678 }679 assert.Equal(t, "basic.nack", m.method)680 assert.Equal(t, false, m.fields["multiple"])681 assert.Equal(t, true, m.fields["requeue"])682}683func TestAmqp_GetTable(t *testing.T) {684 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))685 _, amqp := amqpModForTests()686 data, err := hex.DecodeString("010001000000890032000a00000a5465737448656164" +687 "657218000000730974696d657374616d70540000000055f7e40903626974620507646563" +688 "696d616c440500ec49050568656c6c6f530000001f4869206461726c696e6720c3aac3aa" +689 "c3aac3aac3aac3aac3aae697a5e69cac06646f75626c656440453e100cbd7da405666c6f" +690 "6174664124cccd04626f6f6c7401ce")691 assert.NoError(t, err)692 stream := &amqpStream{data: data, message: new(amqpMessage)}693 ok, complete := amqp.amqpMessageParser(stream)694 m := stream.message695 if !ok {696 t.Errorf("Parsing returned error")697 }698 if !complete {699 t.Errorf("Message should be complete")700 }701 args, ok := m.fields["arguments"].(common.MapStr)702 if !ok {703 t.Errorf("Field should be present")704 }705 double, ok := args["double"].(float64)706 if !ok {707 t.Errorf("Field should be present")708 } else if ok && double != 42.4848648 {709 t.Errorf("Wrong argument")710 }711 float, ok := args["float"].(float32)712 if !ok {713 t.Errorf("Field should be present")714 } else if ok && float != 10.3 {715 t.Errorf("Wrong argument")716 }717 argByte, ok := args["bit"].(int8)718 if !ok {719 t.Errorf("Field should be present")720 } else if ok && argByte != 5 {721 t.Errorf("Wrong argument")722 }723 assert.Equal(t, "Hi darling êêêêêêêæ¥æ¬", args["hello"])724 assert.Equal(t, true, args["bool"])725 assert.Equal(t, "154.85189", args["decimal"])726 assert.Equal(t, "queue.declare", m.method)727 assert.Equal(t, false, m.fields["durable"])728 assert.Equal(t, true, m.fields["no-wait"])729 assert.Equal(t, true, m.fields["auto-delete"])730 assert.Equal(t, false, m.fields["exclusive"])731 //assert.Equal(t, "September 15 11:25:29 2015", args["timestamp"])732 assert.Equal(t, "TestHeader", m.request)733}734func TestAmqp_TableInception(t *testing.T) {735 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))736 _, amqp := amqpModForTests()737 data, err := hex.DecodeString("010001000000860028000a000005746573743105" +738 "746f706963020000006f09696e63657074696f6e460000005006696e636570315300" +739 "000006445245414d5306696e6365703253000000064d4152494f4e056c696d626f46" +740 "00000021066c696d626f315300000004436f6262066c696d626f3253000000055361" +741 "69746f06626967496e746c00071afd498d0000ce")742 assert.NoError(t, err)743 stream := &amqpStream{data: data, message: new(amqpMessage)}744 ok, complete := amqp.amqpMessageParser(stream)745 m := stream.message746 if !ok {747 t.Errorf("Parsing returned error")748 }749 if !complete {750 t.Errorf("Message should be complete")751 }752 assert.Equal(t, "exchange.declare", m.method)753 assert.Equal(t, "test1", m.fields["exchange"])754 args, ok := m.fields["arguments"].(common.MapStr)755 if !ok {756 t.Errorf("Field should be present")757 }758 assert.Nil(t, m.notes)759 bigInt, ok := args["bigInt"].(uint64)760 if !ok {761 t.Errorf("Field should be present")762 } else if ok && bigInt != 2000000000000000 {763 t.Errorf("Wrong argument")764 }765 inception, ok := args["inception"].(common.MapStr)766 if !ok {767 t.Errorf("Field should be present")768 }769 assert.Equal(t, "DREAMS", inception["incep1"])770 assert.Equal(t, "MARION", inception["incep2"])771 limbo, ok := inception["limbo"].(common.MapStr)772 if !ok {773 t.Errorf("Field should be present")774 }775 assert.Equal(t, "Cobb", limbo["limbo1"])776 assert.Equal(t, "Saito", limbo["limbo2"])777}778func TestAmqp_ArrayFields(t *testing.T) {779 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))780 _, amqp := amqpModForTests()781 //byte array, rabbitMQ specific field782 data, err := hex.DecodeString("010001000000260028000a0000057465737431057" +783 "46f706963020000000f05617272617978000000040a007dd2ce")784 assert.NoError(t, err)785 stream := &amqpStream{data: data, message: new(amqpMessage)}786 ok, complete := amqp.amqpMessageParser(stream)787 m := stream.message788 if !ok {789 t.Errorf("Parsing returned error")790 }791 if !complete {792 t.Errorf("Message should be complete")793 }794 args, ok := m.fields["arguments"].(common.MapStr)795 if !ok {796 t.Errorf("Field should be present")797 }798 assert.Nil(t, m.notes)799 assert.Equal(t, "[10, 0, 125, 210]", args["array"])800 data, err = hex.DecodeString("010001000000b60028000a000005746573743105746" +801 "f706963020000009f0474657374530000001061206c6f74206f6620617272617973210a" +802 "6172726179666c6f6174410000001b64404540000000000064403ccccccccccccd64404" +803 "0a66666666666096172726179626f6f6c410000000a740174007400740174010b617272" +804 "6179737472696e674100000030530000000441414141530000000442424242530000001" +805 "9d090d0bdd0bdd0b020d09ad0b0d180d0b5d0bdd0b8d0bdd0b0ce")806 assert.NoError(t, err)807 stream = &amqpStream{data: data, message: new(amqpMessage)}808 ok, complete = amqp.amqpMessageParser(stream)809 m = stream.message810 if !ok {811 t.Errorf("Parsing returned error")812 }813 if !complete {814 t.Errorf("Message should be complete")815 }816 args, ok = m.fields["arguments"].(common.MapStr)817 if !ok {818 t.Errorf("Field should be present")819 }820 assert.Equal(t, "a lot of arrays!", args["test"])821 arrayFloat, ok := args["arrayfloat"].(common.MapStr)822 if !ok {823 t.Errorf("Field should be present")824 }825 assert.Equal(t, 42.5, arrayFloat["0"])826 assert.Equal(t, 28.8, arrayFloat["1"])827 assert.Equal(t, 33.3, arrayFloat["2"])828 arrayBool, ok := args["arraybool"].(common.MapStr)829 if !ok {830 t.Errorf("Field should be present")831 }832 assert.Equal(t, true, arrayBool["0"])833 assert.Equal(t, false, arrayBool["1"])834 assert.Equal(t, false, arrayBool["2"])835 assert.Equal(t, true, arrayBool["3"])836 assert.Equal(t, true, arrayBool["4"])837 arrayString, ok := args["arraystring"].(common.MapStr)838 if !ok {839 t.Errorf("Field should be present")840 }841 assert.Equal(t, "AAAA", arrayString["0"])842 assert.Equal(t, "BBBB", arrayString["1"])843 assert.Equal(t, "Ðнна ÐаÑенина", arrayString["2"])844}845func TestAmqp_WrongTable(t *testing.T) {846 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))847 _, amqp := amqpModForTests()848 //declared table size too big849 data, err := hex.DecodeString("010001000000890032000a00000a54657374486561646" +850 "57218000000da0974696d657374616d70540000000055f7e409036269746205076465636" +851 "96d616c440500ec49050568656c6c6f530000001f4869206461726c696e6720c3aac3aac" +852 "3aac3aac3aac3aac3aae697a5e69cac06646f75626c656440453e100cbd7da405666c6f6" +853 "174664124cccd04626f6f6c7401ce")854 assert.NoError(t, err)855 stream := &amqpStream{data: data, message: new(amqpMessage)}856 ok, complete := amqp.amqpMessageParser(stream)857 m := stream.message858 if !ok {859 t.Errorf("Parsing returned error")860 }861 if !complete {862 t.Errorf("Message should be complete")863 }864 _, exists := m.fields["arguments"].(common.MapStr)865 if exists {866 t.Errorf("Field should not exist")867 }868 assert.Equal(t, []string{"Failed to parse additional arguments"}, m.notes)869 //table size ok, but total non-sense inside870 data, err = hex.DecodeString("010001000000890032000a00000a54657374486561646" +871 "57218000000730974696d657374616d7054004400005521e409036269743705076400036" +872 "96d616c447600ec49180568036c6c0b536400001f480a2064076e6c696e0520c3aac3aac" +873 "34613aac3aac3aa01aae697a5e69cac3c780b75626c6564a4453e100cbd7da4320a6c0b0" +874 "90b664124cc1904626f6f6c7401ce")875 assert.NoError(t, err)876 stream = &amqpStream{data: data, message: new(amqpMessage)}877 ok, complete = amqp.amqpMessageParser(stream)878 m = stream.message879 if !ok {880 t.Errorf("Parsing returned error")881 }882 if !complete {883 t.Errorf("Message should be complete")884 }885 _, exists = m.fields["arguments"].(common.MapStr)886 if exists {887 t.Errorf("Field should not exist")888 }889 assert.Equal(t, []string{"Failed to parse additional arguments"}, m.notes)890}891func TestAmqp_isError(t *testing.T) {892 trans := &amqpTransaction{893 method: "channel.close",894 amqp: common.MapStr{895 "reply-code": 200,896 },897 }898 assert.Equal(t, false, isError(trans))899 trans.amqp["reply-code"] = uint16(300)900 assert.Equal(t, true, isError(trans))901 trans.amqp["reply-code"] = uint16(403)902 assert.Equal(t, true, isError(trans))903 trans.method = "basic.reject"904 assert.Equal(t, true, isError(trans))905 trans.method = "basic.return"906 assert.Equal(t, true, isError(trans))907 trans.method = "basic.publish"908 assert.Equal(t, false, isError(trans))909}910func TestAmqp_ChannelCloseErrorMethod(t *testing.T) {911 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))912 results, amqp := amqpModForTests()913 data, err := hex.DecodeString("0100010000009000140028019685505245434f4e444" +914 "954494f4e5f4641494c4544202d20696e6571756976616c656e74206172672027617574" +915 "6f5f64656c6574652720666f722065786368616e676520277465737445786368616e676" +916 "52720696e2076686f737420272f273a207265636569766564202774727565272062757" +917 "42063757272656e74206973202766616c7365270028000ace")918 assert.NoError(t, err)919 data2, err := hex.DecodeString("0100010000000400280033ce")920 assert.NoError(t, err)921 tcptuple := testTCPTuple()922 req := protos.Packet{Payload: data}923 private := protos.ProtocolData(new(amqpPrivateData))924 private = amqp.Parse(&req, tcptuple, 0, private)925 req = protos.Packet{Payload: data2}926 amqp.Parse(&req, tcptuple, 1, private)927 trans := expectTransaction(t, results)928 assert.Equal(t, "channel.close", trans["method"])929 assert.Equal(t, "amqp", trans["type"])930 assert.Equal(t, common.ERROR_STATUS, trans["status"])931 assert.Nil(t, trans["notes"])932}933func TestAmqp_ConnectionCloseNoError(t *testing.T) {934 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))935 results, amqp := amqpModForTests()936 amqp.hideConnectionInformation = false937 data, err := hex.DecodeString("01000000000012000a003200c8076b74687862616900000000ce")938 assert.NoError(t, err)939 data2, err := hex.DecodeString("01000000000004000a0033ce")940 assert.NoError(t, err)941 tcptuple := testTCPTuple()942 req := protos.Packet{Payload: data}943 private := protos.ProtocolData(new(amqpPrivateData))944 private = amqp.Parse(&req, tcptuple, 0, private)945 req = protos.Packet{Payload: data2}946 amqp.Parse(&req, tcptuple, 1, private)947 trans := expectTransaction(t, results)948 assert.Equal(t, "connection.close", trans["method"])949 assert.Equal(t, "amqp", trans["type"])950 assert.Equal(t, common.OK_STATUS, trans["status"])951 assert.Nil(t, trans["notes"])952 fields, ok := trans["amqp"].(common.MapStr)953 assert.True(t, ok)954 code, ok := fields["reply-code"].(uint16)955 assert.True(t, ok)956 assert.Equal(t, uint16(200), code)957}958func TestAmqp_MultipleBodyFrames(t *testing.T) {959 logp.TestingSetup(logp.WithSelectors("amqp", "amqpdetailed"))960 results, amqp := amqpModForTests()961 amqp.sendRequest = true962 data, err := hex.DecodeString("0100010000000e003c00280000000568656c6c6f00ce" +963 "02000100000021003c0000000000000000002a80400a746578742f706c61696e00000000" +964 "56a22873ce030001000000202a2a2a68656c6c6f2049206c696b6520746f207075626c69" +965 "736820626967206dce")966 assert.NoError(t, err)967 data2, err := hex.DecodeString("0300010000000a657373616765732a2a2ace")968 assert.NoError(t, err)969 tcptuple := testTCPTuple()970 req := protos.Packet{Payload: data}971 private := protos.ProtocolData(new(amqpPrivateData))972 private = amqp.Parse(&req, tcptuple, 0, private)973 req = protos.Packet{Payload: data2}974 amqp.Parse(&req, tcptuple, 0, private)975 trans := expectTransaction(t, results)976 assert.Equal(t, "basic.publish", trans["method"])977 assert.Equal(t, "***hello I like to publish big messages***", trans["request"])978}...
rabbitmqPool.go
Source:rabbitmqPool.go
...251 //fmt.Println("msg: ",msg)252 puberrMsg,_ := msg.(string)253 //fmt.Println("ok: ",ok)254 //fmt.Println("puberrMsg : ",puberrMsg)255 puberr = errors.New(puberrMsg)256 return257 }258 }()259 ch,channelId := S.getChannel()260 cha := channel{}261 if ch == nil {262 cha = S.createChannel(0,S.connections[0])263 defer cha.ch.Close()264 ch = cha.ch265 //fmt.Println("ch: ",ch)266 }267 ch = S.declareExchange(ch,exchangeName,channelId)268 data := S.dataForm(notice)269 var tryTime = 1270 for {271 puberr = S.publish(channelId, ch, exchangeName, routeKey, data)272 if puberr !=nil {273 if tryTime <= retryCount {274 //log.Printf("%s: %s", "Failed to publish a message, try again.", puberr)275 tryTime ++276 continue277 }else{278 //log.Printf("%s: %s data: %s", "Failed to publish a message", puberr,data)279 S.backChannelId(channelId,ch)280 return notice,puberr281 }282 }283 select {284 case confirm := <-S.channels[channelId].notifyConfirm:285 //log.Printf(" [%s] Sent %d message %s", routeKey, confirm.DeliveryTag, data)286 fmt.Println("busy:",len(S.busyChannels))287 if confirm.Ack {288 S.backChannelId(channelId, ch)289 return notice, nil290 }291 return notice,errors.New("ack failed")292 case chaConfirm := <-cha.notifyConfirm:293 //log.Println("å ç车",data)294 if chaConfirm.Ack {295 return notice, nil296 }297 return notice,errors.New("ack failed")298 case <-time.After(waitConfirmTime):299 // log.Printf("message: %s data: %s", "Can not receive the confirm.", data)300 S.backChannelId(channelId, ch)301 confirmErr := errors.New("Can not receive the confirm . ")302 return notice, confirmErr303 }304 }305 return306}...
New
Using AI Code Generation
1import (2func main() {3 if err != nil {4 fmt.Println(err)5 }6 defer conn.Close()7}
New
Using AI Code Generation
1import (2func main() {3 if err != nil {4 fmt.Println(err)5 }6 defer conn.Close()7 ch, err := conn.Channel()8 if err != nil {9 fmt.Println(err)10 }11 defer ch.Close()12 q, err := ch.QueueDeclare(13 if err != nil {14 fmt.Println(err)15 }16 err = ch.Publish(17 amqp.Publishing{18 Body: []byte(body),19 })20 if err != nil {21 fmt.Println(err)22 }23}24import (25func main() {26 if err != nil {27 fmt.Println(err)28 }29 defer conn.Close()30 ch, err := conn.Channel()31 if err != nil {32 fmt.Println(err)33 }34 defer ch.Close()35 q, err := ch.QueueDeclare(36 if err != nil {37 fmt.Println(err)38 }39 msgs, err := ch.Consume(40 if err != nil {41 fmt.Println(err)42 }43 forever := make(chan bool)44 go func() {45 for d := range msgs {46 fmt.Printf("
New
Using AI Code Generation
1import (2func main() {3 if err != nil {4 panic(err)5 }6 defer conn.Close()7 ch, err := conn.Channel()8 if err != nil {9 panic(err)10 }11 defer ch.Close()12 q, err := ch.QueueDeclare(13 if err != nil {14 panic(err)15 }16 err = ch.Publish(17 amqp.Publishing{18 Body: []byte(body),19 })20 if err != nil {21 panic(err)22 }23 fmt.Printf(" [x] Sent %s24}25import (26func main() {27 if err != nil {28 panic(err)29 }30 defer conn.Close()31 ch, err := conn.Channel()32 if err != nil {33 panic(err)34 }35 defer ch.Close()36 q, err := ch.QueueDeclare(37 if err != nil {38 panic(err)39 }40 err = ch.Publish(41 amqp.Publishing{42 Body: []byte(body),43 })44 if err != nil {45 panic(err)46 }
New
Using AI Code Generation
1import (2func main() {3 failOnError(err, "Failed to connect to RabbitMQ")4 defer conn.Close()5 ch, err := conn.Channel()6 failOnError(err, "Failed to open a channel")7 defer ch.Close()8 q, err := ch.QueueDeclare(9 failOnError(err, "Failed to declare a queue")10 err = ch.Publish(11 amqp.Publishing{12 Body: []byte(body),13 })14 fmt.Printf(" [x] Sent %s15 failOnError(err, "Failed to publish a message")16}17import (18func main() {19 failOnError(err, "Failed to connect to RabbitMQ")20 defer conn.Close()21 ch, err := conn.Channel()22 failOnError(err, "Failed to open a channel")23 defer ch.Close()24 q, err := ch.QueueDeclare(25 failOnError(err, "Failed to declare a queue")26 msgs, err := ch.Consume(
New
Using AI Code Generation
1func main() {2 failOnError(err, "Failed to connect to RabbitMQ")3 defer conn.Close()4 ch, err := conn.Channel()5 failOnError(err, "Failed to open a channel")6 defer ch.Close()7 q, err := ch.QueueDeclare(8 failOnError(err, "Failed to declare a queue")9 err = ch.Publish(10 amqp.Publishing{11 Body: []byte(body),12 })13 failOnError(err, "Failed to publish a message")14}15func main() {16 failOnError(err, "Failed to connect to RabbitMQ")17 defer conn.Close()18 ch, err := conn.Channel()19 failOnError(err, "Failed to open a channel")20 defer ch.Close()21 q, err := ch.QueueDeclare(22 failOnError(err, "Failed to declare a queue")23 msgs, err := ch.Consume(
New
Using AI Code Generation
1func main() {2 amqp := amqp.New()3 amqp.Connect()4 amqp.DeclareExchange()5 amqp.DeclareQueue()6 amqp.BindQueue()7 amqp.PublishMessage()8 amqp.ConsumeMessage()9}10func main() {11 amqp := amqp.New()12 amqp.Connect()13 amqp.DeclareExchange()14 amqp.DeclareQueue()15 amqp.BindQueue()16 amqp.PublishMessage()17 amqp.ConsumeMessage()18}19func main() {20 amqp := amqp.New()21 amqp.Connect()22 amqp.DeclareExchange()23 amqp.DeclareQueue()24 amqp.BindQueue()25 amqp.PublishMessage()26 amqp.ConsumeMessage()27}28func main() {29 amqp := amqp.New()30 amqp.Connect()31 amqp.DeclareExchange()32 amqp.DeclareQueue()33 amqp.BindQueue()34 amqp.PublishMessage()35 amqp.ConsumeMessage()36}37func main() {38 amqp := amqp.New()39 amqp.Connect()40 amqp.DeclareExchange()41 amqp.DeclareQueue()42 amqp.BindQueue()43 amqp.PublishMessage()44 amqp.ConsumeMessage()45}46func main() {47 amqp := amqp.New()48 amqp.Connect()49 amqp.DeclareExchange()50 amqp.DeclareQueue()51 amqp.BindQueue()52 amqp.PublishMessage()53 amqp.ConsumeMessage()54}55func main() {
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!