Best K6 code snippet using executor.NewBaseExecutor
builder.go
Source:builder.go
1// Copyright 2015 PingCAP, Inc.2//3// Licensed under the Apache License, Version 2.0 (the "License");4// you may not use this file except in compliance with the License.5// You may obtain a copy of the License at6//7// http://www.apache.org/licenses/LICENSE-2.08//9// Unless required by applicable law or agreed to in writing, software10// distributed under the License is distributed on an "AS IS" BASIS,11// See the License for the specific language governing permissions and12// limitations under the License.13package executor14import (15 "context"16 "math"17 "sort"18 "sync"19 "time"20 "github.com/cznic/mathutil"21 "github.com/cznic/sortutil"22 "github.com/pingcap/errors"23 "github.com/pingcap/tidb/distsql"24 "github.com/pingcap/tidb/domain"25 "github.com/pingcap/tidb/executor/aggfuncs"26 "github.com/pingcap/tidb/expression"27 "github.com/pingcap/tidb/expression/aggregation"28 "github.com/pingcap/tidb/infoschema"29 "github.com/pingcap/tidb/parser/ast"30 "github.com/pingcap/tidb/parser/model"31 plannercore "github.com/pingcap/tidb/planner/core"32 "github.com/pingcap/tidb/sessionctx"33 "github.com/pingcap/tidb/table"34 "github.com/pingcap/tidb/types"35 "github.com/pingcap/tidb/util/admin"36 "github.com/pingcap/tidb/util/chunk"37 "github.com/pingcap/tipb/go-tipb"38)39// executorBuilder builds an Executor from a Plan.40// The InfoSchema must not change during execution.41type executorBuilder struct {42 ctx sessionctx.Context43 is infoschema.InfoSchema44 startTS uint64 // cached when the first time getStartTS() is called45 // err is set when there is error happened during Executor building process.46 err error47}48func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {49 return &executorBuilder{50 ctx: ctx,51 is: is,52 }53}54// MockPhysicalPlan is used to return a specified executor in when build.55// It is mainly used for testing.56type MockPhysicalPlan interface {57 plannercore.PhysicalPlan58 GetExecutor() Executor59}60func (b 
*executorBuilder) build(p plannercore.Plan) Executor {61 switch v := p.(type) {62 case nil:63 return nil64 case *plannercore.DDL:65 return b.buildDDL(v)66 case *plannercore.Delete:67 return b.buildDelete(v)68 case *plannercore.Explain:69 return b.buildExplain(v)70 case *plannercore.Insert:71 return b.buildInsert(v)72 case *plannercore.PhysicalLimit:73 return b.buildLimit(v)74 case *plannercore.ShowDDL:75 return b.buildShowDDL(v)76 case *plannercore.PhysicalShowDDLJobs:77 return b.buildShowDDLJobs(v)78 case *plannercore.PhysicalShow:79 return b.buildShow(v)80 case *plannercore.Simple:81 return b.buildSimple(v)82 case *plannercore.Set:83 return b.buildSet(v)84 case *plannercore.PhysicalSort:85 return b.buildSort(v)86 case *plannercore.PhysicalTopN:87 return b.buildTopN(v)88 case *plannercore.PhysicalUnionScan:89 return b.buildUnionScanExec(v)90 case *plannercore.PhysicalHashJoin:91 return b.buildHashJoin(v)92 case *plannercore.PhysicalMergeJoin:93 return b.buildMergeJoin(v)94 case *plannercore.PhysicalSelection:95 return b.buildSelection(v)96 case *plannercore.PhysicalHashAgg:97 return b.buildHashAgg(v)98 case *plannercore.PhysicalProjection:99 return b.buildProjection(v)100 case *plannercore.PhysicalMemTable:101 return b.buildMemTable(v)102 case *plannercore.PhysicalTableDual:103 return b.buildTableDual(v)104 case *plannercore.Analyze:105 return b.buildAnalyze(v)106 case *plannercore.PhysicalTableReader:107 return b.buildTableReader(v)108 case *plannercore.PhysicalIndexReader:109 return b.buildIndexReader(v)110 case *plannercore.PhysicalIndexLookUpReader:111 return b.buildIndexLookUpReader(v)112 default:113 if mp, ok := p.(MockPhysicalPlan); ok {114 return mp.GetExecutor()115 }116 b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)117 return nil118 }119}120func (b *executorBuilder) buildShowDDL(v *plannercore.ShowDDL) Executor {121 // We get DDLInfo here because for Executors that returns result set,122 // next will be called after transaction has been 
committed.123 // We need the transaction to get DDLInfo.124 e := &ShowDDLExec{125 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),126 }127 var err error128 ownerManager := domain.GetDomain(e.ctx).DDL().OwnerManager()129 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)130 e.ddlOwnerID, err = ownerManager.GetOwnerID(ctx)131 cancel()132 if err != nil {133 b.err = err134 return nil135 }136 txn, err := e.ctx.Txn(true)137 if err != nil {138 b.err = err139 return nil140 }141 ddlInfo, err := admin.GetDDLInfo(txn)142 if err != nil {143 b.err = err144 return nil145 }146 e.ddlInfo = ddlInfo147 e.selfID = ownerManager.ID()148 return e149}150func (b *executorBuilder) buildShowDDLJobs(v *plannercore.PhysicalShowDDLJobs) Executor {151 e := &ShowDDLJobsExec{152 jobNumber: v.JobNumber,153 is: b.is,154 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),155 }156 return e157}158func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {159 childExec := b.build(v.Children()[0])160 if b.err != nil {161 return nil162 }163 n := int(mathutil.MinUint64(v.Count, uint64(b.ctx.GetSessionVars().MaxChunkSize)))164 base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec)165 base.initCap = n166 e := &LimitExec{167 baseExecutor: base,168 begin: v.Offset,169 end: v.Offset + v.Count,170 }171 return e172}173func (b *executorBuilder) buildShow(v *plannercore.PhysicalShow) Executor {174 e := &ShowExec{175 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),176 Tp: v.Tp,177 DBName: model.NewCIStr(v.DBName),178 Table: v.Table,179 Column: v.Column,180 IndexName: v.IndexName,181 IfNotExists: v.IfNotExists,182 Flag: v.Flag,183 Full: v.Full,184 GlobalScope: v.GlobalScope,185 is: b.is,186 }187 return e188}189func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {190 base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())191 base.initCap = chunk.ZeroCapacity192 e := &SimpleExec{193 baseExecutor: 
base,194 Statement: v.Statement,195 is: b.is,196 }197 return e198}199func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {200 base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())201 base.initCap = chunk.ZeroCapacity202 e := &SetExecutor{203 baseExecutor: base,204 vars: v.VarAssigns,205 }206 return e207}208func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {209 b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()210 selectExec := b.build(v.SelectPlan)211 if b.err != nil {212 return nil213 }214 var baseExec baseExecutor215 if selectExec != nil {216 baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID(), selectExec)217 } else {218 baseExec = newBaseExecutor(b.ctx, nil, v.ExplainID())219 }220 baseExec.initCap = chunk.ZeroCapacity221 ivs := &InsertValues{222 baseExecutor: baseExec,223 Table: v.Table,224 Columns: v.Columns,225 Lists: v.Lists,226 SetList: v.SetList,227 allAssignmentsAreConstant: v.AllAssignmentsAreConstant,228 hasRefCols: v.NeedFillDefaultValue,229 SelectExec: selectExec,230 }231 err := ivs.initInsertColumns()232 if err != nil {233 b.err = err234 return nil235 }236 if v.IsReplace {237 return b.buildReplace(ivs)238 }239 insert := &InsertExec{240 InsertValues: ivs,241 }242 return insert243}244func (b *executorBuilder) buildReplace(vals *InsertValues) Executor {245 replaceExec := &ReplaceExec{246 InsertValues: vals,247 }248 return replaceExec249}250func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {251 e := &DDLExec{252 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),253 stmt: v.Statement,254 is: b.is,255 }256 return e257}258// buildExplain builds a explain executor. 
`e.rows` collects final result to `ExplainExec`.259func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {260 explainExec := &ExplainExec{261 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),262 explain: v,263 }264 return explainExec265}266func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {267 reader := b.build(v.Children()[0])268 if b.err != nil {269 return nil270 }271 return b.buildUnionScanFromReader(reader, v)272}273// buildUnionScanFromReader builds union scan executor from child executor.274// Note that this function may be called by inner workers of index lookup join concurrently.275// Be careful to avoid data race.276func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {277 us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}278 // Get the handle column index of the below Plan.279 us.belowHandleIndex = v.HandleCol.Index280 us.mutableRow = chunk.MutRowFromTypes(retTypes(us))281 switch x := reader.(type) {282 case *TableReaderExecutor:283 us.desc = x.desc284 // Union scan can only be in a write transaction, so DirtyDB should has non-nil value now, thus285 // GetDirtyDB() is safe here. If this table has been modified in the transaction, non-nil DirtyTable286 // can be found in DirtyDB now, so GetDirtyTable is safe; if this table has not been modified in the287 // transaction, empty DirtyTable would be inserted into DirtyDB, it does not matter when multiple288 // goroutines write empty DirtyTable to DirtyDB for this table concurrently. 
Although the DirtyDB looks289 // safe for data race in all the cases, the map of golang will throw panic when it's accessed in parallel.290 // So we lock it when getting dirty table.291 physicalTableID := getPhysicalTableID(x.table)292 us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)293 us.conditions = v.Conditions294 us.columns = x.columns295 us.table = x.table296 case *IndexReaderExecutor:297 us.desc = x.desc298 for _, ic := range x.index.Columns {299 for i, col := range x.columns {300 if col.Name.L == ic.Name.L {301 us.usedIndex = append(us.usedIndex, i)302 break303 }304 }305 }306 physicalTableID := getPhysicalTableID(x.table)307 us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)308 us.conditions = v.Conditions309 us.columns = x.columns310 us.table = x.table311 case *IndexLookUpExecutor:312 us.desc = x.desc313 for _, ic := range x.index.Columns {314 for i, col := range x.columns {315 if col.Name.L == ic.Name.L {316 us.usedIndex = append(us.usedIndex, i)317 break318 }319 }320 }321 physicalTableID := getPhysicalTableID(x.table)322 us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)323 us.conditions = v.Conditions324 us.columns = x.columns325 us.table = x.table326 default:327 // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.328 return reader329 }330 return us331}332// buildMergeJoin builds MergeJoinExec executor.333func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Executor {334 leftExec := b.build(v.Children()[0])335 if b.err != nil {336 return nil337 }338 rightExec := b.build(v.Children()[1])339 if b.err != nil {340 return nil341 }342 defaultValues := v.DefaultValues343 if defaultValues == nil {344 if v.JoinType == plannercore.RightOuterJoin {345 defaultValues = make([]types.Datum, leftExec.Schema().Len())346 } else {347 defaultValues = make([]types.Datum, rightExec.Schema().Len())348 }349 }350 e := &MergeJoinExec{351 stmtCtx: 
b.ctx.GetSessionVars().StmtCtx,352 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),353 compareFuncs: v.CompareFuncs,354 joiner: newJoiner(355 b.ctx,356 v.JoinType,357 v.JoinType == plannercore.RightOuterJoin,358 defaultValues,359 v.OtherConditions,360 retTypes(leftExec),361 retTypes(rightExec),362 ),363 isOuterJoin: v.JoinType.IsOuterJoin(),364 }365 leftKeys := v.LeftJoinKeys366 rightKeys := v.RightJoinKeys367 e.outerIdx = 0368 innerFilter := v.RightConditions369 e.innerTable = &mergeJoinInnerTable{370 reader: rightExec,371 joinKeys: rightKeys,372 }373 e.outerTable = &mergeJoinOuterTable{374 reader: leftExec,375 filter: v.LeftConditions,376 keys: leftKeys,377 }378 if v.JoinType == plannercore.RightOuterJoin {379 e.outerIdx = 1380 e.outerTable.reader = rightExec381 e.outerTable.filter = v.RightConditions382 e.outerTable.keys = rightKeys383 innerFilter = v.LeftConditions384 e.innerTable.reader = leftExec385 e.innerTable.joinKeys = leftKeys386 }387 // optimizer should guarantee that filters on inner table are pushed down388 // to tikv or extracted to a Selection.389 if len(innerFilter) != 0 {390 b.err = errors.Annotate(ErrBuildExecutor, "merge join's inner filter should be empty.")391 return nil392 }393 return e394}395func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor {396 leftExec := b.build(v.Children()[0])397 if b.err != nil {398 return nil399 }400 rightExec := b.build(v.Children()[1])401 if b.err != nil {402 return nil403 }404 e := &HashJoinExec{405 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),406 concurrency: v.Concurrency,407 joinType: v.JoinType,408 innerSideEstCount: v.Children()[v.InnerChildIdx].StatsCount(),409 }410 defaultValues := v.DefaultValues411 lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec)412 if v.InnerChildIdx == 0 {413 if len(v.LeftConditions) > 0 {414 b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be 
empty")415 return nil416 }417 e.innerSideExec = leftExec418 e.outerSideExec = rightExec419 e.outerSideFilter = v.RightConditions420 e.innerKeys = v.LeftJoinKeys421 e.outerKeys = v.RightJoinKeys422 if defaultValues == nil {423 defaultValues = make([]types.Datum, e.innerSideExec.Schema().Len())424 }425 } else {426 if len(v.RightConditions) > 0 {427 b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be empty")428 return nil429 }430 e.innerSideExec = rightExec431 e.outerSideExec = leftExec432 e.outerSideFilter = v.LeftConditions433 e.innerKeys = v.RightJoinKeys434 e.outerKeys = v.LeftJoinKeys435 if defaultValues == nil {436 defaultValues = make([]types.Datum, e.innerSideExec.Schema().Len())437 }438 }439 e.joiners = make([]joiner, e.concurrency)440 for i := uint(0); i < e.concurrency; i++ {441 e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,442 v.OtherConditions, lhsTypes, rhsTypes)443 }444 return e445}446func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor {447 src := b.build(v.Children()[0])448 if b.err != nil {449 return nil450 }451 sessionVars := b.ctx.GetSessionVars()452 e := &HashAggExec{453 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),454 sc: sessionVars.StmtCtx,455 PartialAggFuncs: make([]aggfuncs.AggFunc, 0, len(v.AggFuncs)),456 GroupByItems: v.GroupByItems,457 }458 // We take `create table t(a int, b int);` as example.459 //460 // 1. If all the aggregation functions are FIRST_ROW, we do not need to set the defaultVal for them:461 // e.g.462 // mysql> select distinct a, b from t;463 // 0 rows in set (0.00 sec)464 //465 // 2. 
If there exists group by items, we do not need to set the defaultVal for them either:466 // e.g.467 // mysql> select avg(a) from t group by b;468 // Empty set (0.00 sec)469 //470 // mysql> select avg(a) from t group by a;471 // +--------+472 // | avg(a) |473 // +--------+474 // | NULL |475 // +--------+476 // 1 row in set (0.00 sec)477 if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {478 e.defaultVal = nil479 } else {480 e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)481 }482 partialOrdinal := 0483 for i, aggDesc := range v.AggFuncs {484 ordinal := []int{partialOrdinal}485 partialOrdinal++486 if aggDesc.Name == ast.AggFuncAvg {487 ordinal = append(ordinal, partialOrdinal+1)488 partialOrdinal++489 }490 partialAggDesc, finalDesc := aggDesc.Split(ordinal)491 partialAggFunc := aggfuncs.Build(b.ctx, partialAggDesc, i)492 finalAggFunc := aggfuncs.Build(b.ctx, finalDesc, i)493 e.PartialAggFuncs = append(e.PartialAggFuncs, partialAggFunc)494 e.FinalAggFuncs = append(e.FinalAggFuncs, finalAggFunc)495 if e.defaultVal != nil {496 value := aggDesc.GetDefaultValue()497 e.defaultVal.AppendDatum(i, &value)498 }499 }500 return e501}502func (b *executorBuilder) buildSelection(v *plannercore.PhysicalSelection) Executor {503 childExec := b.build(v.Children()[0])504 if b.err != nil {505 return nil506 }507 e := &SelectionExec{508 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),509 filters: v.Conditions,510 }511 return e512}513func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) Executor {514 childExec := b.build(v.Children()[0])515 if b.err != nil {516 return nil517 }518 e := &ProjectionExec{519 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),520 numWorkers: b.ctx.GetSessionVars().ProjectionConcurrency,521 evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs),522 }523 // If the calculation row count for this Projection operator is smaller524 // than a Chunk size, we turn back to 
the un-parallel Projection525 // implementation to reduce the goroutine overhead.526 if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) {527 e.numWorkers = 0528 }529 return e530}531func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Executor {532 if v.RowCount != 0 && v.RowCount != 1 {533 b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount)534 return nil535 }536 base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())537 base.initCap = v.RowCount538 e := &TableDualExec{539 baseExecutor: base,540 numDualRows: v.RowCount,541 }542 return e543}544func (b *executorBuilder) getStartTS() (uint64, error) {545 if b.startTS != 0 {546 // Return the cached value.547 return b.startTS, nil548 }549 txn, err := b.ctx.Txn(true)550 if err != nil {551 return 0, err552 }553 b.startTS = txn.StartTS()554 if b.startTS == 0 {555 return 0, errors.Trace(ErrGetStartTS)556 }557 return b.startTS, nil558}559func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {560 var e Executor561 tb, _ := b.is.TableByID(v.Table.ID)562 e = &TableScanExec{563 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),564 t: tb,565 columns: v.Columns,566 seekHandle: math.MinInt64,567 isVirtualTable: !tb.Type().IsNormalTable(),568 }569 return e570}571func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor {572 childExec := b.build(v.Children()[0])573 if b.err != nil {574 return nil575 }576 sortExec := SortExec{577 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),578 ByItems: v.ByItems,579 schema: v.Schema(),580 }581 return &sortExec582}583func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) Executor {584 childExec := b.build(v.Children()[0])585 if b.err != nil {586 return nil587 }588 sortExec := SortExec{589 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),590 ByItems: v.ByItems,591 schema: v.Schema(),592 }593 
return &TopNExec{594 SortExec: sortExec,595 limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset},596 }597}598func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {599 tblID2table := make(map[int64]table.Table)600 for _, info := range v.TblColPosInfos {601 tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)602 }603 b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()604 selExec := b.build(v.SelectPlan)605 if b.err != nil {606 return nil607 }608 base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), selExec)609 base.initCap = chunk.ZeroCapacity610 deleteExec := &DeleteExec{611 baseExecutor: base,612 tblID2Table: tblID2table,613 tblColPosInfos: v.TblColPosInfos,614 }615 return deleteExec616}617func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask) *analyzeTask {618 sc := b.ctx.GetSessionVars().StmtCtx619 e := &AnalyzeIndexExec{620 ctx: b.ctx,621 physicalTableID: task.PhysicalTableID,622 idxInfo: task.IndexInfo,623 concurrency: b.ctx.GetSessionVars().IndexSerialScanConcurrency,624 analyzePB: &tipb.AnalyzeReq{625 Tp: tipb.AnalyzeType_TypeIndex,626 Flags: sc.PushDownFlags(),627 },628 }629 e.analyzePB.IdxReq = &tipb.AnalyzeIndexReq{630 BucketSize: int64(defaultNumBuckets),631 NumColumns: int32(len(task.IndexInfo.Columns)),632 }633 depth := int32(defaultCMSketchDepth)634 width := int32(defaultCMSketchWidth)635 e.analyzePB.IdxReq.CmsketchDepth = &depth636 e.analyzePB.IdxReq.CmsketchWidth = &width637 return &analyzeTask{taskType: idxTask, idxExec: e}638}639func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask) *analyzeTask {640 cols := task.ColsInfo641 if task.PKInfo != nil {642 cols = append([]*model.ColumnInfo{task.PKInfo}, cols...)643 }644 sc := b.ctx.GetSessionVars().StmtCtx645 e := &AnalyzeColumnsExec{646 ctx: b.ctx,647 physicalTableID: task.PhysicalTableID,648 colsInfo: task.ColsInfo,649 pkInfo: task.PKInfo,650 concurrency: 
b.ctx.GetSessionVars().DistSQLScanConcurrency,651 analyzePB: &tipb.AnalyzeReq{652 Tp: tipb.AnalyzeType_TypeColumn,653 Flags: sc.PushDownFlags(),654 },655 }656 depth := int32(defaultCMSketchDepth)657 width := int32(defaultCMSketchWidth)658 e.analyzePB.ColReq = &tipb.AnalyzeColumnsReq{659 BucketSize: int64(defaultNumBuckets),660 SampleSize: maxRegionSampleSize,661 SketchSize: maxSketchSize,662 ColumnsInfo: model.ColumnsToProto(cols, task.PKInfo != nil),663 CmsketchDepth: &depth,664 CmsketchWidth: &width,665 }666 b.err = plannercore.SetPBColumnsDefaultValue(b.ctx, e.analyzePB.ColReq.ColumnsInfo, cols)667 return &analyzeTask{taskType: colTask, colExec: e}668}669func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {670 e := &AnalyzeExec{671 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),672 tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),673 wg: &sync.WaitGroup{},674 }675 for _, task := range v.ColTasks {676 e.tasks = append(e.tasks, b.buildAnalyzeColumnsPushdown(task))677 if b.err != nil {678 return nil679 }680 }681 for _, task := range v.IdxTasks {682 e.tasks = append(e.tasks, b.buildAnalyzeIndexPushdown(task))683 if b.err != nil {684 return nil685 }686 }687 return e688}689func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {690 executors := make([]*tipb.Executor, 0, len(plans))691 for _, p := range plans {692 execPB, err := p.ToPB(sctx)693 if err != nil {694 return nil, err695 }696 executors = append(executors, execPB)697 }698 return executors, nil699}700func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, err error) {701 dagReq = &tipb.DAGRequest{}702 sc := b.ctx.GetSessionVars().StmtCtx703 dagReq.Flags = sc.PushDownFlags()704 dagReq.Executors, err = constructDistExec(b.ctx, plans)705 return dagReq, err706}707func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) 
(*TableReaderExecutor, error) {708 dagReq, err := b.constructDAGReq(v.TablePlans)709 if err != nil {710 return nil, err711 }712 ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)713 tbl, _ := b.is.TableByID(ts.Table.ID)714 startTS, err := b.getStartTS()715 if err != nil {716 return nil, err717 }718 e := &TableReaderExecutor{719 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),720 dagPB: dagReq,721 startTS: startTS,722 table: tbl,723 keepOrder: ts.KeepOrder,724 desc: ts.Desc,725 columns: ts.Columns,726 plans: v.TablePlans,727 }728 for i := range v.Schema().Columns {729 dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))730 }731 return e, nil732}733// buildTableReader builds a table reader executor. It first build a no range table reader,734// and then update it ranges from table scan plan.735func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *TableReaderExecutor {736 ret, err := buildNoRangeTableReader(b, v)737 if err != nil {738 b.err = err739 return nil740 }741 ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)742 ret.ranges = ts.Ranges743 sctx := b.ctx.GetSessionVars().StmtCtx744 sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)745 return ret746}747func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) {748 dagReq, err := b.constructDAGReq(v.IndexPlans)749 if err != nil {750 return nil, err751 }752 is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)753 tbl, _ := b.is.TableByID(is.Table.ID)754 startTS, err := b.getStartTS()755 if err != nil {756 return nil, err757 }758 e := &IndexReaderExecutor{759 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),760 dagPB: dagReq,761 startTS: startTS,762 physicalTableID: is.Table.ID,763 table: tbl,764 index: is.Index,765 keepOrder: is.KeepOrder,766 desc: is.Desc,767 columns: is.Columns,768 idxCols: is.IdxCols,769 colLens: is.IdxColLens,770 plans: v.IndexPlans,771 outputColumns: 
v.OutputColumns,772 }773 for _, col := range v.OutputColumns {774 dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(col.Index))775 }776 return e, nil777}778func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *IndexReaderExecutor {779 ret, err := buildNoRangeIndexReader(b, v)780 if err != nil {781 b.err = err782 return nil783 }784 is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)785 ret.ranges = is.Ranges786 sctx := b.ctx.GetSessionVars().StmtCtx787 sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)788 return ret789}790func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIndexLookUpReader) (*IndexLookUpExecutor, error) {791 indexReq, err := b.constructDAGReq(v.IndexPlans)792 if err != nil {793 return nil, err794 }795 tableReq, err := b.constructDAGReq(v.TablePlans)796 if err != nil {797 return nil, err798 }799 is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)800 indexReq.OutputOffsets = []uint32{uint32(len(is.Index.Columns))}801 tbl, _ := b.is.TableByID(is.Table.ID)802 for i := 0; i < v.Schema().Len(); i++ {803 tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i))804 }805 ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)806 startTS, err := b.getStartTS()807 if err != nil {808 return nil, err809 }810 e := &IndexLookUpExecutor{811 baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),812 dagPB: indexReq,813 startTS: startTS,814 table: tbl,815 index: is.Index,816 keepOrder: is.KeepOrder,817 desc: is.Desc,818 tableRequest: tableReq,819 columns: ts.Columns,820 dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},821 idxCols: is.IdxCols,822 colLens: is.IdxColLens,823 idxPlans: v.IndexPlans,824 tblPlans: v.TablePlans,825 }826 if v.ExtraHandleCol != nil {827 e.handleIdx = v.ExtraHandleCol.Index828 }829 return e, nil830}831func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) *IndexLookUpExecutor {832 ret, 
err := buildNoRangeIndexLookUpReader(b, v)833 if err != nil {834 b.err = err835 return nil836 }837 is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)838 ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)839 ret.ranges = is.Ranges840 sctx := b.ctx.GetSessionVars().StmtCtx841 sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)842 sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)843 return ret844}845// dataReaderBuilder build an executor.846// The executor can be used to read data in the ranges which are constructed by datums.847// Differences from executorBuilder:848// 1. dataReaderBuilder calculate data range from argument, rather than plan.849// 2. the result executor is already opened.850type dataReaderBuilder struct {851 plannercore.Plan852 *executorBuilder853}854func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []int64) (Executor, error) {855 if e.dagPB.CollectExecutionSummaries == nil {856 colExec := true857 e.dagPB.CollectExecutionSummaries = &colExec858 }859 startTS, err := builder.getStartTS()860 if err != nil {861 return nil, err862 }863 sort.Sort(sortutil.Int64Slice(handles))864 var b distsql.RequestBuilder865 kvReq, err := b.SetTableHandles(getPhysicalTableID(e.table), handles).866 SetDAGRequest(e.dagPB).867 SetStartTS(startTS).868 SetDesc(e.desc).869 SetKeepOrder(e.keepOrder).870 SetFromSessionVars(e.ctx.GetSessionVars()).871 Build()872 if err != nil {873 return nil, err874 }875 e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)876 e.resultHandler = &tableResultHandler{}877 result, err := distsql.Select(ctx, builder.ctx, kvReq, retTypes(e))878 if err != nil {879 return nil, err880 }881 e.resultHandler.open(nil, result)882 return e, nil883}884func getPhysicalTableID(t table.Table) int64 {885 if p, ok := t.(table.PhysicalTable); ok {886 return p.GetPhysicalID()887 }888 return t.Meta().ID889}...
build.go
Source:build.go
1package executor2import (3 "context"4 "github.com/cznic/mathutil"5 "github.com/pingcap/errors"6 "github.com/pingcap/parser/ast"7 "github.com/pingcap/parser/model"8 "github.com/pingcap/tidb/types"9 "github.com/pingcap/tidb/util/chunk"10 "github.com/pingcap/tidb/util/stringutil"11 "github.com/zhihu/zetta/tablestore/domain"12 "github.com/zhihu/zetta/tablestore/infoschema"13 "github.com/zhihu/zetta/tablestore/mysql/executor/aggfuncs"14 "github.com/zhihu/zetta/tablestore/mysql/expression"15 "github.com/zhihu/zetta/tablestore/mysql/planner"16 "github.com/zhihu/zetta/tablestore/mysql/sctx"17 "github.com/zhihu/zetta/tablestore/table"18)19// recordSet wraps an executor, implements sqlexec.RecordSet interface20type recordSet struct {21 fields []*ast.ResultField22 executorBuilder *ExecutorBuilder23 executor Executor24 lastErr error25 txnStartTS uint6426}27func (a *recordSet) Fields() []*ast.ResultField {28 if len(a.fields) == 0 {29 a.fields = colNames2ResultFields(a.executor.Schema(), a.executorBuilder.OutputNames, a.executorBuilder.Ctx.GetSessionVars().CurrentDB)30 }31 return a.fields32}33func colNames2ResultFields(schema *expression.Schema, names []*types.FieldName, defaultDB string) []*ast.ResultField {34 rfs := make([]*ast.ResultField, 0, schema.Len())35 defaultDBCIStr := model.NewCIStr(defaultDB)36 for i := 0; i < schema.Len(); i++ {37 dbName := names[i].DBName38 if dbName.L == "" && names[i].TblName.L != "" {39 dbName = defaultDBCIStr40 }41 origColName := names[i].OrigColName42 if origColName.L == "" {43 origColName = names[i].ColName44 }45 rf := &ast.ResultField{46 Column: &model.ColumnInfo{Name: origColName, FieldType: *schema.Columns[i].RetType},47 ColumnAsName: names[i].ColName,48 Table: &model.TableInfo{Name: names[i].OrigTblName},49 TableAsName: names[i].TblName,50 DBName: dbName,51 }52 rfs = append(rfs, rf)53 }54 return rfs55}56// Next use uses recordSet's executor to get next available chunk for later usage.57// If chunk does not contain any rows, then we 
update last query found rows in session variable as current found rows.58// The reason we need update is that chunk with 0 rows indicating we already finished current query, we need prepare for59// next query.60// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.61func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {62 err = Next(ctx, a.executor, req)63 if err != nil {64 a.lastErr = err65 return err66 }67 numRows := req.NumRows()68 if numRows == 0 {69 return nil70 }71 return nil72}73// NewChunk create a chunk base on top-level executor's newFirstChunk().74func (a *recordSet) NewChunk() *chunk.Chunk {75 return NewFirstChunk(a.executor)76}77func (a *recordSet) Close() error {78 err := a.executor.Close()79 if err != nil {80 return err81 }82 //sessVars := a.executorBuilder.Ctx.GetSessionVars()83 //if !sessVars.InTxn() {84 err = a.executorBuilder.Ctx.CommitTxnWrapper(context.Background())85 //}86 return err87}88type Compiler struct {89 Ctx sctx.Context90}91func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecutorBuilder, error) {92 is := domain.GetOnlyDomain().InfoSchema()93 planBuilder := planner.NewPlanBuilder(c.Ctx, is)94 plan, err := planBuilder.Build(ctx, stmtNode)95 if err != nil {96 return nil, err97 }98 eb := &ExecutorBuilder{99 Plan: plan,100 Ctx: c.Ctx,101 is: is,102 StmtNode: stmtNode,103 }104 if plan != nil {105 eb.OutputNames = plan.OutputNames()106 }107 return eb, nil108}109type ExecutorBuilder struct {110 Plan planner.Plan111 Ctx sctx.Context112 StmtNode ast.StmtNode113 is infoschema.InfoSchema114 OutputNames types.NameSlice115}116func (eb *ExecutorBuilder) Build(ctx context.Context) (*recordSet, error) {117 e, err := eb.build(eb.Plan)118 if err != nil {119 return nil, err120 }121 err = e.Open(ctx)122 if err != nil {123 return nil, err124 }125 return &recordSet{126 executor: e,127 executorBuilder: eb,128 }, nil129}130func (eb 
*ExecutorBuilder) build(p planner.Plan) (Executor, error) {131 switch v := p.(type) {132 case *planner.LogicalShow:133 return eb.buildShow(v)134 case *planner.LogicalProjection:135 return eb.buildProjection(v)136 case *planner.LogicalSelection:137 return eb.buildSelection(v)138 case *planner.DataSource:139 return eb.buildTableScan(v)140 case *planner.LogicalTableDual:141 return eb.buildTableDual(v)142 case *planner.Simple:143 return eb.buildSimple(v)144 case *planner.DDL:145 return eb.buildDDL(v)146 case *planner.Insert:147 return eb.buildInsert(v)148 case *planner.Delete:149 return eb.buildDelete(v)150 case *planner.LogicalUpdate:151 return eb.buildUpdate(v)152 case *planner.LogicalLimit:153 return eb.buildLimit(v)154 case *planner.LogicalSort:155 return eb.buildSort(v)156 case *planner.LogicalAggregation:157 return eb.buildAgg(v)158 case *planner.Set:159 return eb.buildSet(v)160 case *planner.SplitRegion:161 return eb.buildSplitRegion(v)162 case *planner.BatchPointGetPlan:163 return eb.buildBatchPointGetPlan(v)164 case *planner.SimpleScanPlan:165 return eb.buildSimpleScanPlan(v)166 default:167 return eb.buildFaker(v)168 }169 return nil, errors.New("Unsupported plan")170}171func (eb *ExecutorBuilder) buildSimpleScanPlan(v *planner.SimpleScanPlan) (Executor, error) {172 cols := make([]*table.Column, v.Schema().Len())173 for i, ecol := range v.Schema().Columns {174 col := v.Table.Meta().FindColumnByName(ecol.OrigName)175 cols[i] = &table.Column{col}176 }177 e := &SimpleScanExecutor{178 baseExecutor: newBaseExecutor(eb.Ctx, v.Schema(), v.ExplainID()),179 indexMeta: v.IndexMeta,180 //indexValue: v.IndexValue,181 table: v.Table,182 cols: cols,183 uppers: v.Upper,184 lowers: v.Lower,185 //desc: v.Desc,186 limit: v.Limit,187 }188 return e, nil189}190func (eb *ExecutorBuilder) buildBatchPointGetPlan(v *planner.BatchPointGetPlan) (Executor, error) {191 cols := make([]*table.Column, v.Schema().Len())192 for i, ecol := range v.Schema().Columns {193 col := 
v.Table.Meta().FindColumnByName(ecol.OrigName)194 cols[i] = &table.Column{col}195 }196 e := &BatchPointGetExecutor{197 baseExecutor: newBaseExecutor(eb.Ctx, v.Schema(), v.ExplainID()),198 indexMeta: v.IndexMeta,199 indexValues: v.IndexValues,200 table: v.Table,201 cols: cols,202 delete: v.Delete,203 }204 return e, nil205}206func (b *ExecutorBuilder) buildSplitRegion(v *planner.SplitRegion) (Executor, error) {207 e := &SplitIndexRegionExec{208 baseExecutor: newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID()),209 tableInfo: v.TableMeta,210 indexInfo: v.IndexMeta,211 lower: v.Lower,212 upper: v.Upper,213 valueLists: v.ValueLists,214 num: v.Num,215 }216 return e, nil217}218func (b *ExecutorBuilder) buildSet(v *planner.Set) (Executor, error) {219 return &SetExec{220 baseExecutor: newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID()),221 vars: v.VarAssigns,222 }, nil223}224func (b *ExecutorBuilder) buildAgg(v *planner.LogicalAggregation) (Executor, error) {225 //childExec, err := b.build(v.Children()[0])226 //if err != nil {227 // return nil, err228 //}229 childExec := &ParallelScanExecutor{230 baseExecutor: newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID()),231 tbl: v.Table,232 }233 aggFuncs := make([]aggfuncs.AggFunc, len(v.AggFuncs))234 for i, agg := range v.AggFuncs {235 af := aggfuncs.Build(b.Ctx, agg, i)236 aggFuncs[i] = af237 }238 return &AggExec{239 baseExecutor: newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID(), childExec),240 AggFuncs: aggFuncs,241 }, nil242}243func (b *ExecutorBuilder) buildSort(v *planner.LogicalSort) (Executor, error) {244 childExec, err := b.build(v.Children()[0])245 if err != nil {246 return nil, err247 }248 sortExec := SortExec{249 baseExecutor: newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID(), childExec),250 ByItems: v.ByItems,251 schema: v.Schema(),252 }253 return &sortExec, nil254}255func (b *ExecutorBuilder) buildLimit(v *planner.LogicalLimit) (Executor, error) {256 childExec, err := b.build(v.Children()[0])257 if err != nil {258 return nil, 
err259 }260 n := int(mathutil.MinUint64(v.Count, uint64(b.Ctx.GetSessionVars().MaxChunkSize)))261 base := newBaseExecutor(b.Ctx, v.Schema(), v.ExplainID(), childExec)262 base.initCap = n263 e := &LimitExec{264 baseExecutor: base,265 begin: v.Offset,266 end: v.Offset + v.Count,267 }268 return e, nil269}270func (eb *ExecutorBuilder) buildUpdate(p *planner.LogicalUpdate) (Executor, error) {271 updateExec := &UpdateExec{272 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),273 tbl: p.Table,274 values: p.ColValues,275 }276 if p.DataSource != nil {277 selExec, err := eb.build(p.DataSource)278 if err != nil {279 return nil, err280 }281 updateExec.children = append(updateExec.children, selExec)282 }283 return updateExec, nil284}285func (eb *ExecutorBuilder) buildDelete(p *planner.Delete) (Executor, error) {286 delExec := &DeleteExec{287 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),288 tbl: p.Table,289 }290 // SelectionPlan and SimplePointGetPlan can not be not nil at the same time.291 if p.SelectionPlan != nil {292 selExec, err := eb.build(p.SelectionPlan)293 if err != nil {294 return nil, err295 }296 delExec.children = append(delExec.children, selExec)297 }298 if p.SimplePointGetPlan != nil {299 spExec, err := eb.build(p.SimplePointGetPlan)300 if err != nil {301 return nil, err302 }303 delExec.children = append(delExec.children, spExec)304 }305 return delExec, nil306}307func (eb *ExecutorBuilder) buildInsert(p *planner.Insert) (Executor, error) {308 return &InsertExec{309 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),310 tbl: p.Table,311 valueLists: p.Lists,312 insertCols: p.Columns,313 defaultCF: p.DefaultCF,314 }, nil315}316func (eb *ExecutorBuilder) buildDDL(p *planner.DDL) (Executor, error) {317 return &DDLExec{318 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),319 stmt: p.Statement,320 is: eb.is,321 }, nil322}323func (eb *ExecutorBuilder) buildSimple(p *planner.Simple) (Executor, error) {324 
return &SimpleExec{325 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),326 Statement: p.Statement,327 is: eb.is,328 }, nil329}330func (eb *ExecutorBuilder) buildTableDual(p *planner.LogicalTableDual) (Executor, error) {331 return &TableDualExec{332 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),333 numDualRows: p.RowCount,334 }, nil335}336func (eb *ExecutorBuilder) buildProjection(p *planner.LogicalProjection) (Executor, error) {337 childExec, err := eb.build(p.Children()[0])338 if err != nil {339 return nil, err340 }341 return &ProjExec{342 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID(), childExec),343 Exprs: p.Exprs,344 colIndexMap: p.ColIndexMap,345 }, nil346}347func (eb *ExecutorBuilder) buildTableScan(p *planner.DataSource) (Executor, error) {348 if p.SimpleScanPlan != nil {349 return eb.buildSimpleScanPlan(p.SimpleScanPlan)350 }351 return &TableScanExec{352 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),353 columns: p.Columns,354 tbl: p.Table(),355 IdxVals: p.IdxVals,356 Index: p.Index,357 PkVals: p.PkVals,358 Limit: p.Limit,359 }, nil360}361func (eb *ExecutorBuilder) buildSelection(p *planner.LogicalSelection) (Executor, error) {362 childExec, err := eb.build(p.Children()[0])363 if err != nil {364 return nil, err365 }366 return &SelectionExec{367 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID(), childExec),368 filters: p.Conditions,369 }, nil370}371func (eb *ExecutorBuilder) buildShow(p *planner.LogicalShow) (Executor, error) {372 return &ShowExec{373 Tp: p.Tp,374 baseExecutor: newBaseExecutor(eb.Ctx, p.Schema(), p.ExplainID()),375 Plan: p,376 is: eb.is,377 }, nil378}379func (eb *ExecutorBuilder) buildFaker(p planner.Plan) (Executor, error) {380 id := stringutil.MemoizeStr(func() string {381 return "faker"382 })383 return &FakerExec{384 baseExecutor: newBaseExecutor(eb.Ctx, nil, id),385 }, nil386}...
executor.go
Source:executor.go
1package executor2import (3 "context"4 "strings"5 "sync"6 "zpd/pkg/dal"7 error_zpd "zpd/pkg/error"8 "zpd/pkg/global"9 "zpd/pkg/util"10 "github.com/bwmarrin/snowflake"11 "github.com/xwb1989/sqlparser"12)13//Executor executor interface14type Executor interface {15 Next() (interface{}, error)16}17// BaseExecutor base executor18type BaseExecutor struct {19 globalVar *global.GlobalVar20 schema *util.Schema21 childrenExec []Executor22 dal dal.DataAccessLayer23 generateID *snowflake.Node24 ctx context.Context25}26// BuilderExecutor base executor27type BuilderExecutor struct {28 dal dal.DataAccessLayer29 globalVar *global.GlobalVar30 Schema *util.Schema31 generateID *snowflake.Node32 mux sync.RWMutex33}34// NewBuilderExecutor new NewBuilderExecutor35func NewBuilderExecutor(dal dal.DataAccessLayer, schema *util.Schema, mux sync.RWMutex, globalVar *global.GlobalVar, ID uint64) (*BuilderExecutor, error) {36 node, err := snowflake.NewNode(int64(ID))37 if err != nil {38 return nil, err39 }40 return &BuilderExecutor{41 dal: dal,42 globalVar: globalVar,43 Schema: schema,44 generateID: node,45 mux: mux,46 }, nil47}48func (builder *BuilderExecutor) newBaseExecutor(ctx context.Context, childrenExec ...Executor) *BaseExecutor {49 return &BaseExecutor{50 globalVar: builder.globalVar,51 schema: builder.Schema,52 childrenExec: childrenExec,53 dal: builder.dal,54 generateID: builder.generateID,55 ctx: ctx,56 }57}58// Build build executor59func (builder *BuilderExecutor) Build(ctx context.Context, stmt sqlparser.SQLNode) (Executor, error) {60 switch stmt.(type) {61 case *sqlparser.DBDDL:62 return builder.buildDBDDLExec(ctx, stmt.(*sqlparser.DBDDL))63 case *sqlparser.Use:64 return builder.buildUseExec(ctx, stmt.(*sqlparser.Use))65 case *sqlparser.Show:66 if stmt.(*sqlparser.Show).Type == DATABASES {67 return builder.buildShowDatabaseExec(ctx, stmt.(*sqlparser.Show))68 } else {69 return builder.buildShowTableExec(ctx, stmt.(*sqlparser.Show))70 }71 case *sqlparser.TableIdent:72 return nil, 
nil73 case *sqlparser.DDL:74 return builder.buildDDLExec(ctx, stmt.(*sqlparser.DDL))75 case *sqlparser.Insert:76 return builder.buildInsertRowExec(ctx, stmt.(*sqlparser.Insert))77 case *sqlparser.Select:78 return builder.buildSelectRowExec(ctx, stmt.(*sqlparser.Select))79 case *sqlparser.Delete:80 return builder.buildDeleteExec(ctx, stmt.(*sqlparser.Delete))81 case *sqlparser.Update:82 return nil, nil83 }84 return nil, nil85}86// NewDBDDLExec new DBDDLExec87func (builder *BuilderExecutor) buildDBDDLExec(ctx context.Context, stmt *sqlparser.DBDDL) (*DBDDLExec, error) {88 baseExec := builder.newBaseExecutor(ctx)89 return &DBDDLExec{90 stmt: stmt,91 baseExecutor: baseExec,92 Action: stmt.Action,93 }, nil94}95func (builder *BuilderExecutor) buildUseExec(ctx context.Context, stmt *sqlparser.Use) (*UseExec, error) {96 tableIdentExec, err := builder.buildTableIdentExec(ctx, &stmt.DBName)97 if err != nil {98 return nil, err99 }100 baseExec := builder.newBaseExecutor(ctx, tableIdentExec)101 return &UseExec{102 stmt: stmt,103 baseExecutor: baseExec,104 }, nil105}106func (builder *BuilderExecutor) buildTableIdentExec(ctx context.Context, stmt *sqlparser.TableIdent) (*TableIdentExec, error) {107 baseExec := builder.newBaseExecutor(ctx)108 return &TableIdentExec{109 stmt: stmt,110 baseExecutor: baseExec,111 }, nil112}113func (builder *BuilderExecutor) buildShowDatabaseExec(ctx context.Context, stmt *sqlparser.Show) (*ShowDatabaseExec, error) {114 baseExec := builder.newBaseExecutor(ctx)115 return &ShowDatabaseExec{116 stmt: stmt,117 baseExecutor: baseExec,118 }, nil119}120func (builder *BuilderExecutor) checkSchemaIsExists() error {121 builder.mux.RLock()122 defer builder.mux.RUnlock()123 if builder.Schema == nil {124 return error_zpd.ErrDoNotUseDatabase125 }126 return nil127}128func (builder *BuilderExecutor) buildDDLExec(ctx context.Context, stmt *sqlparser.DDL) (*DDLExec, error) {129 err := builder.checkSchemaIsExists()130 if err != nil {131 return nil, err132 }133 baseExec 
:= builder.newBaseExecutor(ctx)134 return &DDLExec{135 stmt: stmt,136 baseExecutor: baseExec,137 Action: stmt.Action,138 }, nil139}140func (builder *BuilderExecutor) buildShowTableExec(ctx context.Context, stmt *sqlparser.Show) (*ShowTableExec, error) {141 err := builder.checkSchemaIsExists()142 if err != nil {143 return nil, err144 }145 if stmt.ShowTablesOpt.DbName == "" && builder.Schema == nil {146 return nil, error_zpd.ErrDoNotUseDatabase147 }148 baseExec := builder.newBaseExecutor(ctx)149 return &ShowTableExec{150 stmt: stmt,151 baseExecutor: baseExec,152 }, nil153}154func (builder *BuilderExecutor) buildInsertRowExec(ctx context.Context, stmt *sqlparser.Insert) (*InsertExec, error) {155 err := builder.checkSchemaIsExists()156 if err != nil {157 return nil, err158 }159 baseExec := builder.newBaseExecutor(ctx)160 cols, err := builder.buildColumns(stmt.Columns)161 if err != nil {162 return nil, err163 }164 tmp := InsertExec{165 stmt: stmt,166 baseExecutor: baseExec,167 Action: stmt.Action,168 Columns: cols,169 Rows: builder.buildRows(stmt.Rows.(sqlparser.Values)),170 }171 return &tmp, nil172}173func (builder *BuilderExecutor) checkDuplicateColumn(nameCol string, cols []string) bool {174 for _, item := range cols {175 if item == nameCol {176 return true177 }178 }179 return false180}181func (builder *BuilderExecutor) buildColumns(columns []sqlparser.ColIdent) ([]string, error) {182 cols := make([]string, len(columns))183 for i, colName := range columns {184 if builder.checkDuplicateColumn(colName.String(), cols) {185 return nil, error_zpd.ErrDuplicateColumn186 }187 cols[i] = strings.ToLower(colName.String())188 }189 return cols, nil190}191func (builder *BuilderExecutor) buildRows(rowTuples []sqlparser.ValTuple) []*util.Row {192 rowTps := make([]*util.Row, len(rowTuples))193 for i, tuple := range rowTuples {194 rowTmp := &util.Row{}195 items := make([]*util.Item, len(tuple))196 for j, r := range tuple {197 item := &util.Item{}198 switch r.(sqlparser.Expr).(type) 
{199 case *sqlparser.SQLVal:200 switch r.(*sqlparser.SQLVal).Type {201 case sqlparser.StrVal:202 item.Type = STRVAL203 break204 case sqlparser.IntVal:205 item.Type = INTVAL206 break207 default:208 item.Type = DEFAULT209 break210 }211 item.Val = r.(*sqlparser.SQLVal).Val212 break213 case sqlparser.BoolVal:214 item.Type = BOOLVAL215 if r.(sqlparser.BoolVal) {216 item.Bool = true217 } else {218 item.Bool = false219 }220 break221 default:222 item.Type = DEFAULT223 item.Val = nil224 break225 }226 items[j] = item227 }228 rowTmp.Items = items229 rowTps[i] = rowTmp230 }231 return rowTps232}233func (builder *BuilderExecutor) buildColumnsSelect(selectExprs sqlparser.SelectExprs) []string {234 cols := make([]string, len(selectExprs))235 for i, col := range selectExprs {236 switch col.(type) {237 case *sqlparser.AliasedExpr:238 cols[i] = col.(*sqlparser.AliasedExpr).Expr.(*sqlparser.ColName).Name.String()239 break240 case *sqlparser.StarExpr:241 cols[i] = "*"242 break243 }244 }245 return cols246}247func (builder *BuilderExecutor) buildTableExprs(tableExprs sqlparser.TableExprs) []string {248 tbs := make([]string, len(tableExprs))249 for i, tb := range tableExprs {250 switch tb.(type) {251 case *sqlparser.AliasedTableExpr:252 tbs[i] = tb.(*sqlparser.AliasedTableExpr).Expr.(sqlparser.TableName).Name.String()253 break254 }255 }256 return tbs257}258func (builder *BuilderExecutor) buildComparisonExpr(expr *sqlparser.ComparisonExpr) *ComparisonExpr {259 comExpr := &ComparisonExpr{}260 comExpr.Operator = expr.Operator261 comExpr.Left = &LeftSide{262 Name: strings.ToLower(expr.Left.(*sqlparser.ColName).Name.String()),263 }264 rightSide := &RightSide{}265 switch expr.Right.(*sqlparser.SQLVal).Type {266 case sqlparser.StrVal:267 rightSide.Type = STRVAL268 break269 case sqlparser.IntVal:270 rightSide.Type = INTVAL271 break272 }273 comExpr.Right = rightSide274 comExpr.Right.Val = expr.Right.(*sqlparser.SQLVal).Val275 return comExpr276}277func (builder *BuilderExecutor) 
buildExprWhere(where *sqlparser.Where) *Where {278 w := &Where{}279 w.Type = where.Type280 switch where.Expr.(type) {281 case *sqlparser.ComparisonExpr:282 w.Expr = builder.buildComparisonExpr(where.Expr.(*sqlparser.ComparisonExpr))283 break284 default:285 return nil286 }287 return w288}289func (builder *BuilderExecutor) buildSelectRowExec(ctx context.Context, stmt *sqlparser.Select) (*SelectExec, error) {290 err := builder.checkSchemaIsExists()291 if err != nil {292 return nil, err293 }294 baseExec := builder.newBaseExecutor(ctx)295 if stmt.Where != nil {296 return &SelectExec{297 stmt: stmt,298 baseExecutor: baseExec,299 cols: builder.buildColumnsSelect(stmt.SelectExprs),300 tbNames: builder.buildTableExprs(stmt.From),301 where: builder.buildExprWhere(stmt.Where),302 }, nil303 }304 return &SelectExec{305 stmt: stmt,306 baseExecutor: baseExec,307 cols: builder.buildColumnsSelect(stmt.SelectExprs),308 tbNames: builder.buildTableExprs(stmt.From),309 where: nil,310 }, nil311}312func (builder *BuilderExecutor) buildDeleteExec(ctx context.Context, stmt *sqlparser.Delete) (*DeleteExec, error) {313 err := builder.checkSchemaIsExists()314 if err != nil {315 return nil, err316 }317 baseExec := builder.newBaseExecutor(ctx)318 if stmt.Where != nil {319 return &DeleteExec{320 stmt: stmt,321 baseExecutor: baseExec,322 tbNames: builder.buildTableExprs(stmt.TableExprs),323 where: builder.buildExprWhere(stmt.Where),324 }, nil325 }326 return &DeleteExec{327 stmt: stmt,328 baseExecutor: baseExec,329 tbNames: builder.buildTableExprs(stmt.TableExprs),330 where: nil,331 }, nil332}...
NewBaseExecutor
Using AI Code Generation
1import (2func main() {3 orm.RegisterDataBase("default", "mysql", "root:root@/test?charset=utf8")4 o := orm.NewOrm()5 num, err := o.Raw("SELECT * FROM user").Values(&maps)6 if err == nil && num > 0 {7 fmt.Println(maps)8 }9}10import (11func main() {12 orm.RegisterDataBase("default", "mysql", "root:root@/test?charset=utf8")13 o := orm.NewOrm()14 num, err := o.Raw("SELECT * FROM user").Values(&maps)15 if err == nil && num > 0 {16 fmt.Println(maps)17 }18}
NewBaseExecutor
Using AI Code Generation
package main

import "fmt"

// Executor is anything that can be executed.
type Executor interface {
	Execute()
}

// BaseExecutor is a minimal Executor meant to be embedded in other executors.
type BaseExecutor struct {
	name string
}

// NewBaseExecutor returns a BaseExecutor with the given name.
func NewBaseExecutor(name string) BaseExecutor {
	return BaseExecutor{name: name}
}

// Execute prints a fixed message identifying the base executor.
func (e BaseExecutor) Execute() {
	fmt.Println("Executing base executor")
}

// CustomExecutor embeds BaseExecutor and overrides Execute.
type CustomExecutor struct {
	BaseExecutor
}

// NewCustomExecutor returns a CustomExecutor whose embedded BaseExecutor is
// created with NewBaseExecutor(name).
func NewCustomExecutor(name string) CustomExecutor {
	return CustomExecutor{BaseExecutor: NewBaseExecutor(name)}
}

// Execute prints a fixed message identifying the custom executor. It shadows
// the Execute method promoted from the embedded BaseExecutor.
func (e CustomExecutor) Execute() {
	fmt.Println("Executing custom executor")
}

func main() {
	e := NewCustomExecutor("custom")
	e.Execute()
}

// NewBaseExecutor() is called when we call NewCustomExecutor(). It returns a
// BaseExecutor value, which is assigned to the embedded BaseExecutor field of
// the CustomExecutor struct.
NewBaseExecutor
Using AI Code Generation
1func main() {2 var baseExecutor = executor.NewBaseExecutor()3 baseExecutor.Execute()4}5func main() {6 var baseExecutor = executor.NewBaseExecutor()7 baseExecutor.Execute()8}9func main() {10 var baseExecutor = executor.NewBaseExecutor()11 baseExecutor.Execute()12}13func main() {14 var baseExecutor = executor.NewBaseExecutor()15 baseExecutor.Execute()16}17func main() {18 var baseExecutor = executor.NewBaseExecutor()19 baseExecutor.Execute()20}21func main() {22 var baseExecutor = executor.NewBaseExecutor()23 baseExecutor.Execute()24}25func main() {26 var baseExecutor = executor.NewBaseExecutor()27 baseExecutor.Execute()28}29func main() {30 var baseExecutor = executor.NewBaseExecutor()31 baseExecutor.Execute()32}33func main() {34 var baseExecutor = executor.NewBaseExecutor()35 baseExecutor.Execute()36}37func main() {38 var baseExecutor = executor.NewBaseExecutor()39 baseExecutor.Execute()40}41func main() {42 var baseExecutor = executor.NewBaseExecutor()43 baseExecutor.Execute()44}45func main() {46 var baseExecutor = executor.NewBaseExecutor()47 baseExecutor.Execute()48}49func main() {50 var baseExecutor = executor.NewBaseExecutor()51 baseExecutor.Execute()52}
NewBaseExecutor
Using AI Code Generation
package main

import (
	"fmt"
	"reflect"
)

// Executor embeds BaseExecutor by value.
type Executor struct {
	BaseExecutor
}

// BaseExecutor tracks an identifier and whether Open has already been called.
type BaseExecutor struct {
	ID     int
	Opened bool
}

// Open marks the executor as opened. It returns an error naming the executor
// ID when the executor was already opened.
func (e *BaseExecutor) Open() error {
	if e.Opened {
		return fmt.Errorf("base executor %d already opened", e.ID)
	}
	e.Opened = true
	return nil
}

// NewBaseExecutor returns an unopened BaseExecutor with the given ID.
func NewBaseExecutor(id int) *BaseExecutor {
	return &BaseExecutor{
		ID: id,
	}
}

func main() {
	executor := Executor{
		BaseExecutor: *NewBaseExecutor(1),
	}
	fmt.Println(executor.BaseExecutor.Open())
	fmt.Println(reflect.TypeOf(executor))
}
NewBaseExecutor
Using AI Code Generation
1import (2func main() {3 executor.NewBaseExecutor().Execute()4}5import "fmt"6type BaseExecutor struct{}7func (b BaseExecutor) Execute() {8 fmt.Println("base executor")9}10func NewBaseExecutor() BaseExecutor {11 return BaseExecutor{}12}13import (14func main() {15 executor.NewBaseExecutor().Execute()16}17import "fmt"18type BaseExecutor struct{}19func (b BaseExecutor) Execute() {20 fmt.Println("base executor")21}22func NewBaseExecutor() *BaseExecutor {23 return &BaseExecutor{}24}25import (26func main() {27 executor.NewBaseExecutor().Execute()28}29import "fmt"30type BaseExecutor struct{}31func (b BaseExecutor) Execute() {32 fmt.Println("base executor")33}34func NewBaseExecutor() *BaseExecutor {35 return &BaseExecutor{}36}37import (38func main() {39 executor.NewBaseExecutor().Execute()40}41import "fmt"42type BaseExecutor struct{}43func (b *BaseExecutor) Execute() {44 fmt.Println("base executor")45}46func NewBaseExecutor() *BaseExecutor {47 return &BaseExecutor{}48}
NewBaseExecutor
Using AI Code Generation
1import (2func main() {3 executor.NewBaseExecutor()4 fmt.Println("Welcome to Go Training")5}6import (7func main() {8 executor.NewBaseExecutor()9 fmt.Println("Welcome to Go Training")10}11import (12func main() {13 executor.NewBaseExecutor()14 fmt.Println("Welcome to Go Training")15}16import (17func main() {18 executor.NewBaseExecutor()19 fmt.Println("Welcome to Go Training")20}21import (22func main() {23 executor.NewBaseExecutor()24 fmt.Println("Welcome to Go Training")25}26import (27func main() {28 executor.NewBaseExecutor()29 fmt.Println("Welcome to Go Training")30}31import (32func main() {33 executor.NewBaseExecutor()34 fmt.Println("Welcome to Go Training")35}
NewBaseExecutor
Using AI Code Generation
1import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )2func main() { executor := executor.NewBaseExecutor() executor.Run() }3import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )4func main() { executor := executor.NewBaseExecutor() executor.Run() }5import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )6func main() { executor := executor.NewBaseExecutor() executor.Run() }7import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )8func main() { executor := executor.NewBaseExecutor() executor.Run() }9import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )10func main() { executor := executor.NewBaseExecutor() executor.Run() }11import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )12func main() { executor := executor.NewBaseExecutor() executor.Run() }13import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )14func main() { executor := executor.NewBaseExecutor() executor.Run() }15import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )16func main() { executor := executor.NewBaseExecutor() executor.Run() }17import ( "fmt" "github.com/Go-Gang/Go/Go/Executor" )18func main() { executor := executor.NewBaseExecutor() executor.Run() }
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!