How to use the New method of the kafka package

Best Venom code snippets using kafka.New, drawn from open-source Go projects.
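The snippets below come from several different Go Kafka clients (confluent-kafka-go, dp-kafka, segmentio/kafka-go, Shopify/sarama), each of which exposes its own New-style constructors. For orientation, here is a minimal sketch using confluent-kafka-go's kafka.NewProducer; the broker address and topic are placeholders, not values taken from any snippet below:

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // kafka.NewProducer is one of the package's New-style constructors;
    // the ConfigMap keys are librdkafka settings.
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        panic(err)
    }
    defer p.Close()

    topic := "test"
    if err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("hello"),
    }, nil); err != nil {
        panic(err)
    }

    p.Flush(15 * 1000) // wait up to 15s for outstanding deliveries
    fmt.Println("message produced")
}

kafka.NewConsumer follows the same pattern, taking a *kafka.ConfigMap with group.id and offset settings added.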

bootstrap.go

Source: bootstrap.go (GitHub)


...
    CfgKafkaMechanisms            = "kafka.sasl_mechanisms"
    CfgKafkaKey                   = "kafka.sasl_username"
    CfgKafkaSecret                = "kafka.sasl_password"
    CfgKafkaTopic                 = "kafka.topics"
    CfgNewRelicKey                = "newrelic.key"
    CfgNewRelicDebug              = "newrelic.debug"
    CfgMongoURI                   = "database.mongo.uri"
    CfgMongoDB                    = "database.mongo.db"
    CfgSentryKey                  = "sentry.key"
    TelemetryID                   = "newrelic.id"
    CfgNewrelicSlowQueryThreshold = "newrelic.slowquery.threshold"
    CfgNewrelicSlowQueryEnabled   = "newrelic.slowquery.enabled"
)

var (
    logger    *log.Logger
    db        *sqlx.DB
    kc        *kafka.Consumer
    kp        *kafka.Producer
    mdb       *mongo.Database
    rdb       *redis.Ring
    telemetry *newrelic.Application
)

func init() {
    c := config.Configure()
    // hot reload on config change...
    go func() {
        c.WatchConfig()
        c.OnConfigChange(func(e fsnotify.Event) {
            log.Printf("config file changed %v", e.Name)
        })
    }()
    db = InitPostgresDB()
    logger = InitLogger()
    telemetry = NewTelemetry(logger)
    kc = InitKafkaConsumer()
    kp = InitKafkaProducer()
    mdb = InitMongoConnect()
    rdb = InitRedis()
}

func InitPostgresDB() (db *sqlx.DB) {
    dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
        config.GetString("database.postgres.write.host"), config.GetInt("database.postgres.write.port"),
        config.GetString("database.postgres.write.user"), config.GetString("database.postgres.write.pass"),
        config.GetString("database.postgres.write.db"), config.GetString("database.postgres.write.sslmode"))
    db = datastore.PostgresConn(dsn)
    return
}

func InitLogger() *log.Logger {
    log.SetFormatter(&log.JSONFormatter{})
    l := log.StandardLogger()
    if dsn := config.GetString(CfgSentryKey); len(dsn) > 0 {
        hook, err := logrus_sentry.NewSentryHook(dsn, []log.Level{
            log.PanicLevel,
            log.FatalLevel,
            log.ErrorLevel,
        })
        if err == nil {
            hook.StacktraceConfiguration.Enable = true
            l.Hooks.Add(hook)
        }
    }
    return l
}

func NewTelemetry(l *log.Logger) *newrelic.Application {
    key := config.GetString(CfgNewRelicKey)
    e := l.WithField("component", "newrelic")
    if len(key) == 0 {
        e.Warnf("configuration %s is not defined", CfgNewRelicKey)
        return nil
    }
    // conf := newrelic.NewConfig(config.GetString(TelemetryID), key)
    // conf.DistributedTracer.Enabled = true
    // conf.Logger = nrlogrus.StandardLogger()
    if isDebug := config.GetBool(CfgNewRelicDebug); isDebug {
        l.SetLevel(log.DebugLevel)
    }
    duration := config.GetFloat64(CfgNewrelicSlowQueryThreshold)
    app, err := newrelic.NewApplication(
        newrelic.ConfigAppName(config.GetString(TelemetryID)),
        newrelic.ConfigLicense(key),
        newrelic.ConfigDistributedTracerEnabled(true),
        nrlogrus.ConfigLogger(l),
        func(cfg *newrelic.Config) {
            cfg.ErrorCollector.RecordPanics = true
            cfg.DatastoreTracer.SlowQuery.Enabled = config.GetBool(CfgNewrelicSlowQueryEnabled)
            cfg.DatastoreTracer.SlowQuery.Threshold = time.Duration(duration) * time.Second
        },
    )
    if err != nil {
        e.Info(errors.Cause(err))
        return nil
    }
    return app
}

func InitKafkaConsumer() *kafka.Consumer {
    return datastore.NewKafkaConsumer(config.GetString(CfgKafkaHost), config.GetString(CfgKafkaGroup),
        config.GetString(CfgKafkaProtocol), config.GetString(CfgKafkaMechanisms),
        config.GetString(CfgKafkaKey), config.GetString(CfgKafkaSecret))
}

func InitKafkaProducer() *kafka.Producer {
    return datastore.NewKafkaProducer(config.GetString(CfgKafkaHost), config.GetString(CfgKafkaProtocol),
        config.GetString(CfgKafkaMechanisms), config.GetString(CfgKafkaKey),
        config.GetString(CfgKafkaSecret))
}

func InitMongoConnect() *mongo.Database {
    return datastore.MongoMustConnect(config.GetString(CfgMongoURI), config.GetString(CfgMongoDB))
}

func InitRedis() *redis.Ring {
    return datastore.NewRedisClient(config.GetStringMapString(CfgRedisHost), config.GetString(CfgRedisPass), config.GetInt(CfgRedisDB))
}
...
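The datastore.NewKafkaConsumer and datastore.NewKafkaProducer helpers are not included in this excerpt. As a rough sketch of what a wrapper like NewKafkaConsumer plausibly does with those six arguments, assuming the confluent-kafka-go client behind the *kafka.Consumer type (the body below is a reconstruction for illustration, not the repo's actual code):

package datastore

import "github.com/confluentinc/confluent-kafka-go/kafka"

// NewKafkaConsumer (hypothetical reconstruction): maps the SASL settings read
// from config onto librdkafka properties and calls kafka.NewConsumer.
func NewKafkaConsumer(host, group, protocol, mechanisms, username, password string) *kafka.Consumer {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": host,
        "group.id":          group,
        "security.protocol": protocol,   // e.g. SASL_SSL
        "sasl.mechanisms":   mechanisms, // e.g. PLAIN
        "sasl.username":     username,
        "sasl.password":     password,
    })
    if err != nil {
        panic(err) // the bootstrap code treats these clients as must-haves
    }
    return c
}

The producer side would map the same settings, minus group.id, onto kafka.NewProducer.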


initialise.go

Source: initialise.go (GitHub)


...
    dphttp "github.com/ONSdigital/dp-net/http"
)

// GetKafkaConsumer returns a Kafka consumer with the provided config
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Config) (kafka.IConsumerGroup, error) {
    kafkaOffset := kafka.OffsetNewest
    if cfg.KafkaConfig.OffsetOldest {
        kafkaOffset = kafka.OffsetOldest
    }
    cgConfig := &kafka.ConsumerGroupConfig{
        BrokerAddrs:       cfg.KafkaConfig.Addr,
        Topic:             cfg.KafkaConfig.InstanceStartedTopic,
        GroupName:         cfg.KafkaConfig.InstanceStartedGroup,
        MinBrokersHealthy: &cfg.KafkaConfig.ConsumerMinBrokersHealthy,
        KafkaVersion:      &cfg.KafkaConfig.Version,
        NumWorkers:        &cfg.KafkaConfig.NumWorkers,
        Offset:            &kafkaOffset,
    }
    if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        cgConfig.SecurityConfig = kafka.GetSecurityConfig(
            cfg.KafkaConfig.SecCACerts,
            cfg.KafkaConfig.SecClientCert,
            cfg.KafkaConfig.SecClientKey,
            cfg.KafkaConfig.SecSkipVerify,
        )
    }
    return kafka.NewConsumerGroup(ctx, cgConfig)
}

// GetKafkaProducer returns a kafka producer with the provided config
var GetKafkaProducer = func(ctx context.Context, cfg *config.Config) (kafka.IProducer, error) {
    pConfig := &kafka.ProducerConfig{
        BrokerAddrs:       cfg.KafkaConfig.Addr,
        Topic:             cfg.KafkaConfig.CategoryDimensionImportTopic,
        MinBrokersHealthy: &cfg.KafkaConfig.ProducerMinBrokersHealthy,
        KafkaVersion:      &cfg.KafkaConfig.Version,
        MaxMessageBytes:   &cfg.KafkaConfig.MaxBytes,
    }
    if cfg.KafkaConfig.SecProtocol == config.KafkaTLSProtocolFlag {
        pConfig.SecurityConfig = kafka.GetSecurityConfig(
            cfg.KafkaConfig.SecCACerts,
            cfg.KafkaConfig.SecClientCert,
            cfg.KafkaConfig.SecClientKey,
            cfg.KafkaConfig.SecSkipVerify,
        )
    }
    return kafka.NewProducer(ctx, pConfig)
}

// GetHTTPServer returns an http server
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
    s := dphttp.NewServer(bindAddr, router)
    s.HandleOSSignals = false
    return s
}

// GetHealthCheck returns a healthcheck
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
    versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
    if err != nil {
        return nil, err
    }
    hc := healthcheck.New(versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval)
    return &hc, nil
}

var GetCantabularClient = func(cfg *config.Config) CantabularClient {
    return cantabular.NewClient(
        cantabular.Config{
            Host:       cfg.CantabularURL,
            ExtApiHost: cfg.CantabularExtURL,
        },
        dphttp.NewClient(),
        nil,
    )
}

var GetRecipeAPIClient = func(cfg *config.Config) RecipeAPIClient {
    return recipe.NewClient(cfg.RecipeAPIURL)
}

var GetDatasetAPIClient = func(cfg *config.Config) DatasetAPIClient {
    return dataset.NewAPIClient(cfg.DatasetAPIURL)
}

var GetImportAPIClient = func(cfg *config.Config) ImportAPIClient {
    return importapi.New(cfg.ImportAPIURL)
}
...
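These factories are deliberately package-level vars so tests can swap them for mocks. Wiring them together at startup might look like the sketch below; runService and config.Get are assumptions for illustration, not code from the excerpt:

// Hypothetical wiring of the factories above at service startup.
func runService(ctx context.Context) error {
    cfg, err := config.Get() // assumed config loader; not shown in the excerpt
    if err != nil {
        return err
    }
    consumer, err := GetKafkaConsumer(ctx, cfg)
    if err != nil {
        return err
    }
    producer, err := GetKafkaProducer(ctx, cfg)
    if err != nil {
        return err
    }
    // Register a message handler on the consumer, hand the producer to the
    // service layer, and add both as clients of the healthcheck from
    // GetHealthCheck.
    _ = consumer
    _ = producer
    return nil
}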


publisher.go

Source: publisher.go (GitHub)


...
    brokers      []string
    dialer       *kafka.Dialer
    kafkaWriters map[string]*kafka.Writer
}

func NewKafkaPublisher(kafkaDialer *kafka.Dialer, brokers ...string) *publisher {
    return &publisher{
        brokers:      brokers,
        kafkaWriters: make(map[string]*kafka.Writer),
        dialer:       kafkaDialer,
    }
}

func (p *publisher) getKafkaWriter(topic string) *kafka.Writer {
    if p.kafkaWriters[topic] == nil {
        p.kafkaWriters[topic] = kafka.NewWriter(kafka.WriterConfig{
            Brokers:          p.brokers,
            Topic:            topic,
            Balancer:         &kafka.LeastBytes{},
            CompressionCodec: snappy.NewCompressionCodec(),
            BatchSize:        1,
            BatchTimeout:     10 * time.Millisecond,
            Dialer:           p.dialer,
        })
    }
    return p.kafkaWriters[topic]
}

func (p *publisher) Publish(topic string, data interface{}) error {
    kafkaMessages, err := createKafkaMessages(data)
    if err != nil {
        return errors.New("error on publishing kafka message")
    }
    kafkaWriter := p.getKafkaWriter(topic)
    err = kafkaWriter.WriteMessages(context.Background(), kafkaMessages...)
    if err != nil {
        log.WithError(err).Error("error writing kafka message")
        return err
    }
    log.WithField("topic", topic).Info("kafka message published successfully")
    // Infof, not Info: the format verb needs a formatting call.
    log.Infof("message bytes size: %d bytes", uint(p.kafkaWriters[topic].Stats().Bytes))
    return nil
}

func createKafkaMessages(data interface{}) ([]kafka.Message, error) {
    var kafkaMessages []kafka.Message
    if utils.IsNilFixed(data) {
        return nil, errors.New("kafka error the data is empty")
    }
    switch reflect.TypeOf(data).Kind() {
    case reflect.Array, reflect.Slice:
        value := reflect.ValueOf(data)
        for index := 0; index < value.Len(); index++ {
            // .Interface() unwraps the reflect.Value so the element itself
            // is serialized, not the reflect.Value wrapper.
            kafkaMessages = append(kafkaMessages, createMessageKafka(value.Index(index).Interface()))
        }
    default:
        kafkaMessages = append(kafkaMessages, createMessageKafka(data))
    }
    return kafkaMessages, nil
}

func createMessageKafka(data interface{}) kafka.Message {
    payload := utils.EntityToJson(data)
    key := uuid.New().String()
    return kafka.Message{
        Key:   []byte(key),
        Value: []byte(payload),
    }
}

func (p *publisher) Close(topic string) error {
    if p.kafkaWriters[topic] == nil {
        return errors.New("error trying to close kafka connection, connection does not exist")
    }
    // Propagate the writer's close error instead of discarding it.
    err := p.kafkaWriters[topic].Close()
    delete(p.kafkaWriters, topic)
    return err
}
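A usage sketch for this publisher; kafka.DefaultDialer is segmentio/kafka-go's stock dialer, and the OrderCreated type and "orders" topic are made up for illustration:

// In the same package as the publisher above.

// OrderCreated is a hypothetical payload type for illustration.
type OrderCreated struct {
    ID string `json:"id"`
}

func ExamplePublish() {
    // kafka.DefaultDialer is kafka-go's stock dialer; substitute a dialer
    // with TLS/SASL settings for a secured cluster.
    p := NewKafkaPublisher(kafka.DefaultDialer, "localhost:9092")
    defer p.Close("orders")

    // Slices fan out to one kafka.Message per element; a single value
    // becomes a single message.
    if err := p.Publish("orders", []OrderCreated{{ID: "1"}, {ID: "2"}}); err != nil {
        log.WithError(err).Fatal("publish failed")
    }
}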


New

Using AI Code Generation


import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    // SyncProducer requires Return.Successes to be enabled.
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("testing 123"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

Output:

Message is stored in topic(test)/partition(0)/offset(0)

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true // expose errors on the Errors() channel
    master, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()
    consumer, err := master.ConsumePartition("test", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            panic(err)
        }
    }()
    for {
        select {
        case err := <-consumer.Errors():
            fmt.Println(err)
        case msg := <-consumer.Messages():
            fmt.Println(string(msg.Key), string(msg.Value))
        }
    }
}

import (
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    producer, err := sarama.NewAsyncProducer([]string{"localhost...


New

Using AI Code Generation


import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    msg := &sarama.ProducerMessage{
        Topic: "test", // placeholder; the topic name was lost in extraction
        Value: sarama.StringEncoder("testing 123"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // needed to receive on Successes()
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    msg := &sarama.ProducerMessage{
        Topic: "test", // placeholder; the topic name was lost in extraction
        Value: sarama.StringEncoder("testing 123"),
    }
    producer.Input() <- msg
    select {
    case suc := <-producer.Successes():
        fmt.Printf("offset: %d, timestamp: %s\n", suc.Offset, suc.Timestamp.String())
    case fail := <-producer.Errors():
        fmt.Printf("err: %s\n", fail.Err.Error())
    }
}

import (
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
...


New

Using AI Code Generation


import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    broker := "localhost:9092" // placeholder; the broker address was lost in extraction
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer
    producer, err := sarama.NewSyncProducer([]string{broker}, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    msg := &sarama.ProducerMessage{
        Topic: "topic1", // matches the sample output below
        Value: sarama.StringEncoder("Hello World"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

Output:

Message is stored in topic(topic1)/partition(0)/offset(0)

import (
    "github.com/Shopify/sarama"
)

func main() {
    broker := "localhost:9092" // placeholder; lost in extraction
    topic := "topic1"          // placeholder; lost in extraction
    config := sarama.NewConfig()
    consumer, err := sarama.NewConsumer([]string{broker}, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            panic(err)
        }
    }()
    partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            panic(err)
        }
    }()
    for msg := range partitionConsumer.Messages() {
...


New

Using AI Code Generation


import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln(err)
    }
    defer producer.Close()
    msg := &sarama.ProducerMessage{
        Topic: "test",
        Value: sarama.StringEncoder("testing 123"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalln(err)
    }
    log.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

Output:

Message is stored in topic(test)/partition(0)/offset(0)

import (
    "context"
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true // expose errors on the Errors() channel
    master, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := master.Close(); err != nil {
            panic(err)
        }
    }()
    consumer, err := master.ConsumePartition("test", 0, sarama.OffsetNewest)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            panic(err)
        }
    }()
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        for err := range consumer.Errors() {
            fmt.Println(err)
        }
    }()
    go func() {
        for msg := range consumer.Messages() {
            fmt.Println(string(msg.Value))
        }
    }()
    <-ctx.Done()
}


New

Using AI Code Generation


import (
    "fmt"

    cluster "github.com/bsm/sarama-cluster"
)

func main() {
    brokers := []string{"localhost:9092"}
    topics := []string{"my_topic"}
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true     // needed for the Errors() channel
    config.Group.Return.Notifications = true // needed for the Notifications() channel
    consumer, err := cluster.NewConsumer(brokers, "my-group", topics, config)
    if err != nil {
        panic(err)
    }
    defer consumer.Close()
    go func() {
        for err := range consumer.Errors() {
            fmt.Println(err)
        }
    }()
    go func() {
        for ntf := range consumer.Notifications() {
            fmt.Println(ntf)
        }
    }()
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if ok {
                fmt.Printf("Message topic: %s, partition: %d, offset: %d, key: %s, value: %s\n",
                    msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }
    }
}
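One caveat worth adding: bsm/sarama-cluster is deprecated in favor of the consumer-group support built into sarama itself. A minimal sketch of the equivalent with sarama.NewConsumerGroup, reusing the group, topic, and broker from the example above:

package main

import (
    "context"
    "fmt"

    "github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic: %s, partition: %d, offset: %d, value: %s\n",
            msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        sess.MarkMessage(msg, "") // commit consumption progress
    }
    return nil
}

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0 // consumer groups require the broker version to be set

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
    if err != nil {
        panic(err)
    }
    defer group.Close()

    ctx := context.Background()
    for {
        // Consume blocks for one session; loop to rejoin after rebalances.
        if err := group.Consume(ctx, []string{"my_topic"}, handler{}); err != nil {
            panic(err)
        }
    }
}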


New

Using AI Code Generation


import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    // With a nil config, NewSyncProducer applies defaults and enables
    // Producer.Return.Successes automatically.
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
    msg := &sarama.ProducerMessage{
        Topic: "test", // placeholder; the topic name was lost in extraction
        Value: sarama.StringEncoder("testing 123"),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", msg.Topic, partition, offset)
}

import (
    "github.com/Shopify/sarama"
)

func main() {
    // The original called sarama.NewProducer, an API name from very old
    // sarama releases; NewAsyncProducer is the current constructor.
    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            panic(err)
        }
    }()
...


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites and running your first automation test, through best practices, and on to advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, i.e. Selenium, Cypress, TestNG, etc.


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
