Best Testcontainers-go code snippet using wait.defaultPollInterval
watcher.go
Source:watcher.go
1package state2import (3 "context"4 "sync"5 "time"6 "github.com/golang/glog"7 "github.com/google/uuid"8 "github.com/pkg/errors"9)10// DefaultPollInterval used directly for polling items, and indirectly for acquiring leases.11var DefaultPollInterval = time.Second12// MinLeaseDuration is the minimum amount of time to lease a partition for.13var MinLeaseDuration = time.Second * 3014var OverrideMinLeaseDuration = false15// Watcher watches partitions, leases them, and calls out to processor to process items.16type Watcher struct {17 Processor18 Repo19 OwnerID string20 // BatchSize is the number of items to process simultaneously.21 BatchSize int22 PollInterval time.Duration23 // Whether to manually increment the gate for checkpoint purposes, or autoclose the partition.24 // Set to true, if you don't want the watcher to automatically increment25 // gates, or set status to Complete when no items are remaining.26 // This is especially useful if you continuously add items to a partition with no checkpointing.27 ManualCheckpoint bool28 AutoClose bool29 LeaseInterval time.Duration30 LeaseDuration time.Duration31 itemQ chan *Item32 leases map[string]*Partition33 mu sync.Mutex34}35// Start the watcher. 
Sets some defaults if not set.36func (w *Watcher) Start(ctx context.Context) {37 if w.PollInterval == 0 {38 w.PollInterval = DefaultPollInterval39 }40 if w.BatchSize == 0 {41 w.BatchSize = 1042 }43 if w.OwnerID == "" {44 w.OwnerID = uuid.New().String()45 }46 w.leases = map[string]*Partition{}47 if w.LeaseInterval == 0 {48 w.LeaseInterval = 2 * w.PollInterval49 }50 if w.LeaseDuration == 0 {51 w.LeaseDuration = 2 * w.LeaseInterval52 }53 if w.LeaseDuration < MinLeaseDuration && !OverrideMinLeaseDuration {54 glog.Warning("overriding lease duration to 30s, recommended minimum")55 w.LeaseDuration = MinLeaseDuration56 }57 w.itemQ = make(chan *Item, w.BatchSize)58 w.watch(ctx)59}60func (w *Watcher) watch(ctx context.Context) {61 var wg sync.WaitGroup62 glog.Infof("starting watcher %s", w.OwnerID)63 wg.Add(w.BatchSize)64 for i := 0; i < w.BatchSize; i++ {65 go w.itemProcessor(ctx, &wg)66 }67 w.acquireLeases(ctx)68 wg.Wait()69 glog.Info("gracefully shutting down watcher")70}71// acquireLeases contiuously polls the database for potential leases72// based on the columns 'status', 'owner', and 'until'. 
If found,73// it writes "leases" the partition by writing to the 'owner'74// and 'until' fields, and saves the lease in w.leases.75func (w *Watcher) acquireLeases(ctx context.Context) {76 var wg sync.WaitGroup77 t := time.NewTicker(w.LeaseInterval)78 defer t.Stop()79 for {80 partitions, err := w.GetPotentialLeases(ctx)81 if err != nil {82 glog.Errorf("error getting potential leases: %s", err)83 }84 for _, p := range partitions {85 w.mu.Lock()86 _, ok := w.leases[p.ID]87 if ok {88 glog.Warningf("leased partition expired: %s, consider increasing lease interval", p.ID)89 } else {90 wg.Add(1)91 w.leases[p.ID] = p92 p := p93 go w.watchPartition(ctx, p, &wg)94 }95 w.mu.Unlock()96 }97 select {98 case <-t.C:99 continue100 case <-ctx.Done():101 t.Stop()102 wg.Wait()103 close(w.itemQ)104 return105 }106 }107}108func (w *Watcher) watchPartition(ctx context.Context, p *Partition, wg *sync.WaitGroup) {109 t := time.NewTicker(w.PollInterval)110 defer func() {111 t.Stop()112 w.mu.Lock()113 delete(w.leases, p.ID)114 w.mu.Unlock()115 wg.Done()116 }()117 for {118 items, err := w.GetAvailableItems(ctx, p, w.BatchSize-len(w.itemQ))119 if err != nil {120 glog.Errorf("error querying for items %s", err)121 return122 }123 counts, err := w.GetCountByStatus(ctx, p.ID)124 if err != nil {125 glog.Errorf("error fetching count by lease status for partition %s: %s", p.ID, err)126 return127 }128 if counts[Failed] > 0 {129 glog.Warningf("failures detected within partition %s, moving to failed status", p.ID)130 p.Status = Failed131 } else if counts[Available] > 0 {132 glog.Infof("all items at this gate done, incrementing gate for partition %s", p.ID)133 p.Status = Available134 if len(items) == 0 && !w.ManualCheckpoint {135 p.Gate++136 }137 } else {138 glog.Infof("all items done! 
closing out partition %s", p.ID)139 if len(items) == 0 && w.AutoClose {140 p.Status = Complete141 }142 }143 p.Owner = w.OwnerID144 p.Until = time.Now().Add(w.LeaseDuration)145 if !w.Save(ctx, p) {146 glog.Errorf("error saving patition %s", p.ID)147 return148 }149 if p.InActive() {150 glog.Warningf("partition no longer active %s", p.ID)151 return152 }153 for _, i := range items {154 w.itemQ <- i155 }156 select {157 case <-t.C:158 continue159 case <-ctx.Done():160 return161 }162 }163}164func (w *Watcher) itemProcessor(ctx context.Context, wg *sync.WaitGroup) {165 for item := range w.itemQ {166 // We don't care about the result, since it will just get added back on the queue later on failure.167 w.processItem(ctx, item)168 }169 wg.Done()170}171// processItem sends the items to the processor, handles error and continuation responses.172func (w *Watcher) processItem(ctx context.Context, i *Item) {173 defer func() {174 if !w.Save(ctx, i) {175 glog.Warningf("error saving item %s to partition %s", i.ID, i.PartitionID)176 }177 }()178 glog.Infof("%s is processing object with ID: %s in partition: %s, s: %s", w.OwnerID, i.ID, i.PartitionID, i.Data)179 resp, err := w.Process(i.ID, i.Data)180 if err != nil {181 i.error(err)182 return183 }184 if resp.Complete {185 i.Status = Complete186 }187 i.Gate = resp.NextGate188 i.Data = resp.Data189}190func (w *Watcher) Healthcheck(ctx context.Context) error {191 var wg sync.WaitGroup192 wg.Add(2)193 var dbErr, procErr error194 go func() {195 dbErr = w.Repo.Healthcheck(ctx)196 wg.Done()197 }()198 go func() {199 procErr = w.Processor.Healthcheck(ctx)200 wg.Done()201 }()202 wg.Wait()203 if dbErr != nil && procErr != nil {204 return errors.Wrap(dbErr, procErr.Error())205 }206 if dbErr != nil {207 return dbErr208 }209 if procErr != nil {210 return procErr211 }212 return nil213}...
counter.go
Source:counter.go
...12 wg sync.WaitGroup13 ticker *time.Ticker14 pollInterval time.Duration15}16const defaultPollInterval = time.Second * 1017// New returns an instance of aggregate word counter18func New(counters ...*wordcounter.Counter) *Counter {19 if len(counters) == 0 {20 // No counters -- probably a bug21 panic("expected at least one counter")22 }23 r := Counter{24 counters: counters,25 wordCountsC: make(chan []countformatter.WordCount),26 pollInterval: defaultPollInterval,27 }28 return &r29}30func (c *Counter) WithPollInterval(interval time.Duration) *Counter {31 if interval <= 0 {32 panic("invalid poll interval")33 }34 c.pollInterval = interval35 return c36}37// Run initiates counting and returns a chan for intermediate results38func (c *Counter) Run() chan []countformatter.WordCount {39 c.ticker = time.NewTicker(defaultPollInterval)40 c.runner()41 go c.poller()42 return c.wordCountsC43}44// Wait blocks until all the counters are done45func (c *Counter) Wait() {46 c.wg.Wait()47 c.ticker.Stop()48}49// CurrentCounts returns aggregated counts for all counters50func (c *Counter) CurrentCounts() []countformatter.WordCount {51 return countformatter.GetWordCounts(c.collectCounts())52}53// runner starts the counters...
sender.go
Source:sender.go
1package delayedmessage2import (3 "context"4 "time"5 "github.com/jmalloc/ax/endpoint"6 "github.com/jmalloc/ax/persistence"7)8// DefaultPollInterval is the duration to wait before checking for new messages9// to send.10var DefaultPollInterval = 15 * time.Second11// state is a function that handles a single state of the sender.12type state func(ctx context.Context) (state, error)13// Sender is a service that sends delayed messages when they become ready to be14// sent.15type Sender struct {16 DataStore persistence.DataStore17 Repository Repository18 OutboundPipeline endpoint.OutboundPipeline19 PollInterval time.Duration20}21// Run sends messages as they become ready to send until ctx is canceled or an22// error occurrs.23func (s *Sender) Run(ctx context.Context) error {24 for {25 if err := s.tick(ctx); err != nil {26 return err27 }28 }29}30// Tick loads the next message from the repository and sends it if it is ready31// to be sent. Otherwise it waits for the poll interval or until the message is32// ready to be sent then tries again.33func (s *Sender) tick(ctx context.Context) error {34 env, ok, err := s.Repository.LoadNextMessage(ctx, s.DataStore)35 if err != nil {36 return err37 }38 d := s.PollInterval39 if d == 0 {40 d = DefaultPollInterval41 }42 if ok {43 delay := time.Until(env.SendAt)44 if delay <= 0 {45 return s.send(ctx, env)46 } else if delay < d {47 d = delay48 }49 }50 return s.sleep(ctx, d)51}52// send sends a message and marks it as sent.53func (s *Sender) send(ctx context.Context, env endpoint.OutboundEnvelope) error {54 if err := s.OutboundPipeline.Accept(ctx, env); err != nil {55 return err56 }57 tx, com, err := s.DataStore.BeginTx(ctx)58 if err != nil {59 return err60 }61 defer com.Rollback()62 if err := s.Repository.MarkAsSent(ctx, tx, env); err != nil {63 return err64 }65 return com.Commit()66}67// sleep blocks until ctx is canceled or the given duration elapses.68func (s *Sender) sleep(ctx context.Context, d time.Duration) error {69 timer := 
time.NewTimer(d)70 defer timer.Stop()71 select {72 case <-timer.C:73 return nil74 case <-ctx.Done():75 return ctx.Err()76 }77}...
defaultPollInterval
Using AI Code Generation
1import (2func main() {3 caps := selenium.Capabilities{"browserName": "chrome"}4 wd, err := selenium.NewRemote(caps, "")5 if err != nil {6 panic(err)7 }8 defer wd.Quit()9 panic(err)10 }11 if err := wd.WaitWithTimeout(selenium.DefaultPollInterval, 10*time.Second); err != nil {12 panic(err)13 }14 elem, err := wd.FindElement(selenium.ByID, "code")15 if err != nil {16 panic(err)17 }18 if err := elem.Clear(); err != nil {19 panic(err)20 }21 if err := elem.SendKeys("package main22import \"fmt\"23func main() {24 fmt.Println(\"Hello, playground\")25}"); err != nil {26 panic(err)27 }28 btn, err := wd.FindElement(selenium.ByCSSSelector, "#run")29 if err != nil {30 panic(err)31 }32 if err := btn.Click(); err != nil {33 panic(err)34 }35 output, err := wd.WaitWithTimeout(selenium.DefaultPollInterval, 10*time.Second).FindElement(selenium.ByCSSSelector, "#output")36 if err != nil {37 panic(err)38 }39 if txt, err := output.Text(); err != nil {40 panic(err)41 } else {42 fmt.Println("Program output:", txt)43 }44}45import (46func main() {47 caps := selenium.Capabilities{"browserName": "chrome"}48 wd, err := selenium.NewRemote(caps
defaultPollInterval
Using AI Code Generation
1import (2func main() {3 fmt.Println("Starting the application...")4 time.Sleep(2 * time.Second)5 fmt.Println("Terminating the application...")6}7import (8func main() {9 fmt.Println("Starting the application...")10 for start := time.Now(); time.Since(start) < 2*time.Second; {11 fmt.Println("Waiting for 2 seconds...")12 }13 fmt.Println("Terminating the application...")14}
defaultPollInterval
Using AI Code Generation
1import (2func main() {3 ticker := wait.NewTicker(time.Second, 0)4 defer ticker.Stop()5 fmt.Println("Ticker has ticked")6}7import (8func main() {9 ticker := wait.NewTickerWithTimer(time.Second, 0)10 defer ticker.Stop()11 fmt.Println("Ticker has ticked")12}13import (14func main() {15 timer := wait.NewTimer(0)16 fmt.Println("Timer has ticked")17}18import (19func main() {20 timer := wait.NewTimerWithCh(0)21 fmt.Println("Timer has ticked")22}23import (
defaultPollInterval
Using AI Code Generation
1import (2func main() {3 fmt.Println(wait.DefaultPollInterval)4}5import (6func main() {7 fmt.Println(wait.DefaultRetry)8}9{10 1s 1s 0s 1s}10import (11func main() {12 fmt.Println(wait.DefaultBackoff)13}14{0 0s 1s 0s 0s}15import (16func main() {17 fmt.Println(wait.DefaultIfZeroDuration(5*time.Second, 2*time.Second))18 fmt.Println(wait.DefaultIfZeroDuration(0, 2*time.Second))19}20import (21func main() {22 wait.Forever(func() {23 fmt.Println("Hello")24 }, 2*time.Second)25}26import (27func main() {28 fmt.Println(wait.Jitter(5*time.Second, 1.0))29}
defaultPollInterval
Using AI Code Generation
1func main() {2 fmt.Println("Default polling interval is", wait.DefaultPollInterval())3}4func main() {5 fmt.Println("Default timeout is", wait.DefaultTimeout())6}7func main() {8 fmt.Println("Default timeout is", wait.ForPolling())9}
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!