Best Keploy code snippet using graph.Mutation
StateMutationEngine.go
Source:StateMutationEngine.go
1/* StateMutationEngine.go: In many ways, the heart of Kraken, this engine manages state mutations.2 *3 * Author: J. Lowell Wofford <lowell@lanl.gov>4 *5 * This software is open source software available under the BSD-3 license.6 * Copyright (c) 2018, Triad National Security, LLC7 * See LICENSE file for details.8 */9package core10import (11 "encoding/json"12 "fmt"13 "reflect"14 "regexp"15 "sort"16 "strings"17 "sync"18 "time"19 pb "github.com/kraken-hpc/kraken/core/proto"20 ct "github.com/kraken-hpc/kraken/core/proto/customtypes"21 "github.com/kraken-hpc/kraken/lib/types"22 "github.com/kraken-hpc/kraken/lib/util"23)24///////////////////////25// Auxiliary Objects /26/////////////////////27const (28 MutationEvent_MUTATE pb.MutationControl_Type = pb.MutationControl_MUTATE29 MutationEvent_INTERRUPT pb.MutationControl_Type = pb.MutationControl_INTERRUPT30)31var MutationEventString = map[pb.MutationControl_Type]string{32 MutationEvent_MUTATE: "MUTATE",33 MutationEvent_INTERRUPT: "INTERRUPT",34}35type MutationEvent struct {36 Type pb.MutationControl_Type37 // strictly speaking, we may only need the Cfg38 // but we generally have this info on hand anyway39 NodeCfg types.Node40 NodeDsc types.Node41 Mutation [2]string // [0] = module, [1] = mutid42}43func (me *MutationEvent) String() string {44 return fmt.Sprintf("(%s) %s : %s -> %s", MutationEventString[me.Type], me.NodeCfg.ID().String(), me.Mutation[0], me.Mutation[1])45}46type mutationEdge struct {47 cost uint3248 mut types.StateMutation49 from *mutationNode50 to *mutationNode51}52// consider equal if mut is the same (pointer), and to and from are the same53func (m *mutationEdge) Equal(b *mutationEdge) bool {54 if m.to != b.to {55 return false56 }57 if m.from != b.from {58 return false59 }60 if m.mut != b.mut {61 return false62 }63 return true64}65type mutationNode struct {66 spec types.StateSpec // spec with aggregated require/excludes67 in []*mutationEdge68 out []*mutationEdge69}70type mutationPath struct {71 mutex *sync.Mutex72 cur int // where are we currently?73 cmplt bool74 // curSeen is a slice of URLs that we've seen (correct) changes in the current mut75 // This is important to keep track of muts that change more than one URL76 curSeen []string77 start types.Node78 end types.Node79 gstart *mutationNode80 gend *mutationNode81 chain []*mutationEdge82 timer *time.Timer83 waitingFor string // the SI we're currently waiting for84}85// sme tests to see if a new path is really the same as an existing one86// same tests if one of the following is true:87// 1) these paths are the same88// 2) sub is a subpath of mp and:89// a) subpath starts at mp.cur90// b) subpath ends at mp.gend91func (mp *mutationPath) same(sub *mutationPath) bool {92 mp.mutex.Lock()93 defer mp.mutex.Unlock()94 sub.mutex.Lock()95 defer sub.mutex.Unlock()96 if sub.gend != mp.gend {97 // this must be true in either case98 return false99 }100 if sub.gstart == mp.gstart {101 return true102 }103 if mp.chain[mp.cur].from == sub.gstart {104 return true105 }106 return false107}108// alreadyFired tests to see if the curren mp has already fired our first mutation for new mp109func (mp *mutationPath) alreadyFired(nmp *mutationPath) bool {110 // generall nmp.cur == 0, but no reason not to make this more generic111 return mp.chain[mp.cur].mut == nmp.chain[nmp.cur].mut112}113// DefaultRootSpec provides a sensible root StateSpec to build the mutation graph off of114func DefaultRootSpec() types.StateSpec {115 return NewStateSpec(map[string]reflect.Value{"/PhysState": reflect.ValueOf(pb.Node_PHYS_UNKNOWN)}, map[string]reflect.Value{})116}117////////////////////////////////118// StateMutationEngine Object /119//////////////////////////////120var _ types.StateMutationEngine = (*StateMutationEngine)(nil)121// A StateMutationEngine listens for state change events and manages mutations to evolve Dsc state into Cfg state122type StateMutationEngine struct {123 muts []types.StateMutation124 mutResolver map[types.StateMutation][2]string // this allows us to lookup module/id pair from mutation125 // stuff we can compute from muts126 mutators map[string]uint32 // ref count, all URLs that mutate127 requires map[string]uint32 // ref count, referenced (req/exc) urls that don't mutate128 deps map[string]types.StateSpec129 graph *mutationNode // graph start130 graphMutex *sync.RWMutex131 nodes []*mutationNode // so we can search for matches132 edges []*mutationEdge133 em *EventEmitter134 qc chan types.Query135 schan chan<- types.EventListener // subscription channel136 echan chan types.Event137 sichan chan types.Event138 selist *EventListener139 silist *EventListener140 run bool // are we running?141 active map[string]*mutationPath // active mutations142 waiting map[string][]*mutationPath143 activeMutex *sync.Mutex // active (and waiting) needs some synchronization, or we can get in bad places144 query *QueryEngine145 log types.Logger146 self types.NodeID147 root types.StateSpec148 freeze bool149}150// NewStateMutationEngine creates an initialized StateMutationEngine151func NewStateMutationEngine(ctx Context, qc chan types.Query) *StateMutationEngine {152 sme := &StateMutationEngine{153 muts: []types.StateMutation{},154 mutResolver: make(map[types.StateMutation][2]string),155 active: make(map[string]*mutationPath),156 waiting: make(map[string][]*mutationPath),157 activeMutex: &sync.Mutex{},158 mutators: make(map[string]uint32),159 requires: make(map[string]uint32),160 deps: make(map[string]types.StateSpec),161 graph: &mutationNode{spec: ctx.SME.RootSpec},162 graphMutex: &sync.RWMutex{},163 nodes: []*mutationNode{},164 edges: []*mutationEdge{},165 em: NewEventEmitter(types.Event_STATE_MUTATION),166 qc: qc,167 run: false,168 echan: make(chan types.Event),169 sichan: make(chan types.Event),170 query: &ctx.Query,171 schan: ctx.SubChan,172 log: &ctx.Logger,173 self: ctx.Self,174 root: ctx.SME.RootSpec,175 freeze: true,176 }177 sme.log.SetModule("StateMutationEngine")178 return sme179}180// RegisterMutation injects new mutaitons into the SME. muts[i] should match callback[i]181// We take a list so that we only call onUpdate once182// LOCKS: graphMutex (RW)183func (sme *StateMutationEngine) RegisterMutation(si, id string, mut types.StateMutation) (e error) {184 sme.graphMutex.Lock()185 sme.muts = append(sme.muts, mut)186 sme.mutResolver[mut] = [2]string{si, id}187 sme.graphMutex.Unlock()188 sme.onUpdate()189 return190}191// NodeMatch determines how many compatable StateSpecs this node has in the graph192func (sme *StateMutationEngine) NodeMatch(node types.Node) (i int) {193 ns := sme.nodeSearch(node)194 sme.Logf(DEBUG, "===\nNode:\n%v\n", string(node.JSON()))195 sme.Log(DEBUG, "Matched:\n")196 for _, m := range ns {197 sme.Logf(DEBUG, "Spec:\nreq: %v\nexc: %v\n", m.spec.Requires(), m.spec.Excludes())198 }199 return len(sme.nodeSearch(node))200}201func (sme *StateMutationEngine) dumpMapOfValues(m map[string]reflect.Value) (s string) {202 for k := range m {203 s += fmt.Sprintf("%s: %s, ", k, util.ValueToString(m[k]))204 }205 return206}207func (sme *StateMutationEngine) dumpMutMap(m map[string][2]reflect.Value) (s string) {208 for k := range m {209 s += fmt.Sprintf("%s: %s -> %s, ", k, util.ValueToString(m[k][0]), util.ValueToString(m[k][1]))210 }211 return212}213// DumpGraph FIXME: REMOVE -- for debugging214// LOCKS: graphMutex (R)215func (sme *StateMutationEngine) DumpGraph() {216 sme.graphMutex.RLock()217 fmt.Printf("\n")218 fmt.Printf("=== START: Mutators URLs ===\n")219 for k, v := range sme.mutators {220 fmt.Printf("%s: %d\n", k, v)221 }222 fmt.Printf("=== END: Mutators URLs ===\n")223 fmt.Printf("=== START: Requires URLs ===\n")224 for k, v := range sme.requires {225 fmt.Printf("%s: %d\n", k, v)226 }227 fmt.Printf("=== END: Requires URLs ===\n")228 fmt.Printf("\n=== START: Node list ===\n")229 for _, m := range sme.nodes {230 fmt.Printf(`231 Node: %p232 Spec: %p233 req: %s234 exc: %s235 In: %v236 Out: %v237 `, m, m.spec, sme.dumpMapOfValues(m.spec.Requires()), sme.dumpMapOfValues(m.spec.Excludes()), m.in, m.out)238 }239 fmt.Printf("\n=== END: Node list ===\n")240 fmt.Printf("\n=== START: Edge list ===\n")241 for _, m := range sme.edges {242 fmt.Printf(`243 Edge: %p244 Mutation: %p245 mut: %s246 req: %s247 exc: %s248 From: %p249 To: %p250 `, m, m.mut, sme.dumpMutMap(m.mut.Mutates()), sme.dumpMapOfValues(m.mut.Requires()), sme.dumpMapOfValues(m.mut.Excludes()), m.from, m.to)251 }252 fmt.Printf("\n=== END: Edge list ===\n")253 sme.graphMutex.RUnlock()254}255// DumpJSONGraph for debugging the graph256// !!!IMPORTANT!!!257// DumpJSONGraph assumes you already hold a lock258func (sme *StateMutationEngine) DumpJSONGraph(nodes []*mutationNode, edges []*mutationEdge) {259 nl := mutationNodesToProto(nodes)260 el := mutationEdgesToProto(edges)261 graph := struct {262 Nodes []*pb.MutationNode `json:"nodes"`263 Edges []*pb.MutationEdge `json:"edges"`264 }{265 Nodes: nl.MutationNodeList,266 Edges: el.MutationEdgeList,267 }268 jsonGraph, e := json.Marshal(graph)269 if e != nil {270 fmt.Printf("error getting json graph\n")271 return272 }273 fmt.Printf("JSON Graph: \n%v\n", string(jsonGraph))274}275// Converts a slice of sme mutation nodes to a protobuf MutationNodeList276func mutationNodesToProto(nodes []*mutationNode) (r pb.MutationNodeList) {277 for _, mn := range nodes {278 var nmn pb.MutationNode279 nmn.Id = fmt.Sprintf("%p", mn)280 label := ""281 var reqKeys []string282 var excKeys []string283 var reqs = mn.spec.Requires()284 for k := range reqs {285 reqKeys = append(reqKeys, k)286 }287 sort.Strings(reqKeys)288 var excs = mn.spec.Excludes()289 for k := range excs {290 excKeys = append(excKeys, k)291 }292 sort.Strings(excKeys)293 for _, reqKey := range reqKeys {294 reqValue := reqs[reqKey]295 // Add req to label296 trimKey := strings.Replace(reqKey, "type.googleapis.com", "", -1)297 trimKey = strings.Replace(trimKey, "/", "", -1)298 if label == "" {299 label = fmt.Sprintf("%s: %s", trimKey, util.ValueToString(reqValue))300 } else {301 label = fmt.Sprintf("%s\n%s: %s", label, trimKey, util.ValueToString(reqValue))302 }303 }304 for _, excKey := range excKeys {305 excValue := excs[excKey]306 // Add req to label307 trimKey := strings.Replace(excKey, "type.googleapis.com", "", -1)308 trimKey = strings.Replace(trimKey, "/", "", -1)309 if label == "" {310 label = fmt.Sprintf("%s: !%s", trimKey, util.ValueToString(excValue))311 } else {312 label = fmt.Sprintf("%s\n%s: !%s", label, trimKey, util.ValueToString(excValue))313 }314 }315 nmn.Label = label316 r.MutationNodeList = append(r.MutationNodeList, &nmn)317 }318 return319}320// Converts a slice of sme mutation edges to a protobuf MutationEdgeList321func mutationEdgesToProto(edges []*mutationEdge) (r pb.MutationEdgeList) {322 for _, me := range edges {323 var nme pb.MutationEdge324 nme.From = fmt.Sprintf("%p", me.from)325 nme.To = fmt.Sprintf("%p", me.to)326 nme.Id = fmt.Sprintf("%p", me)327 r.MutationEdgeList = append(r.MutationEdgeList, &nme)328 }329 return330}331// Converts an sme mutation path to a protobuf MutationPath332// LOCKS: path.mutex333func mutationPathToProto(path *mutationPath) (r pb.MutationPath, e error) {334 path.mutex.Lock()335 defer path.mutex.Unlock()336 if path != nil {337 r.Cur = int64(path.cur)338 r.Cmplt = path.cmplt339 for _, me := range path.chain {340 var nme pb.MutationEdge341 nme.From = fmt.Sprintf("%p", me.from)342 nme.To = fmt.Sprintf("%p", me.to)343 nme.Id = fmt.Sprintf("%p", me)344 r.Chain = append(r.Chain, &nme)345 }346 } else {347 e = fmt.Errorf("Mutation path is nil")348 }349 return350}351// Returns the mutation nodes that have correlating reqs and execs for a given nodeID352// LOCKS: activeMutex; path.mutex353func (sme *StateMutationEngine) filterMutNodesFromNode(n ct.NodeID) (r []*mutationNode, e error) {354 // Get node from path355 sme.activeMutex.Lock()356 mp := sme.active[n.String()]357 sme.activeMutex.Unlock()358 if mp != nil {359 mp.mutex.Lock()360 node := mp.end361 mp.mutex.Unlock()362 // Combine discoverables and mutators into discoverables map363 discoverables := make(map[string]string)364 for _, siMap := range Registry.Discoverables {365 for key := range siMap {366 discoverables[key] = ""367 }368 }369 sme.graphMutex.RLock()370 for key := range sme.mutators {371 discoverables[key] = ""372 }373 sme.graphMutex.RUnlock()374 filteredNodes := make(map[*mutationNode]string)375 for _, mn := range sme.nodes {376 filteredNodes[mn] = ""377 for reqKey, reqVal := range mn.spec.Requires() {378 // if reqkey is not in discoverables379 if _, ok := discoverables[reqKey]; !ok {380 // if physical node has the reqkey as a value, check if it doesn't match381 if nodeVal, err := node.GetValue(reqKey); err == nil {382 // if it doesn't match, remove mn from final nodes383 if nodeVal.String() != reqVal.String() {384 delete(filteredNodes, mn)385 }386 }387 }388 }389 for excKey, excVal := range mn.spec.Excludes() {390 // if excKey is in discoverables, move on391 if _, ok := discoverables[excKey]; ok {392 break393 }394 // if physical node has the exckey as a value, check if it does match395 if nodeVal, err := node.GetValue(excKey); err == nil {396 // if it doesn't match, remove mn from final nodes397 if nodeVal == excVal {398 delete(filteredNodes, mn)399 }400 }401 }402 }403 for mn := range filteredNodes {404 r = append(r, mn)405 }406 sme.Logf(DDDEBUG, "Final filtered nodes from SME: %v", r)407 } else {408 e = fmt.Errorf("Can't get node info because mutation path is nil")409 }410 return411}412// Returns the mutation edges that match the filtered nodes from filterMutNodesFromNode413// LOCKS: activeMutex via filterMutNodesFromNode; path.mutex via filterMutNodesFromNode414func (sme *StateMutationEngine) filterMutEdgesFromNode(n ct.NodeID) (r []*mutationEdge, e error) {415 nodes, e := sme.filterMutNodesFromNode(n)416 filteredEdges := make(map[*mutationEdge]string)417 for _, mn := range nodes {418 for _, me := range mn.in {419 filteredEdges[me] = ""420 }421 for _, me := range mn.out {422 filteredEdges[me] = ""423 }424 }425 for me := range filteredEdges {426 r = append(r, me)427 }428 return429}430// PathExists returns a boolean indicating whether or not a path exists in the graph between two nodes.431// If the path doesn't exist, it also returns the error.432// LOCKS: graphMutex (R) via findPath433func (sme *StateMutationEngine) PathExists(start types.Node, end types.Node) (r bool, e error) {434 p, e := sme.findPath(start, end)435 if p != nil {436 r = true437 }438 return439}440// goroutine441func (sme *StateMutationEngine) sendQueryResponse(qr types.QueryResponse, r chan<- types.QueryResponse) {442 r <- qr443}444// QueryChan returns a chanel that Queries can be sent on445func (sme *StateMutationEngine) QueryChan() chan<- types.Query {446 return sme.qc447}448// Run is a goroutine that listens for state changes and performs StateMutation magic449// LOCKS: all450func (sme *StateMutationEngine) Run(ready chan<- interface{}) {451 // on run we import all mutations in the registry452 sme.graphMutex.Lock()453 for mod := range Registry.Mutations {454 for id, mut := range Registry.Mutations[mod] {455 sme.muts = append(sme.muts, mut)456 sme.mutResolver[mut] = [2]string{mod, id}457 }458 }459 sme.graphMutex.Unlock()460 sme.onUpdate()461 if sme.GetLoggerLevel() >= DDEBUG {462 sme.DumpGraph() // Use this to debug your graph463 sme.graphMutex.RLock()464 sme.DumpJSONGraph(sme.nodes, sme.edges) // Use this to debug your graph465 sme.graphMutex.RUnlock()466 }467 // create a listener for state change events we care about468 sme.selist = NewEventListener(469 "StateMutationEngine",470 types.Event_STATE_CHANGE,471 func(v types.Event) bool {472 _, url := util.NodeURLSplit(v.URL())473 sme.graphMutex.RLock()474 defer sme.graphMutex.RUnlock()475 for m := range sme.mutators {476 if url == m {477 return true478 }479 }480 for m := range sme.requires {481 if url == m {482 return true483 }484 }485 if url == "" { // this should mean we got CREATE/DELETE486 return true487 }488 return false489 },490 func(v types.Event) error { return ChanSender(v, sme.echan) })491 // subscribe our listener492 sme.schan <- sme.selist493 smurl := regexp.MustCompile(`^\/?Services\/`)494 sme.silist = NewEventListener(495 "StateMutationEngine-SI",496 types.Event_STATE_CHANGE,497 func(v types.Event) bool {498 node, url := util.NodeURLSplit(v.URL())499 if !ct.NewNodeID(node).EqualTo(sme.self) {500 return false501 }502 if smurl.MatchString(url) {503 return true504 }505 return false506 },507 func(v types.Event) error { return ChanSender(v, sme.sichan) },508 )509 sme.schan <- sme.silist510 debugchan := make(chan interface{})511 if sme.GetLoggerLevel() >= DDEBUG {512 go func() {513 for {514 time.Sleep(10 * time.Second)515 debugchan <- nil516 }517 }()518 }519 ready <- nil520 for {521 select {522 case q := <-sme.qc:523 switch q.Type() {524 case types.Query_MUTATIONNODES:525 _, u := util.NodeURLSplit(q.URL())526 var e error527 // If url is empty then assume we want all mutation nodes528 if u == "" {529 sme.graphMutex.RLock()530 v := mutationNodesToProto(sme.nodes)531 sme.graphMutex.RUnlock()532 go sme.sendQueryResponse(NewQueryResponse(533 []reflect.Value{reflect.ValueOf(v)}, e), q.ResponseChan())534 } else {535 n := ct.NewNodeIDFromURL(q.URL())536 sme.graphMutex.RLock()537 fmn, e := sme.filterMutNodesFromNode(*n)538 mnl := mutationNodesToProto(fmn)539 sme.graphMutex.RUnlock()540 go sme.sendQueryResponse(NewQueryResponse(541 []reflect.Value{reflect.ValueOf(mnl)}, e), q.ResponseChan())542 }543 break544 case types.Query_MUTATIONEDGES:545 _, u := util.NodeURLSplit(q.URL())546 var e error547 // If url is empty then assume we want all mutation edges548 if u == "" {549 sme.graphMutex.RLock()550 v := mutationEdgesToProto(sme.edges)551 sme.graphMutex.RUnlock()552 go sme.sendQueryResponse(NewQueryResponse(553 []reflect.Value{reflect.ValueOf(v)}, e), q.ResponseChan())554 } else {555 n := ct.NewNodeIDFromURL(q.URL())556 sme.graphMutex.RLock()557 fme, e := sme.filterMutEdgesFromNode(*n)558 mel := mutationEdgesToProto(fme)559 sme.graphMutex.RUnlock()560 go sme.sendQueryResponse(NewQueryResponse(561 []reflect.Value{reflect.ValueOf(mel)}, e), q.ResponseChan())562 }563 break564 case types.Query_MUTATIONPATH:565 n := ct.NewNodeIDFromURL(q.URL())566 sme.activeMutex.Lock()567 mp := sme.active[n.String()]568 sme.activeMutex.Unlock()569 pmp, e := mutationPathToProto(mp)570 go sme.sendQueryResponse(NewQueryResponse(571 []reflect.Value{reflect.ValueOf(pmp)}, e), q.ResponseChan())572 break573 case types.Query_FREEZE:574 sme.Freeze()575 if sme.Frozen() {576 go sme.sendQueryResponse(NewQueryResponse(577 []reflect.Value{}, nil), q.ResponseChan())578 } else {579 e := fmt.Errorf("sme failed to freeze")580 go sme.sendQueryResponse(NewQueryResponse(581 []reflect.Value{}, e), q.ResponseChan())582 }583 break584 case types.Query_THAW:585 sme.Thaw()586 if !sme.Frozen() {587 go sme.sendQueryResponse(NewQueryResponse(588 []reflect.Value{}, nil), q.ResponseChan())589 } else {590 e := fmt.Errorf("sme failed to thaw")591 go sme.sendQueryResponse(NewQueryResponse(592 []reflect.Value{}, e), q.ResponseChan())593 }594 break595 case types.Query_FROZEN:596 f := sme.Frozen()597 go sme.sendQueryResponse(NewQueryResponse(598 []reflect.Value{reflect.ValueOf(f)}, nil), q.ResponseChan())599 break600 default:601 sme.Logf(DEBUG, "unsupported query type: %d", q.Type())602 }603 break604 case v := <-sme.echan:605 // FIXME: event processing can be expensive;606 // we should make them concurrent with a queue607 if !sme.Frozen() {608 sme.handleEvent(v)609 }610 break611 case v := <-sme.sichan:612 // Got a service change613 sme.handleServiceEvent(v.Data().(*StateChangeEvent))614 case <-debugchan:615 sme.Logf(DDEBUG, "There are %d active mutations.", len(sme.active))616 break617 }618 }619}620func (sme *StateMutationEngine) Frozen() bool {621 sme.activeMutex.Lock()622 defer sme.activeMutex.Unlock()623 return sme.freeze624}625func (sme *StateMutationEngine) Freeze() {626 sme.Log(INFO, "freezing")627 sme.activeMutex.Lock()628 sme.freeze = true629 sme.activeMutex.Unlock()630}631func (sme *StateMutationEngine) Thaw() {632 sme.Log(INFO, "thawing")633 sme.activeMutex.Lock()634 sme.active = make(map[string]*mutationPath)635 sme.freeze = false636 sme.activeMutex.Unlock()637 ns, _ := sme.query.ReadAll()638 for _, n := range ns {639 sme.startNewMutation(n.ID().String())640 }641}642////////////////////////643// Unexported methods /644//////////////////////645// !!!IMPORTANT!!!646// collectURLs assumes you already hold a lock647// currently only used in onUpdate648func (sme *StateMutationEngine) collectURLs() {649 for _, m := range sme.muts {650 for u := range m.Mutates() {651 if _, ok := sme.mutators[u]; !ok {652 sme.mutators[u] = 0653 }654 sme.mutators[u]++655 }656 }657 // We do this as a separate loop because we don't want mutators in requires658 for _, m := range sme.muts {659 for u := range m.Requires() {660 if _, ok := sme.mutators[u]; ok {661 //skip if we've already registered as a mutator662 continue663 }664 if _, ok := sme.requires[u]; !ok {665 sme.requires[u] = 0666 }667 sme.requires[u]++668 }669 // sme.requires is a bit of a misnomer.670 // really we're interested in any url we depend on to asses, including excludes.671 for u := range m.Excludes() {672 if _, ok := sme.mutators[u]; ok {673 //skip if we've already registered as a mutator674 continue675 }676 if _, ok := sme.requires[u]; !ok {677 sme.requires[u] = 0678 }679 sme.requires[u]++680 }681 }682}683func (sme *StateMutationEngine) remapToNode(root *mutationNode, to *mutationNode, reqsOnly bool) []*mutationEdge {684 var mutEqual func(*mutationEdge, *mutationEdge) bool685 // if reqsOnly we consider nodes the same if they have the same requirements686 if reqsOnly {687 mutEqual = func(a *mutationEdge, b *mutationEdge) bool {688 if a.mut != b.mut {689 return false690 }691 if !a.from.spec.ReqsEqual(b.from.spec) {692 return false693 }694 if !a.to.spec.ReqsEqual(b.to.spec) {695 return false696 }697 return true698 }699 } else {700 mutEqual = func(a *mutationEdge, b *mutationEdge) bool { return a.Equal(b) }701 }702 inSlice := func(s []*mutationEdge, n *mutationEdge) bool {703 for _, mn := range s {704 if mutEqual(mn, n) {705 return true706 }707 }708 return false709 }710 rmEdges := []*mutationEdge{}711 // we perform a union on in/out712 for _, in := range root.in {713 in.to = to714 if !inSlice(to.in, in) {715 to.in = append(to.in, in)716 } else {717 rmEdges = append(rmEdges, in)718 // make sure the tail is cleared719 for i, v := range in.from.out {720 if v == in {721 in.from.out = append(in.from.out[:i], in.from.out[i+1:]...)722 }723 }724 }725 }726 for _, out := range root.out {727 out.from = to728 if !inSlice(to.out, out) {729 to.out = append(to.out, out)730 } else {731 // make sure the head is cleared732 for i, v := range out.to.in {733 if v == out {734 out.to.in = append(out.to.in[:i], out.to.in[i+1:]...)735 }736 }737 rmEdges = append(rmEdges, out)738 }739 }740 return rmEdges741}742func nodeMerge(list []int, nodes []*mutationNode) (*mutationNode, []*mutationEdge) {743 // build a new node from a merge of multiple nodes744 // use least-common specification745 // note: this will choke on a zero length list, but that shouldn't happen746 node := nodes[list[0]] // build off of the first node747 for i := 1; i < len(list); i++ {748 // remap edges749 inode := nodes[list[i]]750 for _, e := range inode.in {751 if !edgeInEdges(e, node.in) { // don't add if we already have this752 e.to = node753 node.in = append(node.in, e)754 }755 }756 for _, e := range inode.out {757 if !edgeInEdges(e, node.out) {758 e.from = node759 node.out = append(node.out, e)760 }761 }762 // prune spec763 node.spec.LeastCommon(inode.spec)764 }765 return node, node.out766}767// buildGraphStage1 builds a fully expanded set of paths branching from a starting root768// this will build a very verbose, probably unusable graph769// it is expected that later graph stages will clean it up and make it more sane770// root: current node, edge: edge that got us here (or nil), seenNodes: map of nodes we've seen771func (sme *StateMutationEngine) buildGraphStage1(root *mutationNode, edge *mutationEdge, seenNodes map[types.StateSpec]*mutationNode) ([]*mutationNode, []*mutationEdge) {772 // for this algorithm, it just complicates things to track nodes & edges at the same time. We build the edge list at the end773 nodes := []*mutationNode{}774 edges := []*mutationEdge{}775 // is this node equal to one we've already seen?776 for sp, n := range seenNodes {777 if sp.Equal(root.spec) {778 // yes, we've seen this node, so we're done processing this chain. Merge the nodes.779 dead := sme.remapToNode(root, n, false)780 if len(dead) != 0 {781 fmt.Printf("dead edges was %d, expected 0!", len(dead))782 }783 return nodes, edges784 }785 }786 nodes = append(nodes, root)787 seenNodes[root.spec] = root788 // find connecting mutations789OUTER:790 for _, m := range sme.muts {791 // do we have a valid out arrow?792 if m.SpecCompatOut(root.spec, sme.mutators) {793 // m is a valid out arrow, create an edge for the out arrow794 // first, do we already know about this one?795 for _, edge := range root.out {796 if m == edge.mut {797 continue OUTER798 }799 }800 newEdge := &mutationEdge{801 cost: 1,802 mut: m,803 from: root,804 }805 // ...and construct the new node that it connects to806 newNode := &mutationNode{807 spec: root.spec.SpecMergeMust(m.After()), // a combination of the current spec + the changes the mation creates808 in: []*mutationEdge{newEdge}, // we know we have this in at least809 out: []*mutationEdge{}, // for now, out is empty810 }811 newNode.spec.StripZeros()812 newEdge.to = newNode813 root.out = append(root.out, newEdge)814 // ready to recurse815 ns, _ := sme.buildGraphStage1(newNode, newEdge, seenNodes)816 nodes = append(nodes, ns...)817 } else if m.SpecCompatIn(root.spec, sme.mutators) {818 // note: it doesn't make sense for the same mutator to be both in/out. This would be a meaningless nop.819 // m is a valid in arrow, similar to out with some subtle differences820 for _, edge := range root.in {821 if m == edge.mut {822 continue OUTER823 }824 }825 newEdge := &mutationEdge{826 cost: 1,827 mut: m,828 to: root,829 }830 newNode := &mutationNode{831 spec: root.spec.SpecMergeMust(m.Before()),832 in: []*mutationEdge{},833 out: []*mutationEdge{newEdge},834 }835 newNode.spec.StripZeros()836 newEdge.from = newNode837 root.in = append(root.in, newEdge)838 ns, _ := sme.buildGraphStage1(newNode, newEdge, seenNodes)839 nodes = append(nodes, ns...)840 }841 }842 // build edges list843 for _, n := range nodes {844 edges = append(edges, n.out...)845 }846 return nodes, edges847}848// some useful util functions for graph building849func edgeInEdges(m *mutationEdge, es []*mutationEdge) bool {850 for _, e := range es {851 if m.Equal(e) {852 return true853 }854 }855 return false856}857func mutInEdges(m *mutationEdge, es []*mutationEdge) bool {858 for _, e := range es {859 if m.mut == e.mut {860 return true861 }862 }863 return false864}865// buildGraphReduceNodes remaps any nodes with identical edges to be the same node866// This is currently unused but may be used as a component of subgraph creation in the future867func (sme *StateMutationEngine) buildGraphReduceNodes(nodes []*mutationNode, edges []*mutationEdge) ([]*mutationNode, []*mutationEdge) {868 // some tests we'll use869 nodeEqual := func(a *mutationNode, b *mutationNode) bool {870 if len(a.in) != len(b.in) || len(a.out) != len(b.out) {871 return false872 }873 for _, e := range a.in {874 if !mutInEdges(e, b.in) {875 return false876 }877 }878 for _, e := range a.out {879 if !mutInEdges(e, b.out) {880 return false881 }882 }883 return true884 }885 mergeList := [][]int{}886 merged := map[int]bool{}887 for i := range nodes {888 if _, ok := merged[i]; ok { // skip if already merged889 continue890 }891 list := []int{i}892 for j := i + 1; j < len(nodes); j++ {893 if _, ok := merged[j]; ok { // skip if already merged894 continue895 }896 if nodeEqual(nodes[i], nodes[j]) {897 list = append(list, j)898 merged[j] = true899 }900 }901 mergeList = append(mergeList, list)902 }903 newNodes := []*mutationNode{}904 newEdges := []*mutationEdge{}905 for _, list := range mergeList {906 n, e := nodeMerge(list, nodes)907 newNodes = append(newNodes, n)908 newEdges = append(newEdges, e...)909 }910 return newNodes, newEdges911}912// buildGraphDiscoverDepends calculates the dependencies of discoveries (any mutation that goes from zero to non-zero)913// it returns a map[state_url]spec_of_dependencies914func (sme *StateMutationEngine) buildGraphDiscoverDepends(edges []*mutationEdge) map[string]types.StateSpec {915 clone := func(from map[string]reflect.Value) (to map[string]reflect.Value) {916 to = make(map[string]reflect.Value)917 for k, v := range from {918 to[k] = v919 }920 return921 }922 isDiscoverFor := func(u string, e *mutationEdge) bool {923 for mu, mvs := range e.mut.Mutates() {924 if mu == u && mvs[0].IsZero() {925 return true926 }927 }928 return false929 }930 deps := make(map[string]types.StateSpec)931 for url := range sme.mutators { // do we need to cast a wider net than mutators?932 var spec types.StateSpec933 for _, e := range edges { // find edges934 if isDiscoverFor(url, e) { // this is one of our discovery edges935 if spec == nil { // we need to start with a new, but populated spec936 reqs := clone(e.from.spec.Requires())937 excs := make(map[string]reflect.Value) // stripping states based on excludes can have some strange results...938 // we can't require something of our own url939 // anything co-mutating should have mutation target as a requires940 // should we also remove non-discoverables?941 // FIXME is this correct?942 for k, v := range e.mut.Mutates() {943 if k == url {944 delete(reqs, k)945 } else {946 reqs[k] = v[1]947 }948 }949 delete(excs, url)950 spec = NewStateSpec(reqs, excs)951 } else {952 spec.LeastCommon(e.from.spec)953 }954 }955 }956 if spec == nil {957 sme.Logf(types.LLERROR, "failed to get discovery dependencies for: %s", url)958 continue959 }960 deps[url] = spec961 }962 return deps963}964func (sme *StateMutationEngine) nodeViolatesDeps(deps map[string]types.StateSpec, node *mutationNode) (reqs []string, excs []string) {965 // excludes don't really make sense in this context966 for u, v := range node.spec.Requires() {967 if ds, ok := deps[u]; ok { // this is an url with dependencies968 for req, val := range ds.Requires() { // for each requirement of this dep969 if nval, ok := node.spec.Requires()[req]; ok { // if the requirement is set970 if nval.Interface() != val.Interface() { // if the values aren't equal971 reqs = append(reqs, u) // we can't know this url in this node because a requirement is unequal972 sme.Logf(types.LLDDEBUG, "strip %s == %s because %s %s != %s\n", u, util.ValueToString(v), req, util.ValueToString(nval), util.ValueToString(val))973 }974 } else {975 reqs = append(reqs, u) // we can't know this url in this node because a requirement wasn't set976 sme.Logf(types.LLDDEBUG, "strip %s == %s because req %s missing\n", u, util.ValueToString(v), req)977 }978 }979 for exc, val := range ds.Excludes() { // for each exclude of this dep980 if nval, ok := node.spec.Requires()[exc]; ok { // if the exclude is set981 if nval.Interface() == val.Interface() {982 reqs = append(reqs, u) // we can't know this url because an exclude is violated983 sme.Logf(types.LLDDEBUG, "strip %s == %s because %s %s == %s\n", u, util.ValueToString(v), exc, util.ValueToString(nval), util.ValueToString(val))984 }985 }986 }987 }988 }989 return990}991func (sme *StateMutationEngine) nodeForgetUnknowable(cfg, dsc types.Node) {992 meld := sme.dscNodeMeld(cfg, dsc)993 ms := []string{}994 for k := range sme.mutators {995 ms = append(ms, k)996 }997 for k := range sme.requires {998 ms = append(ms, k)999 }1000 reqs, _ := meld.GetValues(ms)1001 nn := &mutationNode{1002 spec: NewStateSpec(reqs, map[string]reflect.Value{}),1003 }1004 violations, _ := sme.nodeViolatesDeps(sme.deps, nn)1005 sme.Logf(DEBUG, "%s forgetting unknowable values: %v", cfg.ID().String(), violations)1006 for _, r := range violations {1007 v, _ := dsc.GetValue(r)1008 sme.query.SetValueDsc(util.NodeURLJoin(cfg.ID().String(), r), reflect.Zero(v.Type()))1009 }1010 // finally, deal with one special case for services: we can't know service states if we're not in SYNC1011 v, _ := dsc.GetValue("/RunState")1012 if v.Interface() != pb.Node_SYNC {1013 for _, s := range cfg.GetServiceIDs() {1014 url := ""1015 for _, u := range []string{"/Services", s, "State"} {1016 url = util.URLPush(url, u)1017 }1018 sme.query.SetValueDsc(util.NodeURLJoin(cfg.ID().String(), url), reflect.ValueOf(pb.ServiceInstance_UNKNOWN))1019 }1020 }1021}1022func (sme *StateMutationEngine) printDeps(deps map[string]types.StateSpec) {1023 // print some nice log messages documenting our dependencies1024 for u := range deps {1025 msg := fmt.Sprintf("Dependencies for %s: ", u)1026 msg += "requires ("1027 for k, v := range deps[u].Requires() {1028 msg += fmt.Sprintf("%s == %s , ", k, util.ValueToString(v))1029 }1030 msg += "), excludes ("1031 for k, v := range deps[u].Excludes() {1032 msg += fmt.Sprintf("%s == %s , ", k, util.ValueToString(v))1033 }1034 msg += ")"1035 sme.Log(types.LLDEBUG, msg)1036 }1037}1038func (sme *StateMutationEngine) graphIsSane(nodes []*mutationNode, edges []*mutationEdge) bool {1039 ret := true1040 nodeEdges := map[*mutationEdge]uint{}1041 edgeNodes := map[*mutationNode]uint{}1042 for _, n := range nodes {1043 for _, e := range n.in {1044 nodeEdges[e]++1045 }1046 for _, e := range n.out {1047 nodeEdges[e]++1048 }1049 }1050 for _, e := range edges {1051 edgeNodes[e.from]++1052 edgeNodes[e.to]++1053 }1054 fmt.Println("=== BEGIN: Graph sanity check ===")1055 // sanity checks1056 // 1. Equal number of nodeEdges as edges?1057 if len(nodeEdges) != len(edges) {1058 fmt.Printf("len(nodeEdges) != len(edges) : %d != %d\n", len(nodeEdges), len(edges))1059 ret = false1060 }1061 // 2. Equal number of edgeNodes as nodes?1062 if len(edgeNodes) != len(nodes) {1063 fmt.Printf("len(edgeNodes) != len(nodes) : %d != %d\n", len(edgeNodes), len(nodes))1064 ret = false1065 }1066 // 2. nodeEdges should have ref count 21067 bad := []*mutationEdge{}1068 for e, c := range nodeEdges {1069 if c != 2 {1070 bad = append(bad, e)1071 }1072 }1073 if len(bad) > 0 {1074 fmt.Printf("%d edges have ref count != 2: %v\n", len(bad), bad)1075 ret = false1076 }1077 fmt.Println("=== END: Graph sanity check ===")1078 return ret1079}1080// buildGraphStripState takes discovery dependencies into account1081// it uses this info to simplify node state specs so they don't proclaim to know things they can't know anymore1082// it then reduces nodes that became the same after forgetting extra state info1083// note: we can't really know discoverable dependencies for sure until we did stage1 build1084func (sme *StateMutationEngine) buildGraphStripState(nodes []*mutationNode, edges []*mutationEdge) ([]*mutationNode, []*mutationEdge) {1085 deps := sme.buildGraphDiscoverDepends(edges)1086 sme.deps = deps1087 sme.printDeps(deps) // print debugging output for deps1088 rmEdge := func(edges []*mutationEdge, edge *mutationEdge) []*mutationEdge {1089 for i := range edges {1090 if edges[i] == edge {1091 edges = append(edges[:i], edges[i+1:]...)1092 return edges1093 }1094 }1095 return edges1096 }1097 // does s1 contain more info than s2? (i.e. a mutator that's not in s2)1098 hasExtraInfo := func(s1, s2 types.StateSpec) bool {1099 r1 := s1.Requires()1100 r2 := s2.Requires()1101 for k := range r1 {1102 if _, ok := r2[k]; !ok {1103 // we gained info!1104 return true1105 }1106 }1107 return false1108 }1109 // 1. iterate through the nodes1110 // - remove dependecy violating info1111 // - remove zero values1112 // - remap newly redundant nodes1113 newNodes := []*mutationNode{}1114OUTER_NODE:1115 for _, n := range nodes { // for all nodes1116 nr := n.spec.Requires()1117 ne := n.spec.Excludes()1118 vr, ve := sme.nodeViolatesDeps(deps, n) // get list of violating urls1119 for _, r := range vr {1120 delete(nr, r)1121 }1122 for _, e := range ve {1123 delete(ne, e)1124 }1125 for u := range nr {1126 if nr[u].IsZero() {1127 delete(nr, u)1128 }1129 }1130 for u := range ne {1131 if ne[u].IsZero() {1132 delete(ne, u)1133 }1134 }1135 n.spec = NewStateSpec(nr, ne)1136 // Now, has this node become redundant? If so, remap it.1137 for _, nn := range newNodes {1138 // We only care that the requirements are the same1139 // It's OK if the excludes get broader1140 if n.spec.ReqsEqual(nn.spec) { // duplicate node1141 dead := sme.remapToNode(n, nn, true)1142 for _, e := range dead {1143 rmEdge(edges, e)1144 }1145 nn.spec.LeastCommon(n.spec) // strip extra excludes1146 continue OUTER_NODE1147 }1148 }1149 newNodes = append(newNodes, n)1150 }1151 // 2. Now we need to remove edges that have become violations1152 newEdges := []*mutationEdge{}1153OUTER_EDGE:1154 for _, e := range edges {1155 // first, skip this edge if we've already seen one equal to it1156 if edgeInEdges(e, newEdges) {1157 e.from.out = rmEdge(e.from.out, e)1158 e.to.in = rmEdge(e.to.in, e)1159 continue1160 }1161 imp := e.from.spec.SpecMergeMust(e.mut.After())1162 if hasExtraInfo(e.to.spec, imp) { // we're not allowed to gain extra mutation information. FIXME: we should reconsider how we evaluate this1163 for u := range e.to.spec.Requires() {1164 if _, ok := imp.Requires()[u]; !ok { // outlier1165 if _, ok := sme.mutators[u]; ok { // and a mutator, delete1166 e.from.out = rmEdge(e.from.out, e)1167 e.to.in = rmEdge(e.to.in, e)1168 continue OUTER_EDGE1169 }1170 }1171 }1172 }1173 if !e.mut.SpecCompatOut(e.from.spec, sme.mutators) { // this edge is no longer compatible1174 e.from.out = rmEdge(e.from.out, e)1175 e.to.in = rmEdge(e.to.in, e)1176 continue1177 }1178 if !e.mut.SpecCompatIn(e.to.spec, sme.mutators) { // this edge is no longer compatible1179 // there is one special case where this is ok: we forgot our requirement on mutation1180 tmpNode := &mutationNode{1181 spec: e.to.spec.SpecMergeMust(NewStateSpec(e.mut.Requires(), map[string]reflect.Value{})),1182 }1183 r, _ := sme.nodeViolatesDeps(deps, tmpNode)1184 if len(r) != 0 {1185 if !e.mut.SpecCompatIn(tmpNode.spec, sme.mutators) {1186 e.from.out = rmEdge(e.from.out, e)1187 e.to.in = rmEdge(e.to.in, e)1188 continue1189 }1190 } else {1191 // no special case1192 e.from.out = rmEdge(e.from.out, e)1193 e.to.in = rmEdge(e.to.in, e)1194 continue1195 }1196 }1197 newEdges = append(newEdges, e)1198 }1199 return newNodes, newEdges1200}1201// buildGraph builds the graph of Specs/Mutations. It is depth-first, recursive.1202// TODO: this function may eventually need recursion protection1203// !!!IMPORTANT!!!1204// buildGraph assumes you already hold a lock1205// currently only used in onUpdate1206func (sme *StateMutationEngine) buildGraph(root *mutationNode) (nodes []*mutationNode, edges []*mutationEdge) {1207 nodes, edges = sme.buildGraphStage1(root, nil, map[types.StateSpec]*mutationNode{})1208 if sme.log.GetLoggerLevel() > types.LLDEBUG {1209 sme.graphIsSane(nodes, edges)1210 }1211 nodes, edges = sme.buildGraphStripState(nodes, edges)1212 if sme.log.GetLoggerLevel() > types.LLDEBUG {1213 sme.graphIsSane(nodes, edges)1214 }1215 if sme.log.GetLoggerLevel() > types.LLDDEBUG {1216 sme.DumpJSONGraph(nodes, edges)1217 }1218 return1219}1220// !!!IMPORTANT!!!1221// clearGraph assumes you already hold a lock1222// currently only used in onUpdate1223func (sme *StateMutationEngine) clearGraph() {1224 sme.mutators = make(map[string]uint32)1225 sme.requires = make(map[string]uint32)1226 sme.graph.in = []*mutationEdge{}1227 sme.graph.out = []*mutationEdge{}1228 sme.graph.spec = sme.root1229}1230// onUpdate should get called any time a new mutation is registered1231// onUpdate gets a graphMutex around everything, so it's important that it doesn't1232// call anything that tries to get it's own lock, or it will deadlock1233// LOCKS: graphMutex (RW)1234// FIXME: We should re-compute active mutations?1235func (sme *StateMutationEngine) onUpdate() {1236 sme.graphMutex.Lock()1237 sme.clearGraph()1238 sme.collectURLs()1239 sme.nodes, sme.edges = sme.buildGraph(sme.graph)1240 sme.Logf(DEBUG, "Built graph [ Mutations: %d Mutation URLs: %d Requires URLs: %d Graph Nodes: %d Graph Edges: %d ]",1241 len(sme.muts), len(sme.mutators), len(sme.requires), len(sme.nodes), len(sme.edges))1242 sme.graphMutex.Unlock()1243}1244// LOCKS: graphMutex (R)1245func (sme *StateMutationEngine) nodeSearch(node types.Node) (mns []*mutationNode) {1246 sme.graphMutex.RLock()1247 defer sme.graphMutex.RUnlock()1248 for _, n := range sme.nodes {1249 if n == sme.graph {1250 // the root node with match anything, but we don't want it...1251 continue1252 }1253 if n.spec.NodeMatch(node) {1254 mns = append(mns, n)1255 }1256 }1257 return1258}1259// LOCKS: graphMutex (R)1260func (sme *StateMutationEngine) boundarySearch(start types.Node, end types.Node) (gstart []*mutationNode, gend []*mutationNode) {1261 startMerge := sme.dscNodeMeld(end, start)1262 sme.graphMutex.RLock()1263 for _, n := range sme.nodes {1264 // in general, we don't want the graph root as an option1265 if n != sme.graph && n.spec.NodeMatchWithMutators(startMerge, sme.mutators) {1266 gstart = append(gstart, n)1267 }1268 if n != sme.graph && n.spec.NodeCompatWithMutators(end, sme.mutators) { // ends can be more lenient1269 gend = append(gend, n)1270 }1271 }1272 sme.graphMutex.RUnlock()1273 // there's one exception: we may be starting on the graph root (if nothing else matched)1274 /* actually, this is a bad idea1275 if len(gstart) == 0 {1276 gstart = append(gstart, sme.graph)1277 }1278 */1279 return1280}1281// drijkstra implements the Drijkstra shortest path graph algorithm.1282// NOTE: An alternative would be to pre-compute trees for every node1283// LOCKS: graphMutex (R)1284func (sme *StateMutationEngine) drijkstra(gstart *mutationNode, gend []*mutationNode) *mutationPath {1285 sme.graphMutex.RLock()1286 defer sme.graphMutex.RUnlock()1287 isEnd := func(i *mutationNode) (r bool) {1288 for _, j := range gend {1289 if i == j {1290 return true1291 }1292 }1293 return1294 }1295 dist := make(map[*mutationNode]uint32)1296 prev := make(map[*mutationNode]*mutationEdge)1297 queue := make(map[*mutationNode]*mutationNode)1298 for _, n := range sme.nodes {1299 dist[n] = ^uint32(0) - 1 // max uint32 - 1, a total hack1300 prev[n] = nil1301 queue[n] = n1302 }1303 dist[gstart] = 01304 for len(queue) > 0 {1305 min := ^uint32(0)1306 var idx *mutationNode1307 for k, v := range queue {1308 if dist[v] < min {1309 min = dist[v]1310 idx = k1311 }1312 }1313 u := queue[idx]1314 if isEnd(u) {1315 // found it!1316 var chain []*mutationEdge1317 i := u1318 for prev[i] != nil {1319 chain = append([]*mutationEdge{prev[i]}, chain...)1320 i = prev[i].from1321 }1322 path := &mutationPath{1323 mutex: &sync.Mutex{},1324 gstart: gstart,1325 gend: u,1326 chain: chain,1327 curSeen: []string{},1328 cmplt: false,1329 }1330 return path1331 }1332 delete(queue, idx)1333 for _, v := range u.out {1334 if _, ok := queue[v.to]; !ok { // v should be in queue1335 continue1336 }1337 alt := dist[u] + v.cost1338 if alt < dist[v.to] {1339 dist[v.to] = alt1340 prev[v.to] = v1341 }1342 }1343 }1344 return nil1345}1346// findPath finds the sequence of edges (if it exists) between two types.Nodes1347// LOCKS: graphMutex (R) via boundarySearch, drijkstra1348func (sme *StateMutationEngine) findPath(start types.Node, end types.Node) (path *mutationPath, e error) {1349 same := true1350 for m := range sme.mutators {1351 sv, _ := start.GetValue(m)1352 ev, _ := end.GetValue(m)1353 if sv.Interface() != ev.Interface() {1354 same = false1355 break1356 }1357 }1358 if same {1359 path = &mutationPath{1360 mutex: &sync.Mutex{},1361 start: start,1362 end: end,1363 cur: 0,1364 cmplt: true,1365 curSeen: []string{},1366 chain: []*mutationEdge{},1367 }1368 return1369 }1370 gs, ge := sme.boundarySearch(start, end)1371 if len(gs) < 1 {1372 e = fmt.Errorf("could not find path: start not in graph")1373 }1374 if len(ge) < 1 {1375 e = fmt.Errorf("could not find path: end not in graph")1376 if sme.GetLoggerLevel() >= DDEBUG {1377 fmt.Printf("start: %v, end: %v\n", string(start.JSON()), string(end.JSON()))1378 sme.DumpGraph()1379 sme.graphMutex.RLock()1380 sme.DumpJSONGraph(sme.nodes, sme.edges) // Use this to debug your graph1381 sme.graphMutex.RUnlock()1382 }1383 }1384 if e != nil {1385 return1386 }1387 // try starts until we get a path (or fail)1388 for _, st := range gs {1389 path = sme.drijkstra(st, ge) // we require a unique start, but not a unique end1390 if path.chain != nil {1391 break1392 }1393 }1394 path.start = start1395 path.end = end1396 path.cur = 01397 if path.chain == nil {1398 e = fmt.Errorf("path not found: you can't get there from here")1399 path = nil1400 }1401 return1402}1403// startNewMutation sees if we need a new mutation1404// if we do, it starts it1405// if we don't already have a mutation object, it creates it1406// LOCKS: graphMutex (R) via findPath; activeMutex; path.mutex1407func (sme *StateMutationEngine) startNewMutation(node string) {1408 // we assume it's already been verified that this is *new*1409 nid := ct.NewNodeIDFromURL(node)1410 start, e := sme.query.ReadDsc(nid)1411 if e != nil {1412 sme.Log(ERROR, e.Error())1413 return1414 } // this is bad...1415 end, e := sme.query.Read(nid)1416 if e != nil {1417 sme.Log(ERROR, e.Error())1418 return1419 }1420 p, e := sme.findPath(start, end)1421 if e != nil {1422 sme.Log(ERROR, e.Error())1423 return1424 }1425 if len(p.chain) == 0 { // we're already there1426 sme.Logf(DEBUG, "%s discovered that we're already where we want to be", nid.String())1427 return1428 }1429 sme.activeMutex.Lock()1430 alreadyFired := false1431 if cur, ok := sme.active[node]; ok {1432 // is this really a new path?1433 if cur.same(p) {1434 // this is not a change1435 sme.Logf(DDEBUG, "%s startNewMutation called, but found path is same as current path", nid.String())1436 sme.activeMutex.Unlock()1437 return1438 }1439 if cur.alreadyFired(p) {1440 // we're already performing the mut we need1441 alreadyFired = true1442 } else {1443 // we need to cleanup the old mutation1444 cur.mutex.Lock()1445 if cur.timer != nil {1446 cur.timer.Stop()1447 }1448 sme.unwaitForService(cur)1449 cur.mutex.Unlock()1450 }1451 }1452 // new mutation, record it, and start it in motion1453 // we need to hold the path mutex for the rest of this function1454 p.mutex.Lock()1455 defer p.mutex.Unlock()1456 sme.active[node] = p1457 sme.activeMutex.Unlock()1458 sme.Logf(DEBUG, "started new mutation for %s (1/%d).", nid.String(), len(p.chain))1459 if sme.mutationInContext(end, p.chain[p.cur].mut) {1460 if sme.waitForServices(p) {1461 return1462 }1463 if !alreadyFired {1464 sme.Logf(DDEBUG, "firing mutation in context, timeout %s.", p.chain[p.cur].mut.Timeout().String())1465 sme.emitMutation(end, start, p.chain[p.cur].mut)1466 if p.chain[p.cur].mut.Timeout() != 0 {1467 if p.timer != nil {1468 // Stop old timer if it exists1469 p.timer.Stop()1470 }1471 p.timer = time.AfterFunc(p.chain[p.cur].mut.Timeout(), func() { sme.emitFail(start, p) })1472 }1473 } else {1474 // already fired1475 sme.Logf(DDEBUG, "%s starting new mutation chain, but current mutation was already fired by previous chain", nid.String())1476 }1477 } else {1478 sme.Log(DDEBUG, "mutation is not in our context.")1479 }1480}1481// Assumes that path is already locked1482// LOCKS: activeMutex1483func (sme *StateMutationEngine) waitForServices(p *mutationPath) (wait bool) {1484 m, _ := sme.mutResolver[p.chain[p.cur].mut] // what ServiceInstance do we depend on?1485 // is there a current waitFor queue for this SI?1486 si := m[0]1487 // we don't wait for core mutations1488 if si == "core" {1489 return1490 }1491 sme.activeMutex.Lock()1492 defer sme.activeMutex.Unlock()1493 if _, ok := sme.waiting[si]; ok {1494 // queue already exists, just add ourselves to it1495 sme.waiting[si] = append(sme.waiting[si], p)1496 p.waitingFor = si1497 sme.Logf(INFO, "%s is waiting for service %s", p.end.ID().String(), si)1498 return true1499 }1500 // queue doesn't already exist, is service running? Is this too expensive?1501 url := util.NodeURLJoin(sme.self.String(), util.URLPush(util.URLPush("/Services", si), "State"))1502 v, e := sme.query.GetValueDsc(url)1503 if e != nil {1504 sme.Logf(ERROR, "waitForServices could not lookup service state (%s): %v", url, e)1505 return1506 }1507 if pb.ServiceInstance_ServiceState(v.Int()) != pb.ServiceInstance_RUN {1508 // Mutation was requested for a service that isn't running yet1509 // 1. Set it to run1510 if _, e := sme.query.SetValue(url, reflect.ValueOf(pb.ServiceInstance_RUN)); e != nil {1511 sme.Logf(ERROR, "waitForServices failed to set service state (%s): %v", url, e)1512 // we still continue1513 }1514 // 3. Create a waitlist of this SI1515 sme.waiting[si] = []*mutationPath{p}1516 p.waitingFor = si1517 sme.Logf(INFO, "%s is waiting for service %s", p.end.ID().String(), si)1518 return true1519 }1520 return1521}1522// unwaitForService clears waiting status for path1523// assumes activeMutex is already locked1524func (sme *StateMutationEngine) unwaitForService(p *mutationPath) {1525 if p.waitingFor != "" {1526 queue := sme.waiting[p.waitingFor]1527 for i := range queue {1528 if queue[i] == p {1529 // order isn't important1530 queue[i] = queue[len(queue)-1]1531 sme.waiting[p.waitingFor] = queue[:len(queue)-1]1532 break1533 }1534 }1535 p.waitingFor = ""1536 }1537}1538func (sme *StateMutationEngine) handleServiceEvent(v *StateChangeEvent) {1539 if v.Type != StateChange_UPDATE {1540 return1541 } // DSC update1542 _, url := util.NodeURLSplit(v.URL)1543 us := util.URLToSlice(url)1544 if us[len(us)-1] != "State" {1545 // not a change in service state1546 return1547 }1548 if v.Value.Kind() != reflect.TypeOf(pb.ServiceInstance_RUN).Kind() {1549 // it looks like we weren't actually passed the state value1550 // this shouldn't happen, but if we don't check we could panic1551 return1552 }1553 if pb.ServiceInstance_ServiceState(v.Value.Int()) != pb.ServiceInstance_RUN {1554 return1555 } // service discovered run state1556 // get SI1557 si := ""1558 // this makes sure we don't get tripped up by leading slashes1559 for i := range us {1560 if us[i] == "Services" {1561 si = us[i+1]1562 }1563 }1564 if si == "" {1565 sme.Logf(types.LLDEBUG, "failed to parse URL for /Services state change: %s", v.URL)1566 return1567 }1568 // OK, let's resume any waiting chains1569 sme.activeMutex.Lock()1570 queue, ok := sme.waiting[si]1571 if ok {1572 delete(sme.waiting, si)1573 } else {1574 queue = []*mutationPath{}1575 }1576 sme.activeMutex.Unlock()1577 for _, p := range queue {1578 p.mutex.Lock()1579 p.waitingFor = ""1580 p.cur-- // we have to rewind one to advance. This could even mean we go negative1581 sme.advanceMutation(p.end.ID().String(), p)1582 p.mutex.Unlock()1583 }1584}1585// LOCKS: graphMutex (R); path.mutex1586func (sme *StateMutationEngine) emitFail(start types.Node, p *mutationPath) {1587 p.mutex.Lock()1588 defer p.mutex.Unlock()1589 nid := p.start.ID()1590 d := p.chain[p.cur].mut.FailTo()1591 sme.Logf(INFO, "mutation timeout for %s, emitting: %s:%s:%s", nid.String(), d[0], d[1], d[2])1592 // try devolve first1593 val, ok := Registry.Discoverables[d[0]][d[1]][d[2]]1594 if !ok {1595 sme.Logf(ERROR, "could not find value %v:%v:%v in discoverables registry", d[0], d[1], d[2])1596 return1597 }1598 rewind, i, err := sme.devolve(p, d[1], val)1599 n, e := sme.query.ReadDsc(nid)1600 if e != nil {1601 // ok, I give up. The node has just disappeared.1602 sme.Logf(ERROR, "%s node unexpectly disappeared", nid.String())1603 return1604 }1605 nc, e := sme.query.Read(nid)1606 if e != nil {1607 // ok, I give up. The node has just disappeared.1608 sme.Logf(ERROR, "%s node unexpectly disappeared", nid.String())1609 return1610 }1611 // this is a devolution1612 if err == nil {1613 // ok, let's devolve1614 sme.Logf(DEBUG, "%s is devolving back %d steps due to an unexpected regression", nid, p.cur-i)1615 n.SetValues(rewind)1616 p.chain = append(p.chain[:p.cur+1], p.chain[i:]...)1617 sme.advanceMutation(nid.String(), p)1618 return1619 }1620 // this isn't a devolution1621 // our strategy is to:1622 // 0) create a spec that mimics the current state of the node1623 // 1) set the failto value1624 // 2) remove any values that violate epistemology1625 sme.Logf(DEBUG, "%s could not devolve, setting failure %s = %s", nid, d[1], util.ValueToString(val))1626 n.SetValue(d[1], val)1627 sme.nodeForgetUnknowable(nc, n)1628 // now send a discover to whatever failed state1629 url := util.NodeURLJoin(nid.String(), d[1])1630 dv := NewEvent(1631 types.Event_DISCOVERY,1632 url,1633 &DiscoveryEvent{1634 ID: d[0],1635 URL: url,1636 ValueID: d[2],1637 },1638 )1639 // send a mutation interrupt1640 iv := NewEvent(1641 types.Event_STATE_MUTATION,1642 url,1643 &MutationEvent{1644 Type: pb.MutationControl_INTERRUPT,1645 NodeCfg: p.end,1646 NodeDsc: start,1647 Mutation: sme.mutResolver[p.chain[p.cur].mut],1648 },1649 )1650 sme.Emit([]types.Event{dv, iv})1651}1652// advanceMutation fires off the next mutation in the chain1653// does *not* check to make sure there is one1654// assumes m.mutex is locked by surrounding func1655func (sme *StateMutationEngine) advanceMutation(node string, m *mutationPath) {1656 nid := ct.NewNodeIDFromURL(node)1657 m.cur++1658 m.curSeen = []string{}1659 sme.Logf(DEBUG, "resuming mutation for %s (%d/%d).", nid.String(), m.cur+1, len(m.chain))1660 if sme.mutationInContext(m.end, m.chain[m.cur].mut) {1661 if sme.waitForServices(m) {1662 return1663 }1664 sme.Logf(DDEBUG, "firing mutation in context, timeout %s.", m.chain[m.cur].mut.Timeout().String())1665 sme.emitMutation(m.end, m.start, m.chain[m.cur].mut)1666 if m.chain[m.cur].mut.Timeout() != 0 {1667 if m.timer != nil {1668 // Stop old timer if it exists1669 m.timer.Stop()1670 }1671 m.timer = time.AfterFunc(m.chain[m.cur].mut.Timeout(), func() { sme.emitFail(m.start, m) })1672 }1673 } else {1674 sme.Logf(DDEBUG, "node (%s) mutation is not in our context", node)1675 }1676}1677// devolve will reverse through a mutation path until it gets to the desired url and val.1678// If it succeeds, it will return a map of urls to values that need to be set to devolve and the index of the devolve point in the mutation chain.1679// If it fails to devolve, it will return an error.1680// LOCKS: This assumes mutation path has been locked1681func (sme *StateMutationEngine) devolve(m *mutationPath, url string, val reflect.Value) (map[string]reflect.Value, int, error) {1682 rewind := make(map[string]reflect.Value)1683 // starting from the current position, look backwards in the chain1684 // have we seen this value before? Maybe we need to reset to that point...1685 found := false1686 var i int1687 for i = m.cur; i >= 0; i-- {1688 // is there a mutation with this url?1689 for murl, mvs := range m.chain[i].mut.Mutates() {1690 if murl == url {1691 // this mutation deals with the url of interest1692 if mvs[0].Interface() == val.Interface() {1693 // this is our rewind point, but we need the rest of this loop1694 found = true1695 }1696 } else {1697 // add to our rewind1698 rewind[murl] = mvs[0]1699 }1700 }1701 if found {1702 break1703 }1704 }1705 if found {1706 return rewind, i, nil1707 }1708 e := fmt.Errorf("could not find desired url in mutation path for devolve")1709 return nil, 0, e1710}1711// handleUnexpected deals with unexpected events in updateMutation1712// Logic:1713// 1) did we regress to a previous point in the chain? devolve.1714// 2) no? can we find a direct path?1715// 3) no? give up, declare everything (except the unexpected discovery) unknown1716// LOCKS: !!! this does *not* lock the mutationPath, it assumes it is already locked by the calling function1717// (generally updateMutation)1718func (sme *StateMutationEngine) handleUnexpected(node, url string, val reflect.Value) {1719 sme.activeMutex.Lock()1720 m, ok := sme.active[node]1721 sme.activeMutex.Unlock()1722 if !ok {1723 // there's no existing mutation chain1724 // shouldn't really happen1725 sme.startNewMutation(node)1726 return1727 }1728 m.cmplt = false1729 // this is a bit bad. We don't want to get our own state changes, so we change the node directly1730 nid := ct.NewNodeIDFromURL(node)1731 n, e := sme.query.ReadDsc(nid)1732 if e != nil {1733 // ok, I give up. The node has just disappeared.1734 sme.Logf(ERROR, "%s node unexpectly disappeared", node)1735 return1736 }1737 // can we find a path?1738 end, e := sme.query.Read(nid)1739 if e != nil {1740 sme.Log(ERROR, e.Error())1741 return1742 }1743 p, e := sme.findPath(n, end)1744 if e == nil {1745 if len(p.chain) == 0 { // we're already there1746 sme.Logf(DEBUG, "%s discovered that we're already where we want to be", nid.String())1747 return1748 }1749 // update & advance1750 sme.Logf(DEBUG, "%s found a new path", node)1751 m.chain = append(m.chain[:m.cur+1], p.chain...)1752 sme.advanceMutation(node, m)1753 return1754 }1755 sme.Logf(DEBUG, "%s could neither find a path, nor devolve. We're lost.", node)1756}1757// updateMutation attempts to progress along an existing mutation chain1758// LOCKS: activeMutex; path.mutex; graphMutex (R) via startNewMutation1759func (sme *StateMutationEngine) updateMutation(node string, url string, val reflect.Value) {1760 sme.activeMutex.Lock()1761 m, ok := sme.active[node]1762 if !ok {1763 // this shouldn't happen1764 sme.Logf(DDEBUG, "call to updateMutation, but no mutation exists %s", node)1765 sme.startNewMutation(node)1766 sme.activeMutex.Unlock()1767 return1768 }1769 // let's make sure we forget anything we can't know anymore1770 nid := ct.NewNodeID(node)1771 cfg, _ := sme.query.Read(nid)1772 dsc, _ := sme.query.ReadDsc(nid)1773 sme.nodeForgetUnknowable(cfg, dsc)1774 // we should reset waiting status1775 sme.unwaitForService(m)1776 sme.activeMutex.Unlock()1777 m.mutex.Lock()1778 defer m.mutex.Unlock()1779 // stop any timer clocks1780 if m.timer != nil {1781 m.timer.Stop()1782 }1783 // we still query this to make sure it's the Dsc value1784 var e error1785 val, e = sme.query.GetValueDsc(util.NodeURLJoin(node, url))1786 if e != nil {1787 sme.Log(ERROR, e.Error())1788 return1789 }1790 // this is a discovery on a completed chain1791 if m.cur >= len(m.chain) {1792 sme.Logf(DEBUG, "node (%s) got a discovery on a completed chain (%s)", node, url)1793 sme.handleUnexpected(node, url, val)1794 return1795 }1796 // is this a value change we were expecting?1797 cmuts := m.chain[m.cur].mut.Mutates()1798 vs, match := cmuts[url]1799 if !match {1800 // we got an unexpected change! Recalculating...1801 sme.Logf(DEBUG, "node (%s) got an unexpected change of state (%s)", node, url)1802 sme.handleUnexpected(node, url, val)1803 return1804 }1805 // ok, we got an expected URL. Is this the value we were looking for?1806 if val.Interface() == vs[1].Interface() {1807 // Ah! Good, we're mutating as intended.1808 m.curSeen = append(m.curSeen, url)1809 // Ok, everything checks out, but maybe we have more things to discover before progressing?1810 // TODO: more efficient way to do this for large numbers of URL changes/mut?1811 for url := range cmuts {1812 got := false1813 for _, seen := range m.curSeen {1814 if url == seen { // ok, we got this one1815 got = true1816 break1817 }1818 }1819 if !got {1820 // ok, we haven't seen all of the URL's discovered1821 sme.Logf(DEBUG, "mutation chain for %s progressing as normal, but this mutation isn't complete yet. Still need: %s", node, url)1822 return1823 }1824 }1825 m.curSeen = []string{} // possibly redundant1826 if m.timer != nil {1827 m.timer.Stop()1828 }1829 // are we done?1830 if len(m.chain) == m.cur+1 {1831 // all done!1832 sme.Logf(DEBUG, "mutation chain completed for %s (%d/%d)", node, m.cur+1, len(m.chain))1833 m.cmplt = true1834 return1835 }1836 sme.Logf(DEBUG, "mutation for %s progressing as normal, moving to next (%d/%d)", node, m.cur+1, len(m.chain))1837 // advance1838 sme.advanceMutation(node, m)1839 } else if val.Interface() == vs[0].Interface() { // might want to do more with this case later; for now we have to just recalculate1840 sme.Logf(DEBUG, "mutation for %s failed to progress, got %v, expected %v\n", node, val.Interface(), vs[1].Interface())1841 sme.handleUnexpected(node, url, val)1842 } else {1843 sme.Logf(DEBUG, "unexpected mutation step for %s, got %v, expected %v\n", node, val.Interface(), vs[1].Interface())1844 // we got something completely unexpected... start over1845 sme.handleUnexpected(node, url, val)1846 }1847}1848// Assumes you already hold a lock1849func (sme *StateMutationEngine) mutationInContext(n types.Node, m types.StateMutation) (r bool) {1850 switch m.Context() {1851 case types.StateMutationContext_SELF:1852 if sme.self.EqualTo(n.ID()) {1853 return true1854 }1855 break1856 case types.StateMutationContext_CHILD:1857 if sme.self.EqualTo(n.ParentID()) {1858 return true1859 }1860 break1861 case types.StateMutationContext_ALL:1862 return true1863 }1864 return1865}1866func (sme *StateMutationEngine) handleEvent(v types.Event) {1867 sce := v.Data().(*StateChangeEvent)1868 node, url := util.NodeURLSplit(sce.URL)1869 sme.activeMutex.Lock()1870 _, ok := sme.active[node] // get the active mutation, if there is one1871 sme.activeMutex.Unlock()1872 switch sce.Type {1873 case StateChange_CREATE:1874 if ok {1875 // what?! how do we have an active mutation for a node that was just created?1876 // let's print something, and then pretend it *is* new1877 sme.Log(DEBUG, "what?! we got a CREATE event for a node with an existing mutation")1878 sme.activeMutex.Lock()1879 m := sme.active[node]1880 m.mutex.Lock()1881 if m.timer != nil {1882 m.timer.Stop()1883 }1884 sme.unwaitForService(m)1885 delete(sme.active, node)1886 m.mutex.Unlock()1887 sme.activeMutex.Unlock()1888 }1889 sme.startNewMutation(node)1890 case StateChange_DELETE:1891 if ok {1892 sme.activeMutex.Lock()1893 m := sme.active[node]1894 m.mutex.Lock()1895 if m.timer != nil {1896 m.timer.Stop()1897 }1898 sme.unwaitForService(m)1899 delete(sme.active, node)1900 m.mutex.Unlock()1901 sme.activeMutex.Unlock()1902 }1903 case StateChange_UPDATE:1904 sme.updateMutation(node, url, sce.Value)1905 case StateChange_CFG_UPDATE:1906 // for a cfg update, we need to create a new chain1907 sme.Logf(DEBUG, "our cfg has changed, creating new mutaiton path: %s:%s", node, url)1908 sme.startNewMutation(node)1909 default:1910 }1911}1912// LOCKS: graphMutex (R)1913func (sme *StateMutationEngine) emitMutation(cfg types.Node, dsc types.Node, sm types.StateMutation) {1914 sme.graphMutex.RLock()1915 smee := &MutationEvent{1916 Type: MutationEvent_MUTATE,1917 NodeCfg: cfg,1918 NodeDsc: dsc,1919 Mutation: sme.mutResolver[sm],1920 }1921 sme.graphMutex.RUnlock()1922 v := NewEvent(1923 types.Event_STATE_MUTATION,1924 cfg.ID().String(),1925 smee,1926 )1927 sme.EmitOne(v)1928}1929// It might be useful to export this1930// Also, there's no particular reason it belongs here1931// This takes the cfg state and merges only discoverable values from dsc state into it1932func (sme *StateMutationEngine) dscNodeMeld(cfg, dsc types.Node) (r types.Node) {1933 r = NewNodeFromMessage(cfg.Message().(*pb.Node)) // might be a bit expensive1934 diff := []string{}1935 for si := range Registry.Discoverables {1936 for u := range Registry.Discoverables[si] {1937 diff = append(diff, u)1938 }1939 }1940 r.MergeDiff(dsc, diff)1941 return1942}1943///////////////////////////1944// Passthrough Interface /1945/////////////////////////1946/*1947 * Consume Logger1948 */1949var _ types.Logger = (*StateMutationEngine)(nil)1950func (sme *StateMutationEngine) Log(level types.LoggerLevel, m string) { sme.log.Log(level, m) }1951func (sme *StateMutationEngine) Logf(level types.LoggerLevel, fmt string, v ...interface{}) {1952 sme.log.Logf(level, fmt, v...)1953}1954func (sme *StateMutationEngine) SetModule(name string) { sme.log.SetModule(name) }1955func (sme *StateMutationEngine) GetModule() string { return sme.log.GetModule() }1956func (sme *StateMutationEngine) SetLoggerLevel(level types.LoggerLevel) {1957 sme.log.SetLoggerLevel(level)1958}1959func (sme *StateMutationEngine) GetLoggerLevel() types.LoggerLevel { return sme.log.GetLoggerLevel() }1960func (sme *StateMutationEngine) IsEnabledFor(level types.LoggerLevel) bool {1961 return sme.log.IsEnabledFor(level)1962}1963/*1964 * Consume an emitter, so we implement EventEmitter directly1965 */1966func (sme *StateMutationEngine) Subscribe(id string, c chan<- []types.Event) error {1967 return sme.em.Subscribe(id, c)1968}1969func (sme *StateMutationEngine) Unsubscribe(id string) error { return sme.em.Unsubscribe(id) }1970func (sme *StateMutationEngine) Emit(v []types.Event) { sme.em.Emit(v) }1971func (sme *StateMutationEngine) EmitOne(v types.Event) { sme.em.EmitOne(v) }1972func (sme *StateMutationEngine) EventType() types.EventType { return sme.em.EventType() }...
grapqhl_test.go
Source:grapqhl_test.go
...21func ExecuteQuery(db *gorm.DB, query string) *GraphQlContrib.Result {22 schema, err := GraphQlContrib.NewSchema(23 GraphQlContrib.SchemaConfig{24 Query: graphql.Query(db),25 Mutation: mutation.Mutation(db),26 },27 )28 if err != nil {29 panic(err)30 }31 result := GraphQlContrib.Do(GraphQlContrib.Params{32 Schema: schema,33 RequestString: query,34 })35 return result36}37type TestingLoop struct {38 Query string39 Results string40}41// Testing the DB connection.42func (suite *GraphQlTestSuite) TestQuery() {43 content, _ := ioutil.ReadFile("./dummy_json.json")44 text := string(content)45 payload, _ := mutation.JsonStringParse(text)46 mutation.MigrateProcessedObject(payload, suite.DB)47 queries := map[int]TestingLoop{48 0: TestingLoop{49 Query: `query { funds { fund_name } }`,50 Results: "{\"data\":{\"funds\":[{\"fund_name\":\"×קפת ×ר××\"}]}}\n",51 },52 1: TestingLoop{53 Query: `query { instrument(id:1) { instrument_name } }`,54 Results: "{\"data\":{\"instrument\":{\"instrument_name\":\"×× ×§ ×פ××¢××× ××¢\\\"×\"}}}\n",55 },56 }57 e := echo.New()58 h := echo.WrapHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {59 b, err := ioutil.ReadAll(r.Body)60 if err != nil {61 panic(err)62 }63 results := ExecuteQuery(suite.DB, fmt.Sprintf("%s", b))64 json.NewEncoder(w).Encode(results)65 }))66 for _, testingLoop := range queries {67 req := httptest.NewRequest(http.MethodPost, "/graphql", strings.NewReader(testingLoop.Query))68 rec := httptest.NewRecorder()69 c := e.NewContext(req, rec)70 if assert.NoError(suite.T(), h(c)) {71 assert.Equal(suite.T(), http.StatusOK, rec.Code)72 assert.Equal(suite.T(), testingLoop.Results, rec.Body.String())73 }74 }75}76func (suite *GraphQlTestSuite) TestMutation() {77 // todo.78}79func TestGraphQl(t *testing.T) {80 suite.Run(t, new(GraphQlTestSuite))81}...
graphql.go
Source:graphql.go
...12 Target: newGraphQlQuery,13 },14 fx.Annotated{15 Name: "mutation",16 Target: newGraphQlMutation,17 },18 newGraphQlSchema,19)20func newGraphQlQuery() *graphql.Object {21 return graphql.NewObject(graphql.ObjectConfig{22 Name: "Query",23 Description: "The application's root query object",24 Fields: graphql.Fields{},25 })26}27func newGraphQlMutation() *graphql.Object {28 return graphql.NewObject(graphql.ObjectConfig{29 Name: "Mutation",30 Description: "The application's root mutation object",31 Fields: graphql.Fields{},32 })33}34type graphqlDependencies struct {35 fx.In36 Query *graphql.Object `name:"query"`37 Mutation *graphql.Object `name:"mutation"`38}39func newGraphQlSchema(dependencies graphqlDependencies, logger *logrus.Logger) *graphql.Schema {40 query := dependencies.Query41 mutation := dependencies.Query42 schema, err := graphql.NewSchema(graphql.SchemaConfig{43 Query: func() *graphql.Object {44 if len(query.Fields()) <= 0 {45 return nil46 }47 return query48 }(),49 Mutation: func() *graphql.Object {50 if len(mutation.Fields()) <= 0 {51 return nil52 }53 return mutation54 }(),55 })56 if err != nil {57 logger.Error(err)58 }59 return &schema60}61func UseGraphQl(schema *graphql.Schema, server *echo.Echo, logger *logrus.Logger) {62 graphqlHandler := handler.New(&handler.Config{63 Schema: schema,...
Mutation
Using AI Code Generation
1import (2func main() {3 g.AddVertex("A")4 g.AddVertex("B")5 g.AddVertex("C")6 g.AddVertex("D")7 g.AddVertex("E")8 g.AddVertex("F")9 g.AddVertex("G")10 g.AddVertex("H")11 g.AddVertex("I")12 g.AddVertex("J")13 g.AddVertex("K")14 g.AddVertex("L")15 g.AddVertex("M")16 g.AddVertex("N")17 g.AddVertex("O")18 g.AddVertex("P")19 g.AddVertex("Q")20 g.AddVertex("R")21 g.AddVertex("S")22 g.AddVertex("T")23 g.AddVertex("U")24 g.AddVertex("V")25 g.AddVertex("W")26 g.AddVertex("X")27 g.AddVertex("Y")28 g.AddVertex("Z")29 g.AddEdge("A", "B", 1)30 g.AddEdge("A", "C", 1)31 g.AddEdge("A", "D", 1)32 g.AddEdge("A", "E", 1)33 g.AddEdge("A", "F", 1)34 g.AddEdge("A", "G", 1)35 g.AddEdge("A", "H", 1)36 g.AddEdge("A", "I", 1)37 g.AddEdge("A", "J", 1)38 g.AddEdge("A", "K", 1)39 g.AddEdge("A", "L", 1)40 g.AddEdge("A", "M", 1)41 g.AddEdge("A", "N", 1)42 g.AddEdge("A", "O", 1)43 g.AddEdge("A", "P", 1)44 g.AddEdge("A", "Q", 1)45 g.AddEdge("A", "R", 1)46 g.AddEdge("A", "S", 1)47 g.AddEdge("A", "T", 1)48 g.AddEdge("A", "U", 1)49 g.AddEdge("A", "V", 1)50 g.AddEdge("A", "W", 1)51 g.AddEdge("A", "X", 1)52 g.AddEdge("A", "Y", 1)53 g.AddEdge("A", "Z", 1)54 fmt.Println(g)55}
Mutation
Using AI Code Generation
1import "fmt"2func main() {3 g := graph.CreateGraph()4 g.Mutation("1", "2", 1)5 g.Mutation("1", "3", 2)6 g.Mutation("1", "4", 3)7 g.Mutation("2", "5", 4)8 g.Mutation("2", "6", 5)9 g.Mutation("3", "7", 6)10 g.Mutation("3", "8", 7)11 g.Mutation("4", "9", 8)12 g.Mutation("4", "10", 9)13 g.Mutation("5", "11", 10)14 g.Mutation("5", "12", 11)15 g.Mutation("6", "13", 12)16 g.Mutation("6", "14", 13)17 g.Mutation("7", "15", 14)18 g.Mutation("7", "16", 15)19 g.Mutation("8", "17", 16)20 g.Mutation("8", "18", 17)21 g.Mutation("9", "19", 18)22 g.Mutation("9", "20", 19)23 g.Mutation("10", "21", 20)24 g.Mutation("10", "22", 21)25 g.Mutation("11", "23", 22)26 g.Mutation("11", "24", 23)27 g.Mutation("12", "25", 24)28 g.Mutation("12", "26", 25)29 g.Mutation("13", "27", 26)30 g.Mutation("13", "28", 27)31 g.Mutation("14", "29", 28)32 g.Mutation("14", "30", 29)33 g.Mutation("15", "31", 30)34 g.Mutation("15", "32", 31)35 g.Mutation("16", "33", 32)36 g.Mutation("16", "34", 33)37 g.Mutation("17", "35", 34)38 g.Mutation("17", "36", 35)39 g.Mutation("18", "37", 36)40 g.Mutation("18", "38", 37
Mutation
Using AI Code Generation
1import "fmt"2type Graph struct {3}4func (g *Graph) AddEdge(from, to string) {5 g.Edges[from] = append(g.Edges[from], to)6}7func (g *Graph) Mutation(from, to string) {8 g.AddEdge(from, to)9 g.AddEdge(to, from)10}11func main() {12 graph := Graph{Edges: make(map[string][]string)}13 graph.Mutation("A", "B")14 fmt.Println(graph)15}16{map[A:[B] B:[A]]}
Mutation
Using AI Code Generation
1import (2type Graph struct {3}4type Node struct {5}6func (g *Graph) Mutation(n1, n2 int) {7 node1.Children = append(node1.Children, node2)8 node2.Children = append(node2.Children, node1)9}10func (g *Graph) Print() {11 for i, node := range g.Nodes {12 fmt.Print(strconv.Itoa(node.Val) + " -> ")13 for _, child := range node.Children {14 fmt.Print(strconv.Itoa(child.Val) + " ")15 }16 fmt.Println()17 if i == len(g.Nodes)-1 {18 fmt.Println()19 }20 }21}22func main() {23 g := &Graph{}24 n1 := &Node{Val: 1}25 n2 := &Node{Val: 2}26 n3 := &Node{Val: 3}27 n4 := &Node{Val: 4}28 n5 := &Node{Val: 5}29 n6 := &Node{Val: 6}30 g.Nodes = append(g.Nodes, n1, n2, n3, n4, n5, n6)31 g.Mutation(0, 1)32 g.Mutation(0, 2)33 g.Mutation(1, 3)34 g.Mutation(1, 4)35 g.Mutation(2, 5)
Mutation
Using AI Code Generation
1import (2func main() {3 g.setNodes([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})4 g.setEdges([][]int{{1, 2}, {1, 3}, {3, 4}, {3, 5}, {5, 6}, {5, 7}, {7, 8}, {7, 9}})5 g.printGraph()6 g.mutation()7 g.printGraph()8}9type graph struct {10}11func (g *graph) setNodes(nodes []int) {12}13func (g *graph) setEdges(edges [][]int) {14}15func (g graph) printGraph() {16 fmt.Println("Nodes:")17 for _, node := range g.nodes {18 fmt.Printf("%d ", node)19 }20 fmt.Println()21 fmt.Println("Edges:")22 for _, edge := range g.edges {23 fmt.Printf("%d %d\n", edge[0], edge[1])24 }25 fmt.Println()26}27func (g *graph) mutation() {28 rand.Seed(time.Now().UnixNano())29 n := len(g.nodes)30 m := len(g.edges)31 r := rand.Intn(n)32 for _, edge := range g.edges {
Mutation
Using AI Code Generation
1import (2type Graph struct {3}4func (g *Graph) AddNode(node int) {5 g.nodes[node] = make([]int, 0)6}7func (g *Graph) AddEdge(node1, node2 int) {8 g.nodes[node1] = append(g.nodes[node1], node2)9 g.nodes[node2] = append(g.nodes[node2], node1)10}11func (g *Graph) BFS(start int) map[int]int {12 visited := make(map[int]bool)13 distance := make(map[int]int)14 parent := make(map[int]int)15 queue := make([]int, 0)16 queue = append(queue, start)17 for len(queue) != 0 {18 for _, node := range g.nodes[current] {19 if !visited[node] {20 queue = append(queue, node)21 }22 }23 }24}25func (g *Graph) PrintPath(parent map[int]int, start, end int) {26 if start == end {27 fmt.Printf("%v ", start)28 }29 g.PrintPath(parent, start, parent[end])30 fmt.Printf("%v ", end)31}32func (g *Graph) PrintDistance(parent map[int]int, start, end int) {33 if start == end {34 fmt.Printf("0 ")35 }36 g.PrintDistance(parent, start, parent[end])37 fmt.Printf("%v ", end)38}39func (g *Graph) PrintEdges(parent map[int]int, start, end int) {40 if start == end {41 fmt.Printf("0 ")42 }43 g.PrintEdges(parent, start, parent[end])44 fmt.Printf("%v ", end)45}46func main() {
Mutation
Using AI Code Generation
1import (2type Graph struct {3}4func (g *Graph) AddEdge(u int, v int) {5 g.adj[u] = append(g.adj[u], v)6 g.adj[v] = append(g.adj[v], u)7}8func (g *Graph) AddVertex() {9 g.adj = append(g.adj, []int{})10}11func (g *Graph) Mutation() {12 rand.Seed(time.Now().UnixNano())13 randEdge := rand.Intn(g.edges)14 randVertex := rand.Intn(g.vertices)15 g.adj[randEdge] = append(g.adj[randEdge], randVertex)16 g.adj[randVertex] = append(g.adj[randVertex], randEdge)17}18func main() {19 g := new(Graph)20 file, _ := os.Open("input.txt")21 scanner := bufio.NewScanner(file)22 for scanner.Scan() {23 line := scanner.Text()24 if strings.Contains(line, " ") {25 edge := strings.Split(line, " ")26 u, _ := strconv.Atoi(edge[0])27 v, _ := strconv.Atoi(edge[1])28 g.AddEdge(u, v)29 } else {30 g.AddVertex()31 }32 }33 fmt.Println("The adjacency list representation of the graph: ")34 for i := 0; i < g.vertices; i++ {35 fmt.Println(i, ":", g.adj[i])36 }37 g.Mutation()38 fmt.Println("\nThe adjacency list representation of the mutated graph: ")39 for i := 0; i < g.vertices; i++ {40 fmt.Println(i, ":", g.adj[i])41 }42}
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!