Best Venom code snippet using grpc.OnReceiveHeaders
invoke.go
Source:invoke.go
...25 // OnResolveMethod is called with a descriptor of the method that is being invoked.26 OnResolveMethod(*desc.MethodDescriptor)27 // OnSendHeaders is called with the request metadata that is being sent.28 OnSendHeaders(metadata.MD)29 // OnReceiveHeaders is called when response headers have been received.30 OnReceiveHeaders(metadata.MD)31 // OnReceiveResponse is called for each response message received.32 OnReceiveResponse(proto.Message)33 // OnReceiveTrailers is called when response trailers and final RPC status have been received.34 OnReceiveTrailers(*status.Status, metadata.MD)35}36// RequestMessageSupplier is a function that is called to retrieve request37// messages for a GRPC operation. This type is deprecated and will be removed in38// a future release.39//40// Deprecated: This is only used with the deprecated InvokeRpc. Instead, use41// RequestSupplier with InvokeRPC.42type RequestMessageSupplier func() ([]byte, error)43// InvokeRpc uses the given gRPC connection to invoke the given method. This function is deprecated44// and will be removed in a future release. It just delegates to the similarly named InvokeRPC45// method, whose signature is only slightly different.46//47// Deprecated: use InvokeRPC instead.48func InvokeRpc(ctx context.Context, source DescriptorSource, cc *grpc.ClientConn, methodName string,49 headers []string, handler InvocationEventHandler, requestData RequestMessageSupplier) error {50 return InvokeRPC(ctx, source, cc, methodName, headers, handler, func(m proto.Message) error {51 // New function is almost identical, but the request supplier function works differently.52 // So we adapt the logic here to maintain compatibility.53 data, err := requestData()54 if err != nil {55 return err56 }57 return jsonpb.Unmarshal(bytes.NewReader(data), m)58 })59}60// RequestSupplier is a function that is called to populate messages for a gRPC operation. The61// function should populate the given message or return a non-nil error. 
If the supplier has no62// more messages, it should return io.EOF. When it returns io.EOF, it should not in any way63// modify the given message argument.64type RequestSupplier func(proto.Message) error65// InvokeRPC uses the given gRPC channel to invoke the given method. The given descriptor source66// is used to determine the type of method and the type of request and response message. The given67// headers are sent as request metadata. Methods on the given event handler are called as the68// invocation proceeds.69//70// The given requestData function supplies the actual data to send. It should return io.EOF when71// there is no more request data. If the method being invoked is a unary or server-streaming RPC72// (e.g. exactly one request message) and there is no request data (e.g. the first invocation of73// the function returns io.EOF), then an empty request message is sent.74//75// If the requestData function and the given event handler coordinate or share any state, they should76// be thread-safe. This is because the requestData function may be called from a different goroutine77// than the one invoking event callbacks. 
(This only happens for bi-directional streaming RPCs, where78// one goroutine sends request messages and another consumes the response messages).79func InvokeRPC(ctx context.Context, source DescriptorSource, ch grpcdynamic.Channel, methodName string,80 headers []string, handler InvocationEventHandler, requestData RequestSupplier) error {81 md := MetadataFromHeaders(headers)82 svc, mth := parseSymbol(methodName)83 if svc == "" || mth == "" {84 return fmt.Errorf("given method name %q is not in expected format: 'service/method' or 'service.method'", methodName)85 }86 dsc, err := source.FindSymbol(svc)87 if err != nil {88 if isNotFoundError(err) {89 return fmt.Errorf("target server does not expose service %q", svc)90 }91 return fmt.Errorf("failed to query for service descriptor %q: %v", svc, err)92 }93 sd, ok := dsc.(*desc.ServiceDescriptor)94 if !ok {95 return fmt.Errorf("target server does not expose service %q", svc)96 }97 mtd := sd.FindMethodByName(mth)98 if mtd == nil {99 return fmt.Errorf("service %q does not include a method named %q", svc, mth)100 }101 handler.OnResolveMethod(mtd)102 // we also download any applicable extensions so we can provide full support for parsing user-provided data103 var ext dynamic.ExtensionRegistry104 alreadyFetched := map[string]bool{}105 if err = fetchAllExtensions(source, &ext, mtd.GetInputType(), alreadyFetched); err != nil {106 return fmt.Errorf("error resolving server extensions for message %s: %v", mtd.GetInputType().GetFullyQualifiedName(), err)107 }108 if err = fetchAllExtensions(source, &ext, mtd.GetOutputType(), alreadyFetched); err != nil {109 return fmt.Errorf("error resolving server extensions for message %s: %v", mtd.GetOutputType().GetFullyQualifiedName(), err)110 }111 msgFactory := dynamic.NewMessageFactoryWithExtensionRegistry(&ext)112 req := msgFactory.NewMessage(mtd.GetInputType())113 handler.OnSendHeaders(md)114 ctx = metadata.NewOutgoingContext(ctx, md)115 stub := grpcdynamic.NewStubWithMessageFactory(ch, 
msgFactory)116 ctx, cancel := context.WithCancel(ctx)117 defer cancel()118 if mtd.IsClientStreaming() && mtd.IsServerStreaming() {119 return invokeBidi(ctx, stub, mtd, handler, requestData, req)120 } else if mtd.IsClientStreaming() {121 return invokeClientStream(ctx, stub, mtd, handler, requestData, req)122 } else if mtd.IsServerStreaming() {123 return invokeServerStream(ctx, stub, mtd, handler, requestData, req)124 } else {125 return invokeUnary(ctx, stub, mtd, handler, requestData, req)126 }127}128func invokeUnary(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,129 requestData RequestSupplier, req proto.Message) error {130 err := requestData(req)131 if err != nil && err != io.EOF {132 return fmt.Errorf("error getting request data: %v", err)133 }134 if err != io.EOF {135 // verify there is no second message, which is a usage error136 err := requestData(req)137 if err == nil {138 return fmt.Errorf("method %q is a unary RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())139 } else if err != io.EOF {140 return fmt.Errorf("error getting request data: %v", err)141 }142 }143 // Now we can actually invoke the RPC!144 var respHeaders metadata.MD145 var respTrailers metadata.MD146 resp, err := stub.InvokeRpc(ctx, md, req, grpc.Trailer(&respTrailers), grpc.Header(&respHeaders))147 stat, ok := status.FromError(err)148 if !ok {149 // Error codes sent from the server will get printed differently below.150 // So just bail for other kinds of errors here.151 return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)152 }153 handler.OnReceiveHeaders(respHeaders)154 if stat.Code() == codes.OK {155 handler.OnReceiveResponse(resp)156 }157 handler.OnReceiveTrailers(stat, respTrailers)158 return nil159}160func invokeClientStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,161 requestData RequestSupplier, req proto.Message) 
error {162 // invoke the RPC!163 str, err := stub.InvokeRpcClientStream(ctx, md)164 // Upload each request message in the stream165 var resp proto.Message166 for err == nil {167 err = requestData(req)168 if err == io.EOF {169 resp, err = str.CloseAndReceive()170 break171 }172 if err != nil {173 return fmt.Errorf("error getting request data: %v", err)174 }175 err = str.SendMsg(req)176 if err == io.EOF {177 // We get EOF on send if the server says "go away"178 // We have to use CloseAndReceive to get the actual code179 resp, err = str.CloseAndReceive()180 break181 }182 req.Reset()183 }184 // finally, process response data185 stat, ok := status.FromError(err)186 if !ok {187 // Error codes sent from the server will get printed differently below.188 // So just bail for other kinds of errors here.189 return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)190 }191 if respHeaders, err := str.Header(); err == nil {192 handler.OnReceiveHeaders(respHeaders)193 }194 if stat.Code() == codes.OK {195 handler.OnReceiveResponse(resp)196 }197 handler.OnReceiveTrailers(stat, str.Trailer())198 return nil199}200func invokeServerStream(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,201 requestData RequestSupplier, req proto.Message) error {202 err := requestData(req)203 if err != nil && err != io.EOF {204 return fmt.Errorf("error getting request data: %v", err)205 }206 if err != io.EOF {207 // verify there is no second message, which is a usage error208 err := requestData(req)209 if err == nil {210 return fmt.Errorf("method %q is a server-streaming RPC, but request data contained more than 1 message", md.GetFullyQualifiedName())211 } else if err != io.EOF {212 return fmt.Errorf("error getting request data: %v", err)213 }214 }215 // Now we can actually invoke the RPC!216 str, err := stub.InvokeRpcServerStream(ctx, md, req)217 if respHeaders, err := str.Header(); err == nil {218 
handler.OnReceiveHeaders(respHeaders)219 }220 // Download each response message221 for err == nil {222 var resp proto.Message223 resp, err = str.RecvMsg()224 if err != nil {225 if err == io.EOF {226 err = nil227 }228 break229 }230 handler.OnReceiveResponse(resp)231 }232 stat, ok := status.FromError(err)233 if !ok {234 // Error codes sent from the server will get printed differently below.235 // So just bail for other kinds of errors here.236 return fmt.Errorf("grpc call for %q failed: %v", md.GetFullyQualifiedName(), err)237 }238 handler.OnReceiveTrailers(stat, str.Trailer())239 return nil240}241func invokeBidi(ctx context.Context, stub grpcdynamic.Stub, md *desc.MethodDescriptor, handler InvocationEventHandler,242 requestData RequestSupplier, req proto.Message) error {243 ctx, cancel := context.WithCancel(ctx)244 defer cancel()245 // invoke the RPC!246 str, err := stub.InvokeRpcBidiStream(ctx, md)247 var wg sync.WaitGroup248 var sendErr atomic.Value249 defer wg.Wait()250 if err == nil {251 wg.Add(1)252 go func() {253 defer wg.Done()254 // Concurrently upload each request message in the stream255 var err error256 for err == nil {257 err = requestData(req)258 if err == io.EOF {259 err = str.CloseSend()260 break261 }262 if err != nil {263 err = fmt.Errorf("error getting request data: %v", err)264 cancel()265 break266 }267 err = str.SendMsg(req)268 req.Reset()269 }270 if err != nil {271 sendErr.Store(err)272 }273 }()274 }275 if respHeaders, err := str.Header(); err == nil {276 handler.OnReceiveHeaders(respHeaders)277 }278 // Download each response message279 for err == nil {280 var resp proto.Message281 resp, err = str.RecvMsg()282 if err != nil {283 if err == io.EOF {284 err = nil285 }286 break287 }288 handler.OnReceiveResponse(resp)289 }290 if se, ok := sendErr.Load().(error); ok && se != io.EOF {...
handler.go
Source:handler.go
...
}

// OnSendHeaders is called with the request metadata that is being sent.
// This handler does nothing with it.
func (h *grpcHandler) OnSendHeaders(metadata.MD) {}

// OnReceiveHeaders is called when response headers have been received.
// This handler does nothing with them.
func (h *grpcHandler) OnReceiveHeaders(md metadata.MD) {}

// OnReceiveResponse appends the received message to h.msgs while holding h.ml.
func (h *grpcHandler) OnReceiveResponse(resp proto.Message) {
	h.ml.Lock()
	h.msgs = append(h.msgs, resp)
	h.ml.Unlock()
}

// OnReceiveTrailers records the code, message and details of the final RPC
// status on the handler. The trailer metadata itself is not used.
func (h *grpcHandler) OnReceiveTrailers(s *status.Status, meta metadata.MD) {
	h.code, h.msg, h.details = s.Code(), s.Message(), s.Details()
}

func extractBizMessage(details []interface{}) (code int, msg string) {
	for _, v := range details {
		if s, ok := v.(*gstatus.Status); ok {
...
event.go
Source:event.go
...
	ResponseMd metadata.MD
	TrailersMd metadata.MD
}

// OnReceiveHeaders passes the response headers on to the handler's
// DefaultEventHandler and then keeps them in ResponseMd.
func (h *CustomEventHandler) OnReceiveHeaders(md metadata.MD) {
	h.DefaultEventHandler.OnReceiveHeaders(md)
	h.ResponseMd = md
}

// OnReceiveTrailers passes the final status and trailers on to the handler's
// DefaultEventHandler and then keeps the trailers in TrailersMd.
func (h *CustomEventHandler) OnReceiveTrailers(stat *status.Status, md metadata.MD) {
	h.DefaultEventHandler.OnReceiveTrailers(stat, md)
	h.TrailersMd = md
}
...
OnReceiveHeaders
Using AI Code Generation
1import (2const (3type server struct {4}5func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {6 log.Printf("Received: %v", in.GetName())7 return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil8}9func main() {10 lis, err := net.Listen("tcp", port)11 if err != nil {12 log.Fatalf("failed to listen: %v", err)13 }14 s := grpc.NewServer(grpc.UnaryInterceptor(unaryInterceptor))15 pb.RegisterGreeterServer(s, &server{})16 if err := s.Serve(lis); err != nil {17 log.Fatalf("failed to serve: %v", err)18 }19}20func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {21 md, ok := metadata.FromIncomingContext(ctx)22 if ok {23 log.Printf("md: %v", md)24 }25 p, ok := peer.FromContext(ctx)26 if ok {27 log.Printf("peer: %v", p)28 }29 t := time.Now()30 resp, err := handler(ctx, req)31 log.Printf("time: %v", time.Since(t))32}33import (34const (35type server struct {36}
OnReceiveHeaders
Using AI Code Generation
1import (2func main() {3 l, err := net.Listen("tcp", ":7777")4 if err != nil {5 log.Fatalf("failed to listen: %v", err)6 }7 s := grpc.NewServer(8 grpc_middleware.WithUnaryServerChain(9 grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_zap.TagFieldExtractor)),10 grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(logrus.New())),11 grpc_middleware.WithStreamServerChain(12 grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_zap.TagFieldExtractor)),13 grpc_logrus.StreamServerInterceptor(logrus.NewEntry(logrus.New())),14 log.Printf("Serving gRPC on %v", l.Addr())15 if err := s.Serve(l); err != nil {16 log.Fatalf("failed to serve: %v", err)17 }18}19import (20func main() {21 l, err := net.Listen("tcp", ":7777")
OnReceiveHeaders
Using AI Code Generation
1import (2type server struct{}3func (s *server) Foo(ctx context.Context, in *FooRequest) (*FooResponse, error) {4 fmt.Printf("Received: %v5", in.GetValue())6 return &FooResponse{Value: in.GetValue()}, nil7}8func main() {9 lis, err := net.Listen("tcp", ":50051")10 if err != nil {11 log.Fatalf("failed to listen: %v", err)12 }13 s := grpc.NewServer()14 RegisterFooServiceServer(s, &server{})15 log.Println("Server started")16 if err := s.Serve(lis); err != nil {17 log.Fatalf("failed to serve: %v", err)18 }19}20import (21func main() {22 conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())23 if err != nil {24 log.Fatalf("did not connect: %v", err)25 }26 defer conn.Close()27 c := NewFooServiceClient(conn)28 ctx, cancel := context.WithTimeout(context.Background(), time.Second)29 defer cancel()30 md := metadata.Pairs("custom-header-1", "custom-value-1")31 ctx = metadata.NewOutgoingContext(ctx, md)32 r, err := c.Foo(ctx, &FooRequest{Value: 1})33 if err != nil {34 log.Fatalf("could not greet: %v", err)35 }36 log.Printf("Greeting: %s", r.GetValue())37}38import (39func main() {40 conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())41 if err != nil {42 log.Fatalf("did not connect: %v", err)43 }44 defer conn.Close()45 c := NewFooServiceClient(conn)46 ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Learn automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!