Best Gauge code snippet using runner.invokeRPC
grpcRunner.go
Source:grpcRunner.go
...145 default:146 return nil, nil147 }148}149func (r *GrpcRunner) invokeRPC(message *gm.Message, resChan chan *gm.Message, errChan chan error) {150 var res *gm.Message151 var err error152 if r.LegacyClient != nil {153 res, err = r.invokeLegacyLSPService(message)154 } else {155 res, err = r.invokeServiceFor(message)156 }157 if err != nil {158 errChan <- err159 } else {160 resChan <- res161 }162}163func (r *GrpcRunner) executeMessage(message *gm.Message, timeout time.Duration) (*gm.Message, error) {164 resChan := make(chan *gm.Message)165 errChan := make(chan error)166 go r.invokeRPC(message, resChan, errChan)167 timer := setupTimer(timeout, errChan, message.GetMessageType().String())168 defer stopTimer(timer)169 select {170 case response := <-resChan:171 return response, nil172 case err := <-errChan:173 return nil, err174 }175}176// ExecuteMessageWithTimeout process request and give back the response177func (r *GrpcRunner) ExecuteMessageWithTimeout(message *gm.Message) (*gm.Message, error) {178 return r.executeMessage(message, r.Timeout)179}180// ExecuteAndGetStatus executes a given message and response without timeout....
worker.go
Source:worker.go
1package runner2import (3 "context"4 "errors"5 "fmt"6 "io"7 "time"8 "github.com/gogo/protobuf/proto"9 "github.com/jhump/protoreflect/desc"10 "github.com/jhump/protoreflect/dynamic"11 "github.com/jhump/protoreflect/dynamic/grpcdynamic"12 "go.uber.org/multierr"13 "golang.org/x/sync/errgroup"14 "google.golang.org/grpc"15 "google.golang.org/grpc/encoding/gzip"16 "google.golang.org/grpc/metadata"17)18// TickValue is the tick value19type TickValue struct {20 instant time.Time21 reqNumber uint6422}23// Worker is used for doing a single stream of requests in parallel24type Worker struct {25 stub grpcdynamic.Stub26 mtd *desc.MethodDescriptor27 config *RunConfig28 workerID string29 active bool30 stopCh chan bool31 ticks <-chan TickValue32 dataProvider DataProviderFunc33 metadataProvider MetadataProviderFunc34 msgProvider StreamMessageProviderFunc35 streamRecv StreamRecvMsgInterceptFunc36}37func (w *Worker) runWorker() error {38 var err error39 g := new(errgroup.Group)40 for {41 select {42 case <-w.stopCh:43 if w.config.async {44 return g.Wait()45 }46 return err47 case tv := <-w.ticks:48 if w.config.async {49 g.Go(func() error {50 return w.makeRequest(tv)51 })52 } else {53 rErr := w.makeRequest(tv)54 err = multierr.Append(err, rErr)55 }56 }57 }58}59// Stop stops the worker. It has to be started with Run() again.60func (w *Worker) Stop() {61 if !w.active {62 return63 }64 w.active = false65 w.stopCh <- true66}67func (w *Worker) makeRequest(tv TickValue) error {68 reqNum := int64(tv.reqNumber)69 ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)70 reqMD, err := w.metadataProvider(ctd)71 if err != nil {72 return err73 }74 if w.config.enableCompression {75 reqMD.Append("grpc-accept-encoding", gzip.Name)76 }77 ctx := context.Background()78 var cancel context.CancelFunc79 if w.config.timeout > 0 {80 ctx, cancel = context.WithTimeout(ctx, w.config.timeout)81 } else {82 ctx, cancel = context.WithCancel(ctx)83 }84 defer cancel()85 // include the metadata86 if reqMD != nil {87 ctx = metadata.NewOutgoingContext(ctx, *reqMD)88 }89 inputs, err := w.dataProvider(ctd)90 if err != nil {91 return err92 }93 var msgProvider StreamMessageProviderFunc94 if w.msgProvider != nil {95 msgProvider = w.msgProvider96 } else if w.mtd.IsClientStreaming() {97 if w.config.streamDynamicMessages {98 mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)99 if err != nil {100 return err101 }102 msgProvider = mp.GetStreamMessage103 } else {104 mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)105 if err != nil {106 return err107 }108 msgProvider = mp.GetStreamMessage109 }110 }111 if len(inputs) == 0 && msgProvider == nil {112 return fmt.Errorf("no data provided for request")113 }114 var callType string115 if w.config.hasLog {116 callType = "unary"117 if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() {118 callType = "bidi"119 } else if w.mtd.IsServerStreaming() {120 callType = "server-streaming"121 } else if w.mtd.IsClientStreaming() {122 callType = "client-streaming"123 }124 w.config.log.Debugw("Making request", "workerID", w.workerID,125 "call type", callType, "call", w.mtd.GetFullyQualifiedName(),126 "input", inputs, "metadata", reqMD)127 }128 // RPC errors are handled via stats handler129 if w.mtd.IsClientStreaming() && w.mtd.IsServerStreaming() {130 _ = w.makeBidiRequest(&ctx, ctd, msgProvider)131 } else if w.mtd.IsClientStreaming() {132 _ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider)133 } else if w.mtd.IsServerStreaming() {134 _ = w.makeServerStreamingRequest(&ctx, inputs[0])135 } else {136 _ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])137 }138 return err139}140func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error {141 var res proto.Message142 var resErr error143 var callOptions = []grpc.CallOption{}144 if w.config.enableCompression {145 callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))146 }147 res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)148 if w.config.hasLog {149 w.config.log.Debugw("Received response", "workerID", w.workerID, "call type", "unary",150 "call", w.mtd.GetFullyQualifiedName(),151 "input", input, "metadata", reqMD,152 "response", res, "error", resErr)153 }154 return resErr155}156func (w *Worker) makeClientStreamingRequest(ctx *context.Context,157 ctd *CallData, messageProvider StreamMessageProviderFunc) error {158 var str *grpcdynamic.ClientStream159 var callOptions = []grpc.CallOption{}160 if w.config.enableCompression {161 callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))162 }163 str, err := w.stub.InvokeRpcClientStream(*ctx, w.mtd, callOptions...)164 if err != nil {165 if w.config.hasLog {166 w.config.log.Errorw("Invoke Client Streaming RPC call error: "+err.Error(), "workerID", w.workerID,167 "call type", "client-streaming",168 "call", w.mtd.GetFullyQualifiedName(), "error", err)169 }170 return err171 }172 closeStream := func() {173 res, closeErr := str.CloseAndReceive()174 if w.config.hasLog {175 w.config.log.Debugw("Close and receive", "workerID", w.workerID, "call type", "client-streaming",176 "call", w.mtd.GetFullyQualifiedName(),177 "response", res, "error", closeErr)178 }179 }180 performSend := func(payload *dynamic.Message) (bool, error) {181 err := str.SendMsg(payload)182 if w.config.hasLog {183 w.config.log.Debugw("Send message", "workerID", w.workerID, "call type", "client-streaming",184 "call", w.mtd.GetFullyQualifiedName(),185 "payload", payload, "error", err)186 }187 if err == io.EOF {188 return true, nil189 }190 return false, err191 }192 doneCh := make(chan struct{})193 cancel := make(chan struct{}, 1)194 if w.config.streamCallDuration > 0 {195 go func() {196 sct := time.NewTimer(w.config.streamCallDuration)197 select {198 case <-sct.C:199 cancel <- struct{}{}200 return201 case <-doneCh:202 if !sct.Stop() {203 <-sct.C204 }205 return206 }207 }()208 }209 done := false210 counter := uint(0)211 end := false212 for !done && len(cancel) == 0 {213 // default message provider checks counter214 // but we also need to keep our own counts215 // in case of custom client providers216 var payload *dynamic.Message217 payload, err = messageProvider(ctd)218 isLast := false219 if errors.Is(err, ErrLastMessage) {220 isLast = true221 err = nil222 }223 if err != nil {224 if errors.Is(err, ErrEndStream) {225 err = nil226 }227 break228 }229 end, err = performSend(payload)230 if end || err != nil || isLast || len(cancel) > 0 {231 break232 }233 counter++234 if w.config.streamCallCount > 0 && counter >= w.config.streamCallCount {235 break236 }237 if w.config.streamInterval > 0 {238 wait := time.NewTimer(w.config.streamInterval)239 select {240 case <-wait.C:241 break242 case <-cancel:243 if !wait.Stop() {244 <-wait.C245 }246 done = true247 break248 }249 }250 }251 for len(cancel) > 0 {252 <-cancel253 }254 closeStream()255 close(doneCh)256 close(cancel)257 return nil258}259func (w *Worker) makeServerStreamingRequest(ctx *context.Context, input *dynamic.Message) error {260 var callOptions = []grpc.CallOption{}261 if w.config.enableCompression {262 callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))263 }264 callCtx, callCancel := context.WithCancel(*ctx)265 defer callCancel()266 str, err := w.stub.InvokeRpcServerStream(callCtx, w.mtd, input, callOptions...)267 if err != nil {268 if w.config.hasLog {269 w.config.log.Errorw("Invoke Server Streaming RPC call error: "+err.Error(), "workerID", w.workerID,270 "call type", "server-streaming",271 "call", w.mtd.GetFullyQualifiedName(),272 "input", input, "error", err)273 }274 return err275 }276 doneCh := make(chan struct{})277 cancel := make(chan struct{}, 1)278 if w.config.streamCallDuration > 0 {279 go func() {280 sct := time.NewTimer(w.config.streamCallDuration)281 select {282 case <-sct.C:283 cancel <- struct{}{}284 return285 case <-doneCh:286 if !sct.Stop() {287 <-sct.C288 }289 return290 }291 }()292 }293 interceptCanceled := false294 counter := uint(0)295 for err == nil {296 // we should check before receiving a message too297 if w.config.streamCallDuration > 0 && len(cancel) > 0 {298 <-cancel299 callCancel()300 break301 }302 var res proto.Message303 res, err = str.RecvMsg()304 if w.config.hasLog {305 w.config.log.Debugw("Receive message", "workerID", w.workerID, "call type", "server-streaming",306 "call", w.mtd.GetFullyQualifiedName(),307 "response", res, "error", err)308 }309 // with any of the cancellation operations we can't just bail310 // we have to drain the messages until the server gets the cancel and ends their side of the stream311 if w.streamRecv != nil {312 if converted, ok := res.(*dynamic.Message); ok {313 err = w.streamRecv(converted, err)314 if errors.Is(err, ErrEndStream) && !interceptCanceled {315 interceptCanceled = true316 err = nil317 callCancel()318 }319 }320 }321 if err != nil {322 if err == io.EOF {323 err = nil324 }325 break326 }327 counter++328 if w.config.streamCallCount > 0 && counter >= w.config.streamCallCount {329 callCancel()330 }331 if w.config.streamCallDuration > 0 && len(cancel) > 0 {332 <-cancel333 callCancel()334 }335 }336 close(doneCh)337 close(cancel)338 return err339}340func (w *Worker) makeBidiRequest(ctx *context.Context,341 ctd *CallData, messageProvider StreamMessageProviderFunc) error {342 var callOptions = []grpc.CallOption{}343 if w.config.enableCompression {344 callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))345 }346 str, err := w.stub.InvokeRpcBidiStream(*ctx, w.mtd, callOptions...)347 if err != nil {348 if w.config.hasLog {349 w.config.log.Errorw("Invoke Bidi RPC call error: "+err.Error(),350 "workerID", w.workerID, "call type", "bidi",351 "call", w.mtd.GetFullyQualifiedName(), "error", err)352 }353 return err354 }355 counter := uint(0)356 indexCounter := 0357 recvDone := make(chan bool)358 sendDone := make(chan bool)359 closeStream := func() {360 closeErr := str.CloseSend()361 if w.config.hasLog {362 w.config.log.Debugw("Close send", "workerID", w.workerID, "call type", "bidi",363 "call", w.mtd.GetFullyQualifiedName(), "error", closeErr)364 }365 }366 doneCh := make(chan struct{})367 cancel := make(chan struct{}, 1)368 if w.config.streamCallDuration > 0 {369 go func() {370 sct := time.NewTimer(w.config.streamCallDuration)371 select {372 case <-sct.C:373 cancel <- struct{}{}374 return375 case <-doneCh:376 if !sct.Stop() {377 <-sct.C378 }379 return380 }381 }()382 }383 var recvErr error384 go func() {385 interceptCanceled := false386 for recvErr == nil {387 var res proto.Message388 res, recvErr = str.RecvMsg()389 if w.config.hasLog {390 w.config.log.Debugw("Receive message", "workerID", w.workerID, "call type", "bidi",391 "call", w.mtd.GetFullyQualifiedName(),392 "response", res, "error", recvErr)393 }394 if w.streamRecv != nil {395 if converted, ok := res.(*dynamic.Message); ok {396 iErr := w.streamRecv(converted, recvErr)397 if errors.Is(iErr, ErrEndStream) && !interceptCanceled {398 interceptCanceled = true399 if len(cancel) == 0 {400 cancel <- struct{}{}401 }402 recvErr = nil403 }404 }405 }406 if recvErr != nil {407 close(recvDone)408 break409 }410 }411 }()412 go func() {413 done := false414 for err == nil && !done {415 // check at start before send too416 if len(cancel) > 0 {417 <-cancel418 closeStream()419 break420 }421 // default message provider checks counter422 // but we also need to keep our own counts423 // in case of custom client providers424 var payload *dynamic.Message425 payload, err = messageProvider(ctd)426 isLast := false427 if errors.Is(err, ErrLastMessage) {428 isLast = true429 err = nil430 }431 if err != nil {432 if errors.Is(err, ErrEndStream) {433 err = nil434 }435 closeStream()436 break437 }438 err = str.SendMsg(payload)439 if err != nil {440 if err == io.EOF {441 err = nil442 }443 break444 }445 if w.config.hasLog {446 w.config.log.Debugw("Send message", "workerID", w.workerID, "call type", "bidi",447 "call", w.mtd.GetFullyQualifiedName(),448 "payload", payload, "error", err)449 }450 if isLast {451 closeStream()452 break453 }454 counter++455 indexCounter++456 if w.config.streamCallCount > 0 && counter >= w.config.streamCallCount {457 closeStream()458 break459 }460 if len(cancel) > 0 {461 <-cancel462 closeStream()463 break464 }465 if w.config.streamInterval > 0 {466 wait := time.NewTimer(w.config.streamInterval)467 select {468 case <-wait.C:469 break470 case <-cancel:471 if !wait.Stop() {472 <-wait.C473 }474 closeStream()475 done = true476 break477 }478 }479 }480 close(sendDone)481 }()482 _, _ = <-recvDone, <-sendDone483 for len(cancel) > 0 {484 <-cancel485 }486 close(doneCh)487 close(cancel)488 if err == nil && recvErr != nil {489 err = recvErr490 }491 return err492}...
client.go
Source:client.go
1package grpc2import (3 "context"4 "fmt"5 "strings"6 "time"7 "github.com/thejasn/tester/core/client"8 "github.com/pkg/errors"9 "github.com/thejasn/tester/pkg/log"10 "google.golang.org/grpc"11)12type Config struct {13 ctx context.Context14 rc ReflectClientBuilder15 key string16 host string17 port string18 method string19 request string20}21func NewConfig(ctx context.Context, key, host, port string) *Config {22 return &Config{23 ctx: ctx,24 key: key,25 host: host,26 port: port,27 }28}29func (p Config) GetIdentifier() string {30 return p.key31}32func WithRequest(req string) client.RunnerOpts {33 return func(p client.Runner) {34 p.(*Config).request = req35 }36}37func WithMethod(method string) client.RunnerOpts {38 return func(p client.Runner) {39 p.(*Config).method = method40 }41}42func (p *Config) Clear() {43 p.method = ""44 p.request = ""45}46func (p *Config) Build(ctx context.Context) error {47 dial := func() *grpc.ClientConn {48 clientBuilder := GrpcClientBuilder{}49 dialTime := 10 * time.Second50 ctx, cancel := context.WithTimeout(p.ctx, dialTime)51 defer cancel()52 clientBuilder.WithContext(ctx)53 cc, err := clientBuilder.GetConn(p.host, p.port)54 if err != nil {55 log.GetLogger(ctx).Fatal(errors.Wrapf(err, "Failed to dial target host %q and port %q", p.host, p.port))56 }57 return cc58 }59 p.rc = ReflectClientBuilder{}60 p.rc.WithClientConn(dial())61 p.rc.WithContext(p.ctx)62 p.rc.WithPayload(strings.NewReader(p.request))63 log.GetLogger(ctx).Debugf("Rest Client Config: %+v", p)64 return nil65}66func (p *Config) Invoke() (string, error) {67 r, err := p.rc.InvokeRPC(p.method)68 if err != nil {69 fmt.Println(err)70 return "", errors.Wrapf(err, "Error invoking method %q", p.method)71 }72 return r, nil73}...
invokeRPC
Using AI Code Generation
1import (2type Args struct {3}4type Reply struct {5}6func (t *Arith) Multiply(r *http.Request, args *Args, reply *Reply) error {7}8func main() {9 s := rpc.NewServer()10 s.RegisterCodec(json.NewCodec(), "application/json")11 s.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8")12 s.RegisterService(new(Arith), "")13 r := mux.NewRouter()14 r.Handle("/rpc", s)15 http.Handle("/", r)16 log.Fatal(http.ListenAndServe(":1234", nil))17}18import (19type Args struct {20}21type Reply struct {22}23func main() {24 client, err := rpc.DialHTTP("tcp", "localhost:1234")25 if err != nil {26 log.Fatal("dialing:", err)27 }28 args := &Args{7, 8}29 err = client.Call("Arith.Multiply", args, &reply)30 if err != nil {31 log.Fatal("arith error:", err)32 }33 fmt.Printf("Arith: %d*%d=%d34 args = &Args{17, 8}35 replyCall := client.Go("Arith.Multiply", args, &reply, nil)36 if err := replyCall.Error; err != nil {37 log.Fatal("arith error:", err)38 }39 fmt.Printf("Arith: %d*%d=%d
invokeRPC
Using AI Code Generation
1public class Client {2 public static void main(String[] args) {3 Runner runner = new Runner();4 runner.invokeRPC();5 }6}7import (8const (9type Runner struct {10}11func (r *Runner) InvokeRPC() {12 conn, err := grpc.Dial(address, grpc.WithInsecure())13 if err != nil {14 log.Fatalf("did not connect: %v", err)15 }16 defer conn.Close()17 c := pb.NewGreeterClient(conn)18 if len(os.Args) > 1 {19 }20 r := &pb.HelloRequest{Name: name}21 ctx, cancel := context.WithTimeout(context.Background(), time.Second)22 defer cancel()23 res, err := c.SayHello(ctx, r)24 if err != nil {25 log.Fatalf("could not greet: %v", err)26 }27 log.Printf("Greeting: %s", res.Message)28}29import (30const (31type server struct {32}33func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {34 return &pb.HelloReply{Message: "Hello " + in.Name}, nil35}36func main() {37 lis, err := net.Listen("tcp", port)38 if err != nil {39 log.Fatalf("failed to listen: %v", err)
invokeRPC
Using AI Code Generation
1import (2func main() {3 fmt.Println("Hello, playground")4}5import (6func main() {7 fmt.Println("Hello, playground")8}9import (10func main() {11 fmt.Println("Hello, playground")12}13import (14func main() {15 fmt.Println("Hello, playground")16}17import (18func main() {19 fmt.Println("Hello, playground")20}21import (22func main() {23 fmt.Println("Hello, playground")24}25import (26func main() {27 fmt.Println("Hello, playground")28}29import (30func main() {31 fmt.Println("Hello, playground")32}33import (34func main() {35 fmt.Println("Hello, playground")36}37import (38func main() {
invokeRPC
Using AI Code Generation
1import (2func main() {3 runner := new(Runner)4 runner.invokeRPC()5}6import (7type Runner struct {8}9func (r *Runner) invokeRPC() {10 client, err := rpc.DialHTTP("tcp", "localhost:8080")11 if err != nil {12 fmt.Println(err)13 }14 request := new(Request)15 response := new(Response)16 err = client.Call("Server.Hello", request, response)17 if err != nil {18 fmt.Println(err)19 }20 fmt.Println(response.Message)21}22import (23type Server struct {24}25func (s *Server) Hello(request Request, response *Response) error {26 response.Message = "Hello " + request.Name + ", your age is " + fmt.Sprint(request.Age)27}28type Request struct {29}30type Response struct {31}32import (33func main() {34 server := new(Server)35 rpc.Register(server)36 rpc.HandleHTTP()37 listener, err := net.Listen("tcp", ":8080")38 if err != nil {39 fmt.Println(err)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!