Best Testkube code snippet using client.ExecuteTest
table_test.go
Source: table_test.go
1package storageflux_test2import (3 "context"4 "io"5 "io/ioutil"6 "math"7 "math/rand"8 "os"9 "path/filepath"10 "sort"11 "strconv"12 "sync"13 "testing"14 "time"15 arrowmem "github.com/apache/arrow/go/arrow/memory"16 "github.com/google/go-cmp/cmp"17 "github.com/influxdata/flux"18 "github.com/influxdata/flux/execute"19 "github.com/influxdata/flux/execute/executetest"20 "github.com/influxdata/flux/execute/table"21 "github.com/influxdata/flux/execute/table/static"22 "github.com/influxdata/flux/memory"23 "github.com/influxdata/flux/plan"24 "github.com/influxdata/flux/values"25 "github.com/influxdata/influxdb/v2/inmem"26 "github.com/influxdata/influxdb/v2/internal/shard"27 "github.com/influxdata/influxdb/v2/kit/platform"28 "github.com/influxdata/influxdb/v2/mock"29 "github.com/influxdata/influxdb/v2/models"30 datagen "github.com/influxdata/influxdb/v2/pkg/data/gen"31 "github.com/influxdata/influxdb/v2/query"32 "github.com/influxdata/influxdb/v2/storage"33 storageflux "github.com/influxdata/influxdb/v2/storage/flux"34 storageproto "github.com/influxdata/influxdb/v2/storage/reads/datatypes"35 "github.com/influxdata/influxdb/v2/tsdb"36 "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"37 "github.com/influxdata/influxdb/v2/v1/services/meta"38 storagev1 "github.com/influxdata/influxdb/v2/v1/services/storage"39)40type SetupFunc func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange)41type StorageReader struct {42 Org platform.ID43 Bucket platform.ID44 Bounds execute.Bounds45 Close func()46 query.StorageReader47}48func NewStorageReader(tb testing.TB, setupFn SetupFunc) *StorageReader {49 rootDir, err := ioutil.TempDir("", "storage-flux-test")50 if err != nil {51 tb.Fatal(err)52 }53 var closers []io.Closer54 close := func() {55 for _, c := range closers {56 if err := c.Close(); err != nil {57 tb.Errorf("close error: %s", err)58 }59 }60 _ = os.RemoveAll(rootDir)61 }62 // Create an underlying kv store. 
We use the inmem version to speed63 // up test runs.64 kvStore := inmem.NewKVStore()65 // Manually create the meta bucket.66 // This seems to be the only bucket used for the read path.67 // If, in the future, there are any "bucket not found" errors due to68 // a change in the storage code, then this section of code will need69 // to be changed to correctly configure the kv store.70 // We do this abbreviated setup instead of a full migration because71 // the full migration is both unnecessary and long.72 if err := kvStore.CreateBucket(context.Background(), meta.BucketName); err != nil {73 close()74 tb.Fatalf("failed to create meta bucket: %s", err)75 }76 // Use this kv store for the meta client. The storage reader77 // uses the meta client for shard information.78 metaClient := meta.NewClient(meta.NewConfig(), kvStore)79 if err := metaClient.Open(); err != nil {80 close()81 tb.Fatalf("failed to open meta client: %s", err)82 }83 closers = append(closers, metaClient)84 // Create the organization and the bucket.85 idgen := mock.NewMockIDGenerator()86 org, bucket := idgen.ID(), idgen.ID()87 // Run the setup function to create the series generator.88 sg, tr := setupFn(org, bucket)89 // Construct a database with a retention policy.90 // This would normally be done by the storage bucket service, but the storage91 // bucket service requires us to already have a storage engine and a fully migrated92 // kv store. 
Since we don't have either of those, we add the metadata necessary93 // for the storage reader to function.94 // We construct the database with a retention policy that is a year long95 // so that we do not have to generate more than one shard which can get complicated.96 rp := &meta.RetentionPolicySpec{97 Name: meta.DefaultRetentionPolicyName,98 ShardGroupDuration: 24 * 7 * time.Hour * 52,99 }100 if _, err := metaClient.CreateDatabaseWithRetentionPolicy(bucket.String(), rp); err != nil {101 close()102 tb.Fatalf("failed to create database: %s", err)103 }104 // Create the shard group for the data. There should only be one and105 // it should include the entire time range.106 sgi, err := metaClient.CreateShardGroup(bucket.String(), rp.Name, tr.Start)107 if err != nil {108 close()109 tb.Fatalf("failed to create shard group: %s", err)110 } else if sgi.StartTime.After(tr.Start) || sgi.EndTime.Before(tr.End) {111 close()112 tb.Fatal("shard data range exceeded the shard group range; please use a range for data that is within the same year")113 }114 // Open the series file and prepare the directory for the shard writer.115 enginePath := filepath.Join(rootDir, "engine")116 dbPath := filepath.Join(enginePath, "data", bucket.String())117 if err := os.MkdirAll(dbPath, 0700); err != nil {118 close()119 tb.Fatalf("failed to create data directory: %s", err)120 }121 sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory))122 if err := sfile.Open(); err != nil {123 close()124 tb.Fatalf("failed to open series file: %s", err)125 }126 // Ensure the series file is closed in case of failure.127 defer sfile.Close()128 // Disable compactions to speed up the shard writer.129 sfile.DisableCompactions()130 // Write the shard data.131 shardPath := filepath.Join(dbPath, rp.Name)132 if err := os.MkdirAll(filepath.Join(shardPath, strconv.FormatUint(sgi.Shards[0].ID, 10)), 0700); err != nil {133 close()134 tb.Fatalf("failed to create shard directory: %s", err)135 }136 if err := 
writeShard(sfile, sg, sgi.Shards[0].ID, shardPath); err != nil {137 close()138 tb.Fatalf("failed to write shard: %s", err)139 }140 // Run the partition compactor on the series file.141 for i, p := range sfile.Partitions() {142 c := tsdb.NewSeriesPartitionCompactor()143 if err := c.Compact(p); err != nil {144 close()145 tb.Fatalf("failed to compact series file %d: %s", i, err)146 }147 }148 // Close the series file as it will be opened by the storage engine.149 if err := sfile.Close(); err != nil {150 close()151 tb.Fatalf("failed to close series file: %s", err)152 }153 // Now load the engine.154 engine := storage.NewEngine(155 enginePath,156 storage.NewConfig(),157 storage.WithMetaClient(metaClient),158 )159 if err := engine.Open(context.Background()); err != nil {160 close()161 tb.Fatalf("failed to open storage engine: %s", err)162 }163 closers = append(closers, engine)164 store := storagev1.NewStore(engine.TSDBStore(), engine.MetaClient())165 reader := storageflux.NewReader(store)166 return &StorageReader{167 Org: org,168 Bucket: bucket,169 Bounds: execute.Bounds{170 Start: values.ConvertTime(tr.Start),171 Stop: values.ConvertTime(tr.End),172 },173 Close: close,174 StorageReader: reader,175 }176}177func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {178 return r.StorageReader.ReadWindowAggregate(ctx, spec, alloc)179}180func TestStorageReader_ReadFilter(t *testing.T) {181 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {182 spec := Spec(org, bucket,183 MeasurementSpec("m0",184 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0}),185 TagValuesSequence("t0", "a-%s", 0, 3),186 ),187 )188 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:00:30Z")189 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr190 })191 defer reader.Close()192 mem := 
arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)193 defer mem.AssertSize(t, 0)194 alloc := &memory.Allocator{195 Allocator: mem,196 }197 ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{198 OrganizationID: reader.Org,199 BucketID: reader.Bucket,200 Bounds: reader.Bounds,201 }, alloc)202 if err != nil {203 t.Fatal(err)204 }205 makeTable := func(t0 string) *executetest.Table {206 start, stop := reader.Bounds.Start, reader.Bounds.Stop207 return &executetest.Table{208 KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},209 ColMeta: []flux.ColMeta{210 {Label: "_start", Type: flux.TTime},211 {Label: "_stop", Type: flux.TTime},212 {Label: "_time", Type: flux.TTime},213 {Label: "_value", Type: flux.TFloat},214 {Label: "_field", Type: flux.TString},215 {Label: "_measurement", Type: flux.TString},216 {Label: "t0", Type: flux.TString},217 },218 Data: [][]interface{}{219 {start, stop, Time("2019-11-25T00:00:00Z"), 1.0, "f0", "m0", t0},220 {start, stop, Time("2019-11-25T00:00:10Z"), 2.0, "f0", "m0", t0},221 {start, stop, Time("2019-11-25T00:00:20Z"), 3.0, "f0", "m0", t0},222 },223 }224 }225 want := []*executetest.Table{226 makeTable("a-0"),227 makeTable("a-1"),228 makeTable("a-2"),229 }230 executetest.NormalizeTables(want)231 sort.Sort(executetest.SortedTables(want))232 var got []*executetest.Table233 if err := ti.Do(func(table flux.Table) error {234 t, err := executetest.ConvertTable(table)235 if err != nil {236 return err237 }238 got = append(got, t)239 return nil240 }); err != nil {241 t.Fatal(err)242 }243 executetest.NormalizeTables(got)244 sort.Sort(executetest.SortedTables(got))245 // compare these two246 if diff := cmp.Diff(want, got); diff != "" {247 t.Errorf("unexpected results -want/+got:\n%s", diff)248 }249}250func TestStorageReader_Table(t *testing.T) {251 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {252 spec := Spec(org, bucket,253 MeasurementSpec("m0",254 
FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0}),255 TagValuesSequence("t0", "a-%s", 0, 3),256 ),257 )258 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:00:30Z")259 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr260 })261 defer reader.Close()262 for _, tc := range []struct {263 name string264 newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator265 }{266 {267 name: "ReadFilter",268 newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator {269 ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{270 OrganizationID: reader.Org,271 BucketID: reader.Bucket,272 Bounds: reader.Bounds,273 }, alloc)274 if err != nil {275 t.Fatal(err)276 }277 return ti278 },279 },280 } {281 t.Run(tc.name, func(t *testing.T) {282 executetest.RunTableTests(t, executetest.TableTest{283 NewFn: tc.newFn,284 IsDone: func(table flux.Table) bool {285 return table.(interface {286 IsDone() bool287 }).IsDone()288 },289 })290 })291 }292}293func TestStorageReader_ReadWindowAggregate(t *testing.T) {294 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {295 spec := Spec(org, bucket,296 MeasurementSpec("m0",297 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),298 TagValuesSequence("t0", "a-%s", 0, 3),299 ),300 )301 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")302 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr303 })304 defer reader.Close()305 for _, tt := range []struct {306 aggregate plan.ProcedureKind307 want flux.TableIterator308 }{309 {310 aggregate: storageflux.CountKind,311 want: static.TableGroup{312 static.StringKey("_measurement", "m0"),313 static.StringKey("_field", "f0"),314 static.TableMatrix{315 static.StringKeys("t0", "a-0", "a-1", "a-2"),316 {317 static.Table{318 static.TimeKey("_start", "2019-11-25T00:00:00Z"),319 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),320 
static.Ints("_value", 3),321 },322 static.Table{323 static.TimeKey("_start", "2019-11-25T00:00:30Z"),324 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),325 static.Ints("_value", 3),326 },327 static.Table{328 static.TimeKey("_start", "2019-11-25T00:01:00Z"),329 static.TimeKey("_stop", "2019-11-25T00:01:30Z"),330 static.Ints("_value", 3),331 },332 static.Table{333 static.TimeKey("_start", "2019-11-25T00:01:30Z"),334 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),335 static.Ints("_value", 3),336 },337 },338 },339 },340 },341 {342 aggregate: storageflux.MinKind,343 want: static.TableGroup{344 static.StringKey("_measurement", "m0"),345 static.StringKey("_field", "f0"),346 static.TableMatrix{347 static.StringKeys("t0", "a-0", "a-1", "a-2"),348 {349 static.Table{350 static.TimeKey("_start", "2019-11-25T00:00:00Z"),351 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),352 static.Times("_time", "2019-11-25T00:00:00Z"),353 static.Floats("_value", 1),354 },355 static.Table{356 static.TimeKey("_start", "2019-11-25T00:00:30Z"),357 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),358 static.Times("_time", "2019-11-25T00:00:40Z"),359 static.Floats("_value", 1),360 },361 static.Table{362 static.TimeKey("_start", "2019-11-25T00:01:00Z"),363 static.TimeKey("_stop", "2019-11-25T00:01:30Z"),364 static.Times("_time", "2019-11-25T00:01:20Z"),365 static.Floats("_value", 1),366 },367 static.Table{368 static.TimeKey("_start", "2019-11-25T00:01:30Z"),369 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),370 static.Times("_time", "2019-11-25T00:01:30Z"),371 static.Floats("_value", 2),372 },373 },374 },375 },376 },377 {378 aggregate: storageflux.MaxKind,379 want: static.TableGroup{380 static.StringKey("_measurement", "m0"),381 static.StringKey("_field", "f0"),382 static.TableMatrix{383 static.StringKeys("t0", "a-0", "a-1", "a-2"),384 {385 static.Table{386 static.TimeKey("_start", "2019-11-25T00:00:00Z"),387 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),388 static.Times("_time", 
"2019-11-25T00:00:20Z"),389 static.Floats("_value", 3),390 },391 static.Table{392 static.TimeKey("_start", "2019-11-25T00:00:30Z"),393 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),394 static.Times("_time", "2019-11-25T00:00:30Z"),395 static.Floats("_value", 4),396 },397 static.Table{398 static.TimeKey("_start", "2019-11-25T00:01:00Z"),399 static.TimeKey("_stop", "2019-11-25T00:01:30Z"),400 static.Times("_time", "2019-11-25T00:01:10Z"),401 static.Floats("_value", 4),402 },403 static.Table{404 static.TimeKey("_start", "2019-11-25T00:01:30Z"),405 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),406 static.Times("_time", "2019-11-25T00:01:50Z"),407 static.Floats("_value", 4),408 },409 },410 },411 },412 },413 } {414 t.Run(string(tt.aggregate), func(t *testing.T) {415 mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)416 defer mem.AssertSize(t, 0)417 alloc := &memory.Allocator{418 Allocator: mem,419 }420 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{421 ReadFilterSpec: query.ReadFilterSpec{422 OrganizationID: reader.Org,423 BucketID: reader.Bucket,424 Bounds: reader.Bounds,425 },426 Window: execute.Window{427 Every: flux.ConvertDuration(30 * time.Second),428 Period: flux.ConvertDuration(30 * time.Second),429 },430 Aggregates: []plan.ProcedureKind{431 tt.aggregate,432 },433 }, alloc)434 if err != nil {435 t.Fatal(err)436 }437 if diff := table.Diff(tt.want, got); diff != "" {438 t.Fatalf("unexpected output -want/+got:\n%s", diff)439 }440 })441 }442}443func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) {444 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {445 spec := Spec(org, bucket,446 MeasurementSpec("m0",447 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),448 TagValuesSequence("t0", "a-%s", 0, 3),449 ),450 )451 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")452 return 
datagen.NewSeriesGeneratorFromSpec(spec, tr), tr453 })454 defer reader.Close()455 for _, tt := range []struct {456 aggregate plan.ProcedureKind457 want flux.TableIterator458 }{459 {460 aggregate: storageflux.CountKind,461 want: static.TableGroup{462 static.StringKey("_measurement", "m0"),463 static.StringKey("_field", "f0"),464 static.TimeKey("_start", "2019-11-25T00:00:00Z"),465 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),466 static.TableMatrix{467 static.StringKeys("t0", "a-0", "a-1", "a-2"),468 {469 static.Table{470 static.Times("_time", "2019-11-25T00:00:30Z", 30, 60, 90),471 static.Ints("_value", 3, 3, 3, 3),472 },473 },474 },475 },476 },477 {478 aggregate: storageflux.MinKind,479 want: static.TableGroup{480 static.StringKey("_measurement", "m0"),481 static.StringKey("_field", "f0"),482 static.TimeKey("_start", "2019-11-25T00:00:00Z"),483 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),484 static.TableMatrix{485 static.StringKeys("t0", "a-0", "a-1", "a-2"),486 {487 static.Table{488 static.Times("_time", "2019-11-25T00:00:30Z", 30, 60, 90),489 static.Floats("_value", 1, 1, 1, 2),490 },491 },492 },493 },494 },495 {496 aggregate: storageflux.MaxKind,497 want: static.TableGroup{498 static.StringKey("_measurement", "m0"),499 static.StringKey("_field", "f0"),500 static.TimeKey("_start", "2019-11-25T00:00:00Z"),501 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),502 static.TableMatrix{503 static.StringKeys("t0", "a-0", "a-1", "a-2"),504 {505 static.Table{506 static.Times("_time", "2019-11-25T00:00:30Z", 30, 60, 90),507 static.Floats("_value", 3, 4, 4, 4),508 },509 },510 },511 },512 },513 } {514 mem := &memory.Allocator{}515 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{516 ReadFilterSpec: query.ReadFilterSpec{517 OrganizationID: reader.Org,518 BucketID: reader.Bucket,519 Bounds: reader.Bounds,520 },521 TimeColumn: execute.DefaultStopColLabel,522 Window: execute.Window{523 Every: flux.ConvertDuration(30 * 
time.Second),524 Period: flux.ConvertDuration(30 * time.Second),525 },526 Aggregates: []plan.ProcedureKind{527 tt.aggregate,528 },529 }, mem)530 if err != nil {531 t.Fatal(err)532 }533 if diff := table.Diff(tt.want, got); diff != "" {534 t.Errorf("unexpected results -want/+got:\n%s", diff)535 }536 }537}538func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) {539 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {540 spec := Spec(org, bucket,541 MeasurementSpec("m0",542 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),543 TagValuesSequence("t0", "a-%s", 0, 3),544 ),545 )546 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")547 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr548 })549 defer reader.Close()550 for _, tt := range []struct {551 aggregate plan.ProcedureKind552 want flux.TableIterator553 }{554 {555 aggregate: storageflux.CountKind,556 want: static.TableGroup{557 static.StringKey("_measurement", "m0"),558 static.StringKey("_field", "f0"),559 static.TimeKey("_start", "2019-11-25T00:00:00Z"),560 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),561 static.TableMatrix{562 static.StringKeys("t0", "a-0", "a-1", "a-2"),563 {564 static.Table{565 static.Times("_time", "2019-11-25T00:00:00Z", 30, 60, 90),566 static.Ints("_value", 3, 3, 3, 3),567 },568 },569 },570 },571 },572 {573 aggregate: storageflux.MinKind,574 want: static.TableGroup{575 static.StringKey("_measurement", "m0"),576 static.StringKey("_field", "f0"),577 static.TimeKey("_start", "2019-11-25T00:00:00Z"),578 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),579 static.TableMatrix{580 static.StringKeys("t0", "a-0", "a-1", "a-2"),581 {582 static.Table{583 static.Times("_time", "2019-11-25T00:00:00Z", 30, 60, 90),584 static.Floats("_value", 1, 1, 1, 2),585 },586 },587 },588 },589 },590 {591 aggregate: storageflux.MaxKind,592 want: static.TableGroup{593 static.StringKey("_measurement", 
"m0"),594 static.StringKey("_field", "f0"),595 static.TimeKey("_start", "2019-11-25T00:00:00Z"),596 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),597 static.TableMatrix{598 static.StringKeys("t0", "a-0", "a-1", "a-2"),599 {600 static.Table{601 static.Times("_time", "2019-11-25T00:00:00Z", 30, 60, 90),602 static.Floats("_value", 3, 4, 4, 4),603 },604 },605 },606 },607 },608 } {609 t.Run(string(tt.aggregate), func(t *testing.T) {610 mem := &memory.Allocator{}611 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{612 ReadFilterSpec: query.ReadFilterSpec{613 OrganizationID: reader.Org,614 BucketID: reader.Bucket,615 Bounds: reader.Bounds,616 },617 TimeColumn: execute.DefaultStartColLabel,618 Window: execute.Window{619 Every: flux.ConvertDuration(30 * time.Second),620 Period: flux.ConvertDuration(30 * time.Second),621 },622 Aggregates: []plan.ProcedureKind{623 tt.aggregate,624 },625 }, mem)626 if err != nil {627 t.Fatal(err)628 }629 if diff := table.Diff(tt.want, got); diff != "" {630 t.Fatalf("unexpected output -want/+got:\n%s", diff)631 }632 })633 }634}635func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) {636 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {637 spec := Spec(org, bucket,638 MeasurementSpec("m0",639 FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),640 TagValuesSequence("t0", "a-%s", 0, 3),641 ),642 )643 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:01:00Z")644 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr645 })646 defer reader.Close()647 for _, tt := range []struct {648 aggregate plan.ProcedureKind649 want flux.TableIterator650 }{651 {652 aggregate: storageflux.CountKind,653 want: static.TableGroup{654 static.StringKey("_measurement", "m0"),655 static.StringKey("_field", "f0"),656 static.TableMatrix{657 static.StringKeys("t0", "a-0", "a-1", "a-2"),658 {659 static.Table{660 
static.TimeKey("_start", "2019-11-25T00:00:00Z"),661 static.TimeKey("_stop", "2019-11-25T00:00:10Z"),662 static.Ints("_value", 1),663 },664 static.Table{665 static.TimeKey("_start", "2019-11-25T00:00:10Z"),666 static.TimeKey("_stop", "2019-11-25T00:00:20Z"),667 static.Ints("_value", 1),668 },669 static.Table{670 static.TimeKey("_start", "2019-11-25T00:00:20Z"),671 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),672 static.Ints("_value", 0),673 },674 static.Table{675 static.TimeKey("_start", "2019-11-25T00:00:30Z"),676 static.TimeKey("_stop", "2019-11-25T00:00:40Z"),677 static.Ints("_value", 1),678 },679 static.Table{680 static.TimeKey("_start", "2019-11-25T00:00:40Z"),681 static.TimeKey("_stop", "2019-11-25T00:00:50Z"),682 static.Ints("_value", 1),683 },684 static.Table{685 static.TimeKey("_start", "2019-11-25T00:00:50Z"),686 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),687 static.Ints("_value", 0),688 },689 },690 },691 },692 },693 {694 aggregate: storageflux.MinKind,695 want: static.TableGroup{696 static.StringKey("_measurement", "m0"),697 static.StringKey("_field", "f0"),698 static.TableMatrix{699 static.StringKeys("t0", "a-0", "a-1", "a-2"),700 {701 static.Table{702 static.TimeKey("_start", "2019-11-25T00:00:00Z"),703 static.TimeKey("_stop", "2019-11-25T00:00:10Z"),704 static.Times("_time", "2019-11-25T00:00:00Z"),705 static.Floats("_value", 1),706 },707 static.Table{708 static.TimeKey("_start", "2019-11-25T00:00:10Z"),709 static.TimeKey("_stop", "2019-11-25T00:00:20Z"),710 static.Times("_time", "2019-11-25T00:00:15Z"),711 static.Floats("_value", 2),712 },713 static.Table{714 static.TimeKey("_start", "2019-11-25T00:00:20Z"),715 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),716 static.Times("_time"),717 static.Floats("_value"),718 },719 static.Table{720 static.TimeKey("_start", "2019-11-25T00:00:30Z"),721 static.TimeKey("_stop", "2019-11-25T00:00:40Z"),722 static.Times("_time", "2019-11-25T00:00:30Z"),723 static.Floats("_value", 3),724 },725 static.Table{726 
static.TimeKey("_start", "2019-11-25T00:00:40Z"),727 static.TimeKey("_stop", "2019-11-25T00:00:50Z"),728 static.Times("_time", "2019-11-25T00:00:45Z"),729 static.Floats("_value", 4),730 },731 static.Table{732 static.TimeKey("_start", "2019-11-25T00:00:50Z"),733 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),734 static.Times("_time"),735 static.Floats("_value"),736 },737 },738 },739 },740 },741 {742 aggregate: storageflux.MaxKind,743 want: static.TableGroup{744 static.StringKey("_measurement", "m0"),745 static.StringKey("_field", "f0"),746 static.TableMatrix{747 static.StringKeys("t0", "a-0", "a-1", "a-2"),748 {749 static.Table{750 static.TimeKey("_start", "2019-11-25T00:00:00Z"),751 static.TimeKey("_stop", "2019-11-25T00:00:10Z"),752 static.Times("_time", "2019-11-25T00:00:00Z"),753 static.Floats("_value", 1),754 },755 static.Table{756 static.TimeKey("_start", "2019-11-25T00:00:10Z"),757 static.TimeKey("_stop", "2019-11-25T00:00:20Z"),758 static.Times("_time", "2019-11-25T00:00:15Z"),759 static.Floats("_value", 2),760 },761 static.Table{762 static.TimeKey("_start", "2019-11-25T00:00:20Z"),763 static.TimeKey("_stop", "2019-11-25T00:00:30Z"),764 static.Times("_time"),765 static.Floats("_value"),766 },767 static.Table{768 static.TimeKey("_start", "2019-11-25T00:00:30Z"),769 static.TimeKey("_stop", "2019-11-25T00:00:40Z"),770 static.Times("_time", "2019-11-25T00:00:30Z"),771 static.Floats("_value", 3),772 },773 static.Table{774 static.TimeKey("_start", "2019-11-25T00:00:40Z"),775 static.TimeKey("_stop", "2019-11-25T00:00:50Z"),776 static.Times("_time", "2019-11-25T00:00:45Z"),777 static.Floats("_value", 4),778 },779 static.Table{780 static.TimeKey("_start", "2019-11-25T00:00:50Z"),781 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),782 static.Times("_time"),783 static.Floats("_value"),784 },785 },786 },787 },788 },789 } {790 t.Run(string(tt.aggregate), func(t *testing.T) {791 mem := &memory.Allocator{}792 got, err := reader.ReadWindowAggregate(context.Background(), 
query.ReadWindowAggregateSpec{793 ReadFilterSpec: query.ReadFilterSpec{794 OrganizationID: reader.Org,795 BucketID: reader.Bucket,796 Bounds: reader.Bounds,797 },798 Window: execute.Window{799 Every: flux.ConvertDuration(10 * time.Second),800 Period: flux.ConvertDuration(10 * time.Second),801 },802 Aggregates: []plan.ProcedureKind{803 tt.aggregate,804 },805 CreateEmpty: true,806 }, mem)807 if err != nil {808 t.Fatal(err)809 }810 if diff := table.Diff(tt.want, got); diff != "" {811 t.Fatalf("unexpected output -want/+got:\n%s", diff)812 }813 })814 }815}816func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) {817 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {818 spec := Spec(org, bucket,819 MeasurementSpec("m0",820 FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),821 TagValuesSequence("t0", "a-%s", 0, 3),822 ),823 )824 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:01:00Z")825 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr826 })827 defer reader.Close()828 for _, tt := range []struct {829 aggregate plan.ProcedureKind830 want flux.TableIterator831 }{832 {833 aggregate: storageflux.CountKind,834 want: static.TableGroup{835 static.StringKey("_measurement", "m0"),836 static.StringKey("_field", "f0"),837 static.TimeKey("_start", "2019-11-25T00:00:00Z"),838 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),839 static.TableMatrix{840 static.StringKeys("t0", "a-0", "a-1", "a-2"),841 {842 static.Table{843 static.Times("_time", "2019-11-25T00:00:10Z", 10, 20, 30, 40, 50),844 static.Ints("_value", 1, 1, 0, 1, 1, 0),845 },846 },847 },848 },849 },850 {851 aggregate: storageflux.MinKind,852 want: static.TableGroup{853 static.StringKey("_measurement", "m0"),854 static.StringKey("_field", "f0"),855 static.TimeKey("_start", "2019-11-25T00:00:00Z"),856 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),857 static.TableMatrix{858 static.StringKeys("t0", "a-0", 
"a-1", "a-2"),859 {860 static.Table{861 static.Times("_time", "2019-11-25T00:00:10Z", 10, 30, 40),862 static.Floats("_value", 1, 2, 3, 4),863 },864 },865 },866 },867 },868 {869 aggregate: storageflux.MaxKind,870 want: static.TableGroup{871 static.StringKey("_measurement", "m0"),872 static.StringKey("_field", "f0"),873 static.TimeKey("_start", "2019-11-25T00:00:00Z"),874 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),875 static.TableMatrix{876 static.StringKeys("t0", "a-0", "a-1", "a-2"),877 {878 static.Table{879 static.Times("_time", "2019-11-25T00:00:10Z", 10, 30, 40),880 static.Floats("_value", 1, 2, 3, 4),881 },882 },883 },884 },885 },886 } {887 t.Run(string(tt.aggregate), func(t *testing.T) {888 mem := &memory.Allocator{}889 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{890 ReadFilterSpec: query.ReadFilterSpec{891 OrganizationID: reader.Org,892 BucketID: reader.Bucket,893 Bounds: reader.Bounds,894 },895 TimeColumn: execute.DefaultStopColLabel,896 Window: execute.Window{897 Every: flux.ConvertDuration(10 * time.Second),898 Period: flux.ConvertDuration(10 * time.Second),899 },900 Aggregates: []plan.ProcedureKind{901 tt.aggregate,902 },903 CreateEmpty: true,904 }, mem)905 if err != nil {906 t.Fatal(err)907 }908 if diff := table.Diff(tt.want, got); diff != "" {909 t.Errorf("unexpected results -want/+got:\n%s", diff)910 }911 })912 }913}914func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T) {915 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {916 spec := Spec(org, bucket,917 MeasurementSpec("m0",918 FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),919 TagValuesSequence("t0", "a-%s", 0, 3),920 ),921 )922 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:01:00Z")923 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr924 })925 defer reader.Close()926 for _, tt := range []struct {927 aggregate 
plan.ProcedureKind928 want flux.TableIterator929 }{930 {931 aggregate: storageflux.CountKind,932 want: static.TableGroup{933 static.StringKey("_measurement", "m0"),934 static.StringKey("_field", "f0"),935 static.TimeKey("_start", "2019-11-25T00:00:00Z"),936 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),937 static.TableMatrix{938 static.StringKeys("t0", "a-0", "a-1", "a-2"),939 {940 static.Table{941 static.Times("_time", "2019-11-25T00:00:00Z", 10, 20, 30, 40, 50),942 static.Ints("_value", 1, 1, 0, 1, 1, 0),943 },944 },945 },946 },947 },948 {949 aggregate: storageflux.MinKind,950 want: static.TableGroup{951 static.StringKey("_measurement", "m0"),952 static.StringKey("_field", "f0"),953 static.TimeKey("_start", "2019-11-25T00:00:00Z"),954 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),955 static.TableMatrix{956 static.StringKeys("t0", "a-0", "a-1", "a-2"),957 {958 static.Table{959 static.Times("_time", "2019-11-25T00:00:00Z", 10, 30, 40),960 static.Floats("_value", 1, 2, 3, 4),961 },962 },963 },964 },965 },966 {967 aggregate: storageflux.MaxKind,968 want: static.TableGroup{969 static.StringKey("_measurement", "m0"),970 static.StringKey("_field", "f0"),971 static.TimeKey("_start", "2019-11-25T00:00:00Z"),972 static.TimeKey("_stop", "2019-11-25T00:01:00Z"),973 static.TableMatrix{974 static.StringKeys("t0", "a-0", "a-1", "a-2"),975 {976 static.Table{977 static.Times("_time", "2019-11-25T00:00:00Z", 10, 30, 40),978 static.Floats("_value", 1, 2, 3, 4),979 },980 },981 },982 },983 },984 } {985 t.Run(string(tt.aggregate), func(t *testing.T) {986 mem := &memory.Allocator{}987 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{988 ReadFilterSpec: query.ReadFilterSpec{989 OrganizationID: reader.Org,990 BucketID: reader.Bucket,991 Bounds: reader.Bounds,992 },993 TimeColumn: execute.DefaultStartColLabel,994 Window: execute.Window{995 Every: flux.ConvertDuration(10 * time.Second),996 Period: flux.ConvertDuration(10 * time.Second),997 
},
				Aggregates: []plan.ProcedureKind{
					tt.aggregate,
				},
				CreateEmpty: true,
			}, mem)
			if err != nil {
				t.Fatal(err)
			}
			if diff := table.Diff(tt.want, got); diff != "" {
				t.Errorf("unexpected results -want/+got:\n%s", diff)
			}
		})
	}
}

// int64SeriesSetup returns a SetupFunc describing a single integer series:
// measurement "m0", field "f0" and one tag key "t0" whose value comes from the
// "a%s" counter sequence (the tests below expect it to be "a0"). The generator
// covers 2019-11-25T00:00:00Z to 2019-11-25T00:01:00Z, emits one point every
// delta, and draws the point values from vals.
func int64SeriesSetup(delta time.Duration, vals []int64) SetupFunc {
	return func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		spec := datagen.Spec{
			Measurements: []datagen.MeasurementSpec{
				{
					Name: "m0",
					TagsSpec: &datagen.TagsSpec{
						Tags: []*datagen.TagValuesSpec{
							{
								TagKey: "t0",
								Values: func() datagen.CountableSequence {
									return datagen.NewCounterByteSequence("a%s", 0, 1)
								},
							},
						},
					},
					FieldValuesSpec: &datagen.FieldValuesSpec{
						Name: "f0",
						TimeSequenceSpec: datagen.TimeSequenceSpec{
							Count: math.MaxInt32,
							Delta: delta,
						},
						DataType: models.Integer,
						Values: func(ts datagen.TimeSequenceSpec) datagen.TimeValuesSequence {
							return datagen.NewTimeIntegerValuesSequence(
								ts.Count,
								datagen.NewTimestampSequenceFromSpec(ts),
								// Give every sequence its own copy of the values,
								// exactly as a fresh slice literal would.
								datagen.NewIntegerArrayValuesSequence(append([]int64(nil), vals...)),
							)
						},
					},
				},
			},
		}
		tr := datagen.TimeRange{
			Start: mustParseTime("2019-11-25T00:00:00Z"),
			End:   mustParseTime("2019-11-25T00:01:00Z"),
		}
		return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
	}
}

// mustReadWindowAggregate runs a single-aggregate ReadWindowAggregate against
// reader using a fresh allocator and fails the test immediately if the call
// returns an error.
func mustReadWindowAggregate(
	t *testing.T,
	reader *StorageReader,
	bounds execute.Bounds,
	window execute.Window,
	kind plan.ProcedureKind,
	createEmpty bool,
) flux.TableIterator {
	t.Helper()

	mem := &memory.Allocator{}
	ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
		ReadFilterSpec: query.ReadFilterSpec{
			OrganizationID: reader.Org,
			BucketID:       reader.Bucket,
			Bounds:         bounds,
		},
		Window: window,
		Aggregates: []plan.ProcedureKind{
			kind,
		},
		CreateEmpty: createEmpty,
	}, mem)
	if err != nil {
		t.Fatal(err)
	}
	return ti
}

// assertSortedTables normalizes and sorts want, drains ti into executetest
// tables, normalizes and sorts those as well, and reports any difference
// between the two lists with t.Errorf.
func assertSortedTables(t *testing.T, ti flux.TableIterator, want []*executetest.Table) {
	t.Helper()

	executetest.NormalizeTables(want)
	sort.Sort(executetest.SortedTables(want))

	var got []*executetest.Table
	if err := ti.Do(func(tbl flux.Table) error {
		converted, err := executetest.ConvertTable(tbl)
		if err != nil {
			return err
		}
		got = append(got, converted)
		return nil
	}); err != nil {
		t.Fatal(err)
	}
	executetest.NormalizeTables(got)
	sort.Sort(executetest.SortedTables(got))

	if diff := cmp.Diff(want, got); diff != "" {
		t.Errorf("unexpected results -want/+got:\n%s", diff)
	}
}

// windowKeyCols returns the group key column labels shared by every expected
// window table in the tests below.
func windowKeyCols() []string {
	return []string{"_start", "_stop", "_field", "_measurement", "t0"}
}

// windowColMeta returns the column layout of an expected window table. The
// "_time" column is only present when withTime is set.
func windowColMeta(withTime bool) []flux.ColMeta {
	cols := []flux.ColMeta{
		{Label: "_start", Type: flux.TTime},
		{Label: "_stop", Type: flux.TTime},
	}
	if withTime {
		cols = append(cols, flux.ColMeta{Label: "_time", Type: flux.TTime})
	}
	return append(cols,
		flux.ColMeta{Label: "_value", Type: flux.TInt},
		flux.ColMeta{Label: "_field", Type: flux.TString},
		flux.ColMeta{Label: "_measurement", Type: flux.TString},
		flux.ColMeta{Label: "t0", Type: flux.TString},
	)
}

// timedWindowTable builds the expected one-row table for a window that keeps a
// "_time" column: the row carries timestamp ts and value v.
func timedWindowTable(start, stop, ts values.Time, v int64) *executetest.Table {
	return &executetest.Table{
		KeyCols: windowKeyCols(),
		ColMeta: windowColMeta(true),
		Data: [][]interface{}{
			{start, stop, ts, v, "f0", "m0", "a0"},
		},
	}
}

// timedEmptyTable builds the expected table for a window that keeps a "_time"
// column but holds no rows; the group key is given through KeyValues.
func timedEmptyTable(start, stop values.Time) *executetest.Table {
	return &executetest.Table{
		KeyCols:   windowKeyCols(),
		KeyValues: []interface{}{start, stop, "f0", "m0", "a0"},
		ColMeta:   windowColMeta(true),
		Data:      nil,
	}
}

// valueWindowTable builds the expected one-row table for a window without a
// "_time" column: the row carries value v.
func valueWindowTable(start, stop values.Time, v int64) *executetest.Table {
	return &executetest.Table{
		KeyCols: windowKeyCols(),
		ColMeta: windowColMeta(false),
		Data: [][]interface{}{
			{start, stop, v, "f0", "m0", "a0"},
		},
	}
}

// valueNullTable builds the expected table for a window without a "_time"
// column whose single row has a nil "_value".
func valueNullTable(start, stop values.Time) *executetest.Table {
	return &executetest.Table{
		KeyCols:   windowKeyCols(),
		KeyValues: []interface{}{start, stop, "f0", "m0", "a0"},
		ColMeta:   windowColMeta(false),
		Data: [][]interface{}{
			{start, stop, nil, "f0", "m0", "a0"},
		},
	}
}

// TestStorageReader_ReadWindowAggregate_TruncatedBounds runs count, min and max
// over 10s windows with query bounds 00:00:05..00:00:25, which do not line up
// with the window boundaries. The expected first and last windows are clipped
// to those bounds. The source data is a float series with a 5s step.
func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
	reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		spec := Spec(org, bucket,
			MeasurementSpec("m0",
				FloatArrayValuesSequence("f0", 5*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
				TagValuesSequence("t0", "a-%s", 0, 3),
			),
		)
		tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:01:00Z")
		return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr
	})
	defer reader.Close()

	bounds := execute.Bounds{
		Start: values.ConvertTime(mustParseTime("2019-11-25T00:00:05Z")),
		Stop:  values.ConvertTime(mustParseTime("2019-11-25T00:00:25Z")),
	}
	window := execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
	}

	cases := []struct {
		aggregate plan.ProcedureKind
		want      flux.TableIterator
	}{
		{
			aggregate: storageflux.CountKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Ints("_value", 1),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Ints("_value", 2),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Ints("_value", 1),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MinKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Times("_time", "2019-11-25T00:00:05Z"),
							static.Floats("_value", 2),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Times("_time", "2019-11-25T00:00:10Z"),
							static.Floats("_value", 3),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Times("_time", "2019-11-25T00:00:20Z"),
							static.Floats("_value", 1),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MaxKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Times("_time", "2019-11-25T00:00:05Z"),
							static.Floats("_value", 2),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Times("_time", "2019-11-25T00:00:15Z"),
							static.Floats("_value", 4),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Times("_time", "2019-11-25T00:00:20Z"),
							static.Floats("_value", 1),
						},
					},
				},
			},
		},
	}

	for _, tt := range cases {
		t.Run(string(tt.aggregate), func(t *testing.T) {
			got := mustReadWindowAggregate(t, reader, bounds, window, tt.aggregate, false)
			if diff := table.Diff(tt.want, got); diff != "" {
				t.Errorf("unexpected results -want/+got:\n%s", diff)
			}
		})
	}
}

// TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty is the
// CreateEmpty variant of the truncated-bounds test: the float series has a 15s
// step, so only the middle 10s window inside 00:00:05..00:00:25 contains a
// point. The outer windows are expected to be present anyway (count 0, or no
// rows for min/max).
func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing.T) {
	reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		spec := Spec(org, bucket,
			MeasurementSpec("m0",
				FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
				TagValuesSequence("t0", "a-%s", 0, 3),
			),
		)
		tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:01:00Z")
		return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr
	})
	defer reader.Close()

	bounds := execute.Bounds{
		Start: values.ConvertTime(mustParseTime("2019-11-25T00:00:05Z")),
		Stop:  values.ConvertTime(mustParseTime("2019-11-25T00:00:25Z")),
	}
	window := execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
	}

	cases := []struct {
		aggregate plan.ProcedureKind
		want      flux.TableIterator
	}{
		{
			aggregate: storageflux.CountKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Ints("_value", 0),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Ints("_value", 1),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Ints("_value", 0),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MinKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Times("_time"),
							static.Floats("_value"),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Times("_time", "2019-11-25T00:00:15Z"),
							static.Floats("_value", 2),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Times("_time"),
							static.Floats("_value"),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MaxKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:05Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
							static.Times("_time"),
							static.Floats("_value"),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:10Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
							static.Times("_time", "2019-11-25T00:00:15Z"),
							static.Floats("_value", 2),
						},
						static.Table{
							static.TimeKey("_start", "2019-11-25T00:00:20Z"),
							static.TimeKey("_stop", "2019-11-25T00:00:25Z"),
							static.Times("_time"),
							static.Floats("_value"),
						},
					},
				},
			},
		},
	}

	for _, tt := range cases {
		t.Run(string(tt.aggregate), func(t *testing.T) {
			got := mustReadWindowAggregate(t, reader, bounds, window, tt.aggregate, true)
			if diff := table.Diff(tt.want, got); diff != "" {
				t.Errorf("unexpected results -want/+got:\n%s", diff)
			}
		})
	}
}

// TestStorageReader_ReadWindowAggregate_Mean checks the mean aggregate over an
// integer series (values 1, 2, 3, 4 with a 5s step) in three shapes: a single
// window spanning the whole range, aligned 10s windows, and 10s windows shifted
// by a 2s offset.
func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(5*time.Second, []int64{1, 2, 3, 4}))
	defer reader.Close()

	t.Run("unwindowed mean", func(t *testing.T) {
		// A window as long as the largest representable duration behaves as
		// "no windowing": one result for the full bounds.
		ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
			Every:  flux.ConvertDuration(math.MaxInt64 * time.Nanosecond),
			Period: flux.ConvertDuration(math.MaxInt64 * time.Nanosecond),
		}, storageflux.MeanKind, false)

		want := static.Table{
			static.StringKey("_measurement", "m0"),
			static.StringKey("_field", "f0"),
			static.StringKey("t0", "a0"),
			static.TimeKey("_start", "2019-11-25T00:00:00Z"),
			static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
			static.Floats("_value", 2.5),
		}
		if diff := table.Diff(want, ti); diff != "" {
			t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
		}
	})

	t.Run("windowed mean", func(t *testing.T) {
		ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
			Every:  flux.ConvertDuration(10 * time.Second),
			Period: flux.ConvertDuration(10 * time.Second),
		}, storageflux.MeanKind, false)

		want := static.TableGroup{
			static.StringKey("_measurement", "m0"),
			static.StringKey("_field", "f0"),
			static.StringKey("t0", "a0"),
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:00Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:10Z"),
				static.Floats("_value", 1.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:10Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:20Z"),
				static.Floats("_value", 3.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:20Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:30Z"),
				static.Floats("_value", 1.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:30Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
				static.Floats("_value", 3.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:40Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:50Z"),
				static.Floats("_value", 1.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:50Z"),
				static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
				static.Floats("_value", 3.5),
			},
		}
		if diff := table.Diff(want, ti); diff != "" {
			t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
		}
	})

	t.Run("windowed mean with offset", func(t *testing.T) {
		ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
			Every:  flux.ConvertDuration(10 * time.Second),
			Period: flux.ConvertDuration(10 * time.Second),
			Offset: flux.ConvertDuration(2 * time.Second),
		}, storageflux.MeanKind, false)

		want := static.TableGroup{
			static.StringKey("_measurement", "m0"),
			static.StringKey("_field", "f0"),
			static.StringKey("t0", "a0"),
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:00Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:02Z"),
				static.Floats("_value", 1.0),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:02Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:12Z"),
				static.Floats("_value", 2.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:12Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:22Z"),
				static.Floats("_value", 2.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:22Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:32Z"),
				static.Floats("_value", 2.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:32Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:42Z"),
				static.Floats("_value", 2.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:42Z"),
				static.TimeKey("_stop", "2019-11-25T00:00:52Z"),
				static.Floats("_value", 2.5),
			},
			static.Table{
				static.TimeKey("_start", "2019-11-25T00:00:52Z"),
				static.TimeKey("_stop", "2019-11-25T00:01:00Z"),
				static.Floats("_value", 4),
			},
		}
		if diff := table.Diff(want, ti); diff != "" {
			t.Fatalf("table iterators do not match; -want/+got:\n%s", diff)
		}
	})
}

// TestStorageReader_ReadWindowFirst checks the first selector over aligned 10s
// windows of an integer series with a 5s step: each window is expected to yield
// the point sitting on its start boundary.
func TestStorageReader_ReadWindowFirst(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(5*time.Second, []int64{1, 2, 3, 4}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
	}, storageflux.FirstKind, false)

	want := []*executetest.Table{
		timedWindowTable(Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:00Z"), 1),
		timedWindowTable(Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:10Z"), 3),
		timedWindowTable(Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:20Z"), 1),
		timedWindowTable(Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:30Z"), 3),
		timedWindowTable(Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:00:40Z"), 1),
		timedWindowTable(Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:50Z"), 3),
	}
	assertSortedTables(t, ti, want)
}

// TestStorageReader_WindowFirstOffset checks the first selector over 10s
// windows shifted by a 5s offset; the first and last windows are expected to be
// only 5s long because they are cut at the query bounds.
func TestStorageReader_WindowFirstOffset(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(5*time.Second, []int64{1, 2, 3, 4}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
		Offset: flux.ConvertDuration(5 * time.Second),
	}, storageflux.FirstKind, false)

	want := []*executetest.Table{
		timedWindowTable(Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:05Z"), Time("2019-11-25T00:00:00Z"), 1),
		timedWindowTable(Time("2019-11-25T00:00:05Z"), Time("2019-11-25T00:00:15Z"), Time("2019-11-25T00:00:05Z"), 2),
		timedWindowTable(Time("2019-11-25T00:00:15Z"), Time("2019-11-25T00:00:25Z"), Time("2019-11-25T00:00:15Z"), 4),
		timedWindowTable(Time("2019-11-25T00:00:25Z"), Time("2019-11-25T00:00:35Z"), Time("2019-11-25T00:00:25Z"), 2),
		timedWindowTable(Time("2019-11-25T00:00:35Z"), Time("2019-11-25T00:00:45Z"), Time("2019-11-25T00:00:35Z"), 4),
		timedWindowTable(Time("2019-11-25T00:00:45Z"), Time("2019-11-25T00:00:55Z"), Time("2019-11-25T00:00:45Z"), 2),
		timedWindowTable(Time("2019-11-25T00:00:55Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:55Z"), 4),
	}
	assertSortedTables(t, ti, want)
}

// TestStorageReader_WindowSumOffset checks the sum aggregate over 10s windows
// shifted by a 2s offset. The expected tables carry no "_time" column.
func TestStorageReader_WindowSumOffset(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(5*time.Second, []int64{1, 2, 3, 4}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
		Offset: flux.ConvertDuration(2 * time.Second),
	}, storageflux.SumKind, false)

	want := []*executetest.Table{
		valueWindowTable(Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:02Z"), 1),
		valueWindowTable(Time("2019-11-25T00:00:02Z"), Time("2019-11-25T00:00:12Z"), 5),
		valueWindowTable(Time("2019-11-25T00:00:12Z"), Time("2019-11-25T00:00:22Z"), 5),
		valueWindowTable(Time("2019-11-25T00:00:22Z"), Time("2019-11-25T00:00:32Z"), 5),
		valueWindowTable(Time("2019-11-25T00:00:32Z"), Time("2019-11-25T00:00:42Z"), 5),
		valueWindowTable(Time("2019-11-25T00:00:42Z"), Time("2019-11-25T00:00:52Z"), 5),
		valueWindowTable(Time("2019-11-25T00:00:52Z"), Time("2019-11-25T00:01:00Z"), 4),
	}
	assertSortedTables(t, ti, want)
}

// TestStorageReader_ReadWindowFirstCreateEmpty checks the first selector with
// CreateEmpty over aligned 10s windows of a series with a 20s step, so every
// other window holds no point and is expected as a table without rows.
func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(20*time.Second, []int64{1, 2}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
	}, storageflux.FirstKind, true)

	want := []*executetest.Table{
		timedWindowTable(
			Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:00Z"), 1,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:20Z"),
		),
		timedWindowTable(
			Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:20Z"), 2,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:40Z"),
		),
		timedWindowTable(
			Time("2019-11-25T00:00:40Z"), Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:00:40Z"), 1,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:50Z"), Time("2019-11-25T00:01:00Z"),
		),
	}
	assertSortedTables(t, ti, want)
}

// TestStorageReader_WindowFirstOffsetCreateEmpty checks the first selector with
// CreateEmpty over 10s windows shifted by a 5s offset, reading a series with a
// 20s step. Windows without a point are expected as tables without rows.
func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(20*time.Second, []int64{1, 2}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
		Offset: flux.ConvertDuration(5 * time.Second),
	}, storageflux.FirstKind, true)

	want := []*executetest.Table{
		timedWindowTable(
			Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:05Z"), Time("2019-11-25T00:00:00Z"), 1,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:05Z"), Time("2019-11-25T00:00:15Z"),
		),
		timedWindowTable(
			Time("2019-11-25T00:00:15Z"), Time("2019-11-25T00:00:25Z"), Time("2019-11-25T00:00:20Z"), 2,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:25Z"), Time("2019-11-25T00:00:35Z"),
		),
		timedWindowTable(
			Time("2019-11-25T00:00:35Z"), Time("2019-11-25T00:00:45Z"), Time("2019-11-25T00:00:40Z"), 1,
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:45Z"), Time("2019-11-25T00:00:55Z"),
		),
		timedEmptyTable(
			Time("2019-11-25T00:00:55Z"), Time("2019-11-25T00:01:00Z"),
		),
	}
	assertSortedTables(t, ti, want)
}

// TestStorageReader_WindowSumOffsetCreateEmpty checks the sum aggregate with
// CreateEmpty over 10s windows shifted by a 5s offset, reading a series with a
// 20s step. Windows without a point are expected to hold one row whose value
// is nil.
func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) {
	reader := NewStorageReader(t, int64SeriesSetup(20*time.Second, []int64{1, 2}))
	defer reader.Close()

	ti := mustReadWindowAggregate(t, reader, reader.Bounds, execute.Window{
		Every:  flux.ConvertDuration(10 * time.Second),
		Period: flux.ConvertDuration(10 * time.Second),
		Offset: flux.ConvertDuration(5 * time.Second),
	}, storageflux.SumKind, true)

	want := []*executetest.Table{
		valueWindowTable(
			Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:00:05Z"), 1,
		),
		valueNullTable(
			Time("2019-11-25T00:00:05Z"), Time("2019-11-25T00:00:15Z"),
		),
		valueWindowTable(
			Time("2019-11-25T00:00:15Z"), Time("2019-11-25T00:00:25Z"), 2,
		),
		valueNullTable(
			Time("2019-11-25T00:00:25Z"), Time("2019-11-25T00:00:35Z"),
		),
		valueWindowTable(
			Time("2019-11-25T00:00:35Z"), Time("2019-11-25T00:00:45Z"), 1,
		),
		valueNullTable(
			Time("2019-11-25T00:00:45Z"), Time("2019-11-25T00:00:55Z"),
		),
		valueNullTable(
			Time("2019-11-25T00:00:55Z"), Time("2019-11-25T00:01:00Z"),
		),
	}
	assertSortedTables(t, ti, want)
}

func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
	reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		tagsSpec := &datagen.TagsSpec{
			Tags: []*datagen.TagValuesSpec{
				{
					TagKey: "t0",
					Values: func() datagen.CountableSequence {
						return datagen.NewCounterByteSequence("a%s", 0, 1)
					},
				},
			},
		}
		spec := datagen.Spec{
			Measurements: []datagen.MeasurementSpec{
				{
					Name:     "m0",
					TagsSpec: tagsSpec,
					FieldValuesSpec: &datagen.FieldValuesSpec{
						Name: "f0",
						TimeSequenceSpec: datagen.TimeSequenceSpec{
							Count: math.MaxInt32,
							Delta: 20 * time.Second,
						},
						DataType: models.Integer,
						Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {
							return datagen.NewTimeIntegerValuesSequence(
								spec.Count,
								datagen.NewTimestampSequenceFromSpec(spec),
								datagen.NewIntegerArrayValuesSequence([]int64{1, 2}),
							)
						},
					},
				},
			},
		}
		tr := datagen.TimeRange{
			Start: mustParseTime("2019-11-25T00:00:00Z"),
			End:   mustParseTime("2019-11-25T00:01:00Z"),
		}
		return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
	})
	defer reader.Close()
	mem := &memory.Allocator{}
	ti, err :=
reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{2245 ReadFilterSpec: query.ReadFilterSpec{2246 OrganizationID: reader.Org,2247 BucketID: reader.Bucket,2248 Bounds: reader.Bounds,2249 },2250 Window: execute.Window{2251 Every: flux.ConvertDuration(10 * time.Second),2252 Period: flux.ConvertDuration(10 * time.Second),2253 },2254 Aggregates: []plan.ProcedureKind{2255 storageflux.FirstKind,2256 },2257 CreateEmpty: true,2258 TimeColumn: execute.DefaultStopColLabel,2259 }, mem)2260 if err != nil {2261 t.Fatal(err)2262 }2263 want := []*executetest.Table{{2264 KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},2265 ColMeta: []flux.ColMeta{2266 {Label: "_start", Type: flux.TTime},2267 {Label: "_stop", Type: flux.TTime},2268 {Label: "_time", Type: flux.TTime},2269 {Label: "_value", Type: flux.TInt},2270 {Label: "_field", Type: flux.TString},2271 {Label: "_measurement", Type: flux.TString},2272 {Label: "t0", Type: flux.TString},2273 },2274 Data: [][]interface{}{2275 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:10Z"), int64(1), "f0", "m0", "a0"},2276 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:30Z"), int64(2), "f0", "m0", "a0"},2277 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:50Z"), int64(1), "f0", "m0", "a0"},2278 },2279 }}2280 executetest.NormalizeTables(want)2281 sort.Sort(executetest.SortedTables(want))2282 var got []*executetest.Table2283 if err := ti.Do(func(table flux.Table) error {2284 t, err := executetest.ConvertTable(table)2285 if err != nil {2286 return err2287 }2288 got = append(got, t)2289 return nil2290 }); err != nil {2291 t.Fatal(err)2292 }2293 executetest.NormalizeTables(got)2294 sort.Sort(executetest.SortedTables(got))2295 // compare these two2296 if diff := cmp.Diff(want, got); diff != "" {2297 t.Errorf("unexpected results -want/+got:\n%s", diff)2298 }2299}2300func 
TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) {2301 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2302 tagsSpec := &datagen.TagsSpec{2303 Tags: []*datagen.TagValuesSpec{2304 {2305 TagKey: "t0",2306 Values: func() datagen.CountableSequence {2307 return datagen.NewCounterByteSequence("a%s", 0, 1)2308 },2309 },2310 },2311 }2312 spec := datagen.Spec{2313 Measurements: []datagen.MeasurementSpec{2314 {2315 Name: "m0",2316 TagsSpec: tagsSpec,2317 FieldValuesSpec: &datagen.FieldValuesSpec{2318 Name: "f0",2319 TimeSequenceSpec: datagen.TimeSequenceSpec{2320 Count: math.MaxInt32,2321 Delta: 20 * time.Second,2322 },2323 DataType: models.Integer,2324 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {2325 return datagen.NewTimeIntegerValuesSequence(2326 spec.Count,2327 datagen.NewTimestampSequenceFromSpec(spec),2328 datagen.NewIntegerArrayValuesSequence([]int64{1, 2}),2329 )2330 },2331 },2332 },2333 },2334 }2335 tr := datagen.TimeRange{2336 Start: mustParseTime("2019-11-25T00:00:00Z"),2337 End: mustParseTime("2019-11-25T00:01:00Z"),2338 }2339 return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr2340 })2341 defer reader.Close()2342 mem := &memory.Allocator{}2343 ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{2344 ReadFilterSpec: query.ReadFilterSpec{2345 OrganizationID: reader.Org,2346 BucketID: reader.Bucket,2347 Bounds: reader.Bounds,2348 },2349 Window: execute.Window{2350 Every: flux.ConvertDuration(10 * time.Second),2351 Period: flux.ConvertDuration(10 * time.Second),2352 Offset: flux.ConvertDuration(18 * time.Second),2353 },2354 Aggregates: []plan.ProcedureKind{2355 storageflux.FirstKind,2356 },2357 CreateEmpty: true,2358 TimeColumn: execute.DefaultStopColLabel,2359 }, mem)2360 if err != nil {2361 t.Fatal(err)2362 }2363 want := []*executetest.Table{{2364 KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},2365 
ColMeta: []flux.ColMeta{2366 {Label: "_start", Type: flux.TTime},2367 {Label: "_stop", Type: flux.TTime},2368 {Label: "_time", Type: flux.TTime},2369 {Label: "_value", Type: flux.TInt},2370 {Label: "_field", Type: flux.TString},2371 {Label: "_measurement", Type: flux.TString},2372 {Label: "t0", Type: flux.TString},2373 },2374 Data: [][]interface{}{2375 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:08Z"), int64(1), "f0", "m0", "a0"},2376 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:28Z"), int64(2), "f0", "m0", "a0"},2377 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:48Z"), int64(1), "f0", "m0", "a0"},2378 },2379 }}2380 executetest.NormalizeTables(want)2381 sort.Sort(executetest.SortedTables(want))2382 var got []*executetest.Table2383 if err := ti.Do(func(table flux.Table) error {2384 t, err := executetest.ConvertTable(table)2385 if err != nil {2386 return err2387 }2388 got = append(got, t)2389 return nil2390 }); err != nil {2391 t.Fatal(err)2392 }2393 executetest.NormalizeTables(got)2394 sort.Sort(executetest.SortedTables(got))2395 // compare these two2396 if diff := cmp.Diff(want, got); diff != "" {2397 t.Errorf("unexpected results -want/+got:\n%s", diff)2398 }2399}2400func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) {2401 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2402 tagsSpec := &datagen.TagsSpec{2403 Tags: []*datagen.TagValuesSpec{2404 {2405 TagKey: "t0",2406 Values: func() datagen.CountableSequence {2407 return datagen.NewCounterByteSequence("a%s", 0, 1)2408 },2409 },2410 },2411 }2412 spec := datagen.Spec{2413 Measurements: []datagen.MeasurementSpec{2414 {2415 Name: "m0",2416 TagsSpec: tagsSpec,2417 FieldValuesSpec: &datagen.FieldValuesSpec{2418 Name: "f0",2419 TimeSequenceSpec: datagen.TimeSequenceSpec{2420 Count: math.MaxInt32,2421 Delta: 20 * time.Second,2422 },2423 
DataType: models.Integer,2424 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {2425 return datagen.NewTimeIntegerValuesSequence(2426 spec.Count,2427 datagen.NewTimestampSequenceFromSpec(spec),2428 datagen.NewIntegerArrayValuesSequence([]int64{1, 2}),2429 )2430 },2431 },2432 },2433 },2434 }2435 tr := datagen.TimeRange{2436 Start: mustParseTime("2019-11-25T00:00:00Z"),2437 End: mustParseTime("2019-11-25T00:01:00Z"),2438 }2439 return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr2440 })2441 defer reader.Close()2442 mem := &memory.Allocator{}2443 ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{2444 ReadFilterSpec: query.ReadFilterSpec{2445 OrganizationID: reader.Org,2446 BucketID: reader.Bucket,2447 Bounds: reader.Bounds,2448 },2449 Window: execute.Window{2450 Every: flux.ConvertDuration(10 * time.Second),2451 Period: flux.ConvertDuration(10 * time.Second),2452 Offset: flux.ConvertDuration(18 * time.Second),2453 },2454 Aggregates: []plan.ProcedureKind{2455 storageflux.SumKind,2456 },2457 CreateEmpty: true,2458 TimeColumn: execute.DefaultStopColLabel,2459 }, mem)2460 if err != nil {2461 t.Fatal(err)2462 }2463 want := []*executetest.Table{{2464 KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},2465 ColMeta: []flux.ColMeta{2466 {Label: "_start", Type: flux.TTime},2467 {Label: "_stop", Type: flux.TTime},2468 {Label: "_time", Type: flux.TTime},2469 {Label: "_value", Type: flux.TInt},2470 {Label: "_field", Type: flux.TString},2471 {Label: "_measurement", Type: flux.TString},2472 {Label: "t0", Type: flux.TString},2473 },2474 Data: [][]interface{}{2475 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:08Z"), int64(1), "f0", "m0", "a0"},2476 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:18Z"), nil, "f0", "m0", "a0"},2477 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:28Z"), int64(2), 
"f0", "m0", "a0"},2478 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:38Z"), nil, "f0", "m0", "a0"},2479 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:48Z"), int64(1), "f0", "m0", "a0"},2480 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:00:58Z"), nil, "f0", "m0", "a0"},2481 {Time("2019-11-25T00:00:00Z"), Time("2019-11-25T00:01:00Z"), Time("2019-11-25T00:01:00Z"), nil, "f0", "m0", "a0"},2482 },2483 }}2484 executetest.NormalizeTables(want)2485 sort.Sort(executetest.SortedTables(want))2486 var got []*executetest.Table2487 if err := ti.Do(func(table flux.Table) error {2488 t, err := executetest.ConvertTable(table)2489 if err != nil {2490 return err2491 }2492 got = append(got, t)2493 return nil2494 }); err != nil {2495 t.Fatal(err)2496 }2497 executetest.NormalizeTables(got)2498 sort.Sort(executetest.SortedTables(got))2499 // compare these two2500 if diff := cmp.Diff(want, got); diff != "" {2501 t.Errorf("unexpected results -want/+got:\n%s", diff)2502 }2503}2504func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {2505 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2506 tagsSpec := &datagen.TagsSpec{2507 Tags: []*datagen.TagValuesSpec{2508 {2509 TagKey: "t0",2510 Values: func() datagen.CountableSequence {2511 return datagen.NewCounterByteSequence("a%s", 0, 1)2512 },2513 },2514 },2515 }2516 spec := datagen.Spec{2517 Measurements: []datagen.MeasurementSpec{2518 {2519 Name: "m0",2520 TagsSpec: tagsSpec,2521 FieldValuesSpec: &datagen.FieldValuesSpec{2522 Name: "f0",2523 TimeSequenceSpec: datagen.TimeSequenceSpec{2524 Count: math.MaxInt32,2525 Delta: 10 * time.Second,2526 },2527 DataType: models.Integer,2528 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {2529 return datagen.NewTimeIntegerValuesSequence(2530 spec.Count,2531 datagen.NewTimestampSequenceFromSpec(spec),2532 
datagen.NewIntegerArrayValuesSequence([]int64{1}),2533 )2534 },2535 },2536 },2537 },2538 }2539 tr := datagen.TimeRange{2540 Start: mustParseTime("2019-11-25T00:00:10Z"),2541 End: mustParseTime("2019-11-25T00:00:30Z"),2542 }2543 return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr2544 })2545 defer reader.Close()2546 mem := &memory.Allocator{}2547 ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{2548 ReadFilterSpec: query.ReadFilterSpec{2549 OrganizationID: reader.Org,2550 BucketID: reader.Bucket,2551 Bounds: reader.Bounds,2552 },2553 Window: execute.Window{2554 Every: flux.ConvertDuration(10 * time.Second),2555 Period: flux.ConvertDuration(10 * time.Second),2556 },2557 Aggregates: []plan.ProcedureKind{2558 storageflux.FirstKind,2559 },2560 CreateEmpty: true,2561 }, mem)2562 if err != nil {2563 t.Fatal(err)2564 }2565 makeWindowTable := func(start, stop, time values.Time, v int64) *executetest.Table {2566 return &executetest.Table{2567 KeyCols: []string{"_start", "_stop", "_field", "_measurement", "t0"},2568 ColMeta: []flux.ColMeta{2569 {Label: "_start", Type: flux.TTime},2570 {Label: "_stop", Type: flux.TTime},2571 {Label: "_time", Type: flux.TTime},2572 {Label: "_value", Type: flux.TInt},2573 {Label: "_field", Type: flux.TString},2574 {Label: "_measurement", Type: flux.TString},2575 {Label: "t0", Type: flux.TString},2576 },2577 Data: [][]interface{}{2578 {start, stop, time, v, "f0", "m0", "a0"},2579 },2580 }2581 }2582 want := []*executetest.Table{2583 makeWindowTable(2584 Time("2019-11-25T00:00:10Z"), Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:10Z"), 1,2585 ),2586 makeWindowTable(2587 Time("2019-11-25T00:00:20Z"), Time("2019-11-25T00:00:30Z"), Time("2019-11-25T00:00:20Z"), 1,2588 ),2589 }2590 executetest.NormalizeTables(want)2591 sort.Sort(executetest.SortedTables(want))2592 var got []*executetest.Table2593 if err := ti.Do(func(table flux.Table) error {2594 t, err := executetest.ConvertTable(table)2595 if err != 
nil {2596 return err2597 }2598 got = append(got, t)2599 return nil2600 }); err != nil {2601 t.Fatal(err)2602 }2603 executetest.NormalizeTables(got)2604 sort.Sort(executetest.SortedTables(got))2605 // compare these two2606 if diff := cmp.Diff(want, got); diff != "" {2607 t.Errorf("unexpected results -want/+got:\n%s", diff)2608 }2609}2610func getStorageEqPred(lhsTagKey, rhsTagValue string) *storageproto.Predicate {2611 return &storageproto.Predicate{2612 Root: &storageproto.Node{2613 NodeType: storageproto.NodeTypeComparisonExpression,2614 Value: &storageproto.Node_Comparison_{2615 Comparison: storageproto.ComparisonEqual,2616 },2617 Children: []*storageproto.Node{2618 {2619 NodeType: storageproto.NodeTypeTagRef,2620 Value: &storageproto.Node_TagRefValue{2621 TagRefValue: lhsTagKey,2622 },2623 },2624 {2625 NodeType: storageproto.NodeTypeLiteral,2626 Value: &storageproto.Node_StringValue{2627 StringValue: rhsTagValue,2628 },2629 },2630 },2631 },2632 }2633}2634func TestStorageReader_ReadGroup(t *testing.T) {2635 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2636 spec := Spec(org, bucket,2637 MeasurementSpec("m0",2638 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),2639 TagValuesSequence("t0", "a-%s", 0, 3),2640 ),2641 )2642 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")2643 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr2644 })2645 defer reader.Close()2646 for _, tt := range []struct {2647 aggregate string2648 filter *storageproto.Predicate2649 want flux.TableIterator2650 }{2651 {2652 aggregate: storageflux.CountKind,2653 want: static.TableGroup{2654 static.StringKey("_measurement", "m0"),2655 static.StringKey("_field", "f0"),2656 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2657 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2658 static.TableMatrix{2659 static.StringKeys("t0", "a-0", "a-1", "a-2"),2660 {2661 static.Table{2662 static.Ints("_value", 
12),2663 },2664 },2665 },2666 },2667 },2668 {2669 aggregate: storageflux.SumKind,2670 want: static.TableGroup{2671 static.StringKey("_measurement", "m0"),2672 static.StringKey("_field", "f0"),2673 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2674 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2675 static.TableMatrix{2676 static.StringKeys("t0", "a-0", "a-1", "a-2"),2677 {2678 static.Table{2679 static.Floats("_value", 30),2680 },2681 },2682 },2683 },2684 },2685 {2686 aggregate: storageflux.SumKind,2687 filter: getStorageEqPred("t0", "z-9"),2688 want: static.TableGroup{},2689 },2690 {2691 aggregate: storageflux.MinKind,2692 want: static.TableGroup{2693 static.StringKey("_measurement", "m0"),2694 static.StringKey("_field", "f0"),2695 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2696 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2697 static.TableMatrix{2698 static.StringKeys("t0", "a-0", "a-1", "a-2"),2699 {2700 static.Table{2701 static.Times("_time", "2019-11-25T00:00:00Z"),2702 static.Floats("_value", 1),2703 },2704 },2705 },2706 },2707 },2708 {2709 aggregate: storageflux.MaxKind,2710 want: static.TableGroup{2711 static.StringKey("_measurement", "m0"),2712 static.StringKey("_field", "f0"),2713 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2714 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2715 static.TableMatrix{2716 static.StringKeys("t0", "a-0", "a-1", "a-2"),2717 {2718 static.Table{2719 static.Times("_time", "2019-11-25T00:00:30Z"),2720 static.Floats("_value", 4),2721 },2722 },2723 },2724 },2725 },2726 {2727 aggregate: storageflux.MaxKind,2728 filter: getStorageEqPred("t0", "z-9"),2729 want: static.TableGroup{},2730 },2731 } {2732 t.Run(tt.aggregate, func(t *testing.T) {2733 mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)2734 defer mem.AssertSize(t, 0)2735 alloc := &memory.Allocator{2736 Allocator: mem,2737 }2738 got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{2739 ReadFilterSpec: query.ReadFilterSpec{2740 
OrganizationID: reader.Org,2741 BucketID: reader.Bucket,2742 Bounds: reader.Bounds,2743 Predicate: tt.filter,2744 },2745 GroupMode: query.GroupModeBy,2746 GroupKeys: []string{"_measurement", "_field", "t0"},2747 AggregateMethod: tt.aggregate,2748 }, alloc)2749 if err != nil {2750 t.Fatal(err)2751 }2752 if diff := table.Diff(tt.want, got); diff != "" {2753 t.Errorf("unexpected results -want/+got:\n%s", diff)2754 }2755 })2756 }2757}2758// TestStorageReader_ReadGroupSelectTags exercises group-selects where the tag2759// values vary among the candidate items for select and the read-group2760// operation must track and return the correct set of tags.2761func TestStorageReader_ReadGroupSelectTags(t *testing.T) {2762 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2763 spec := Spec(org, bucket,2764 MeasurementSpec("m0",2765 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),2766 TagValuesSequence("t0", "a-%s", 0, 3),2767 TagValuesSequence("t1", "b-%s", 0, 1),2768 ),2769 MeasurementSpec("m0",2770 FloatArrayValuesSequence("f0", 10*time.Second, []float64{5.0, 6.0, 7.0, 8.0}),2771 TagValuesSequence("t0", "a-%s", 0, 3),2772 TagValuesSequence("t1", "b-%s", 1, 2),2773 ),2774 )2775 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")2776 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr2777 })2778 defer reader.Close()2779 cases := []struct {2780 aggregate string2781 want flux.TableIterator2782 }{2783 {2784 aggregate: storageflux.MinKind,2785 want: static.TableGroup{2786 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2787 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2788 static.TableMatrix{2789 static.StringKeys("t0", "a-0", "a-1", "a-2"),2790 {2791 static.Table{2792 static.Strings("t1", "b-0"),2793 static.Strings("_measurement", "m0"),2794 static.Strings("_field", "f0"),2795 static.Times("_time", "2019-11-25T00:00:00Z"),2796 static.Floats("_value", 1.0),2797 },2798 },2799 
},2800 },2801 },2802 {2803 aggregate: storageflux.MaxKind,2804 want: static.TableGroup{2805 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2806 static.TimeKey("_stop", "2019-11-25T00:02:00Z"),2807 static.TableMatrix{2808 static.StringKeys("t0", "a-0", "a-1", "a-2"),2809 {2810 static.Table{2811 static.Strings("t1", "b-1"),2812 static.Strings("_measurement", "m0"),2813 static.Strings("_field", "f0"),2814 static.Times("_time", "2019-11-25T00:00:30Z"),2815 static.Floats("_value", 8.0),2816 },2817 },2818 },2819 },2820 },2821 }2822 for _, tt := range cases {2823 t.Run("", func(t *testing.T) {2824 mem := &memory.Allocator{}2825 got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{2826 ReadFilterSpec: query.ReadFilterSpec{2827 OrganizationID: reader.Org,2828 BucketID: reader.Bucket,2829 Bounds: reader.Bounds,2830 },2831 GroupMode: query.GroupModeBy,2832 GroupKeys: []string{"t0"},2833 AggregateMethod: tt.aggregate,2834 }, mem)2835 if err != nil {2836 t.Fatal(err)2837 }2838 if diff := table.Diff(tt.want, got); diff != "" {2839 t.Errorf("unexpected results -want/+got:\n%s", diff)2840 }2841 })2842 }2843}2844// TestStorageReader_ReadGroupNoAgg exercises the path where no aggregate is specified2845func TestStorageReader_ReadGroupNoAgg(t *testing.T) {2846 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2847 spec := Spec(org, bucket,2848 MeasurementSpec("m0",2849 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),2850 TagValuesSequence("t1", "b-%s", 0, 2),2851 ),2852 )2853 tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:00:40Z")2854 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr2855 })2856 defer reader.Close()2857 cases := []struct {2858 aggregate string2859 want flux.TableIterator2860 }{2861 {2862 want: static.TableGroup{2863 static.TableMatrix{2864 {2865 static.Table{2866 static.StringKey("t1", "b-0"),2867 static.Strings("_measurement", "m0", "m0", "m0", 
"m0"),2868 static.Strings("_field", "f0", "f0", "f0", "f0"),2869 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2870 static.TimeKey("_stop", "2019-11-25T00:00:40Z"),2871 static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),2872 static.Floats("_value", 1.0, 2.0, 3.0, 4.0),2873 },2874 },2875 {2876 static.Table{2877 static.StringKey("t1", "b-1"),2878 static.Strings("_measurement", "m0", "m0", "m0", "m0"),2879 static.Strings("_field", "f0", "f0", "f0", "f0"),2880 static.TimeKey("_start", "2019-11-25T00:00:00Z"),2881 static.TimeKey("_stop", "2019-11-25T00:00:40Z"),2882 static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),2883 static.Floats("_value", 1.0, 2.0, 3.0, 4.0),2884 },2885 },2886 },2887 },2888 },2889 }2890 for _, tt := range cases {2891 t.Run("", func(t *testing.T) {2892 mem := &memory.Allocator{}2893 got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{2894 ReadFilterSpec: query.ReadFilterSpec{2895 OrganizationID: reader.Org,2896 BucketID: reader.Bucket,2897 Bounds: reader.Bounds,2898 },2899 GroupMode: query.GroupModeBy,2900 GroupKeys: []string{"t1"},2901 }, mem)2902 if err != nil {2903 t.Fatal(err)2904 }2905 if diff := table.Diff(tt.want, got); diff != "" {2906 t.Errorf("unexpected results -want/+got:\n%s", diff)2907 }2908 })2909 }2910}2911func TestStorageReader_ReadWindowAggregateMonths(t *testing.T) {2912 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {2913 spec := Spec(org, bucket,2914 MeasurementSpec("m0",2915 FloatArrayValuesSequence("f0", 24*time.Hour, []float64{1.0, 2.0, 3.0, 4.0}),2916 TagValuesSequence("t0", "a-%s", 0, 3),2917 ),2918 )2919 tr := TimeRange("2019-09-01T00:00:00Z", "2019-12-01T00:00:00Z")2920 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr2921 })2922 defer reader.Close()2923 for _, tt := range []struct {2924 aggregate 
plan.ProcedureKind2925 want flux.TableIterator2926 }{2927 {2928 aggregate: storageflux.CountKind,2929 want: static.TableGroup{2930 static.StringKey("_measurement", "m0"),2931 static.StringKey("_field", "f0"),2932 static.TableMatrix{2933 static.StringKeys("t0", "a-0", "a-1", "a-2"),2934 {2935 static.Table{2936 static.TimeKey("_start", "2019-09-01T00:00:00Z"),2937 static.TimeKey("_stop", "2019-10-01T00:00:00Z"),2938 static.Ints("_value", 30),2939 },2940 },2941 {2942 static.Table{2943 static.TimeKey("_start", "2019-10-01T00:00:00Z"),2944 static.TimeKey("_stop", "2019-11-01T00:00:00Z"),2945 static.Ints("_value", 31),2946 },2947 },2948 {2949 static.Table{2950 static.TimeKey("_start", "2019-11-01T00:00:00Z"),2951 static.TimeKey("_stop", "2019-12-01T00:00:00Z"),2952 static.Ints("_value", 30),2953 },2954 },2955 },2956 },2957 },2958 {2959 aggregate: storageflux.MinKind,2960 want: static.TableGroup{2961 static.StringKey("_measurement", "m0"),2962 static.StringKey("_field", "f0"),2963 static.TableMatrix{2964 static.StringKeys("t0", "a-0", "a-1", "a-2"),2965 {2966 static.Table{2967 static.TimeKey("_start", "2019-09-01T00:00:00Z"),2968 static.TimeKey("_stop", "2019-10-01T00:00:00Z"),2969 static.Times("_time", "2019-09-01T00:00:00Z"),2970 static.Floats("_value", 1),2971 },2972 static.Table{2973 static.TimeKey("_start", "2019-10-01T00:00:00Z"),2974 static.TimeKey("_stop", "2019-11-01T00:00:00Z"),2975 static.Times("_time", "2019-10-03T00:00:00Z"),2976 static.Floats("_value", 1),2977 },2978 static.Table{2979 static.TimeKey("_start", "2019-11-01T00:00:00Z"),2980 static.TimeKey("_stop", "2019-12-01T00:00:00Z"),2981 static.Times("_time", "2019-11-04T00:00:00Z"),2982 static.Floats("_value", 1),2983 },2984 },2985 },2986 },2987 },2988 {2989 aggregate: storageflux.MaxKind,2990 want: static.TableGroup{2991 static.StringKey("_measurement", "m0"),2992 static.StringKey("_field", "f0"),2993 static.TableMatrix{2994 static.StringKeys("t0", "a-0", "a-1", "a-2"),2995 {2996 static.Table{2997 
static.TimeKey("_start", "2019-09-01T00:00:00Z"),2998 static.TimeKey("_stop", "2019-10-01T00:00:00Z"),2999 static.Times("_time", "2019-09-04T00:00:00Z"),3000 static.Floats("_value", 4),3001 },3002 static.Table{3003 static.TimeKey("_start", "2019-10-01T00:00:00Z"),3004 static.TimeKey("_stop", "2019-11-01T00:00:00Z"),3005 static.Times("_time", "2019-10-02T00:00:00Z"),3006 static.Floats("_value", 4),3007 },3008 static.Table{3009 static.TimeKey("_start", "2019-11-01T00:00:00Z"),3010 static.TimeKey("_stop", "2019-12-01T00:00:00Z"),3011 static.Times("_time", "2019-11-03T00:00:00Z"),3012 static.Floats("_value", 4),3013 },3014 },3015 },3016 },3017 },3018 } {3019 mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)3020 defer mem.AssertSize(t, 0)3021 alloc := &memory.Allocator{3022 Allocator: mem,3023 }3024 got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{3025 ReadFilterSpec: query.ReadFilterSpec{3026 OrganizationID: reader.Org,3027 BucketID: reader.Bucket,3028 Bounds: reader.Bounds,3029 },3030 Window: execute.Window{3031 Every: values.MakeDuration(0, 1, false),3032 Period: values.MakeDuration(0, 1, false),3033 },3034 Aggregates: []plan.ProcedureKind{3035 tt.aggregate,3036 },3037 }, alloc)3038 if err != nil {3039 t.Fatal(err)3040 }3041 if diff := table.Diff(tt.want, got); diff != "" {3042 t.Errorf("unexpected results for %v aggregate -want/+got:\n%s", tt.aggregate, diff)3043 }3044 }3045}3046// TestStorageReader_Backoff will invoke the read function3047// and then send the table to a separate goroutine so it doesn't3048// block the table iterator. 
The table iterator should be blocked3049// until it is read by the other goroutine.3050func TestStorageReader_Backoff(t *testing.T) {3051 t.Skip("memory allocations are not tracked properly")3052 reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {3053 spec := Spec(org, bucket,3054 MeasurementSpec("m0",3055 FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),3056 TagValuesSequence("t0", "a-%s", 0, 3),3057 ),3058 )3059 tr := TimeRange("2019-09-01T00:00:00Z", "2019-09-02T00:00:00Z")3060 return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr3061 })3062 defer reader.Close()3063 for _, tt := range []struct {3064 name string3065 read func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error)3066 }{3067 {3068 name: "ReadFilter",3069 read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {3070 return reader.ReadFilter(context.Background(), query.ReadFilterSpec{3071 OrganizationID: reader.Org,3072 BucketID: reader.Bucket,3073 Bounds: reader.Bounds,3074 }, mem)3075 },3076 },3077 {3078 name: "ReadGroup",3079 read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {3080 return reader.ReadGroup(context.Background(), query.ReadGroupSpec{3081 ReadFilterSpec: query.ReadFilterSpec{3082 OrganizationID: reader.Org,3083 BucketID: reader.Bucket,3084 Bounds: reader.Bounds,3085 },3086 GroupMode: query.GroupModeBy,3087 GroupKeys: []string{"_measurement", "_field"},3088 }, mem)3089 },3090 },3091 {3092 name: "ReadWindowAggregate",3093 read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {3094 return reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{3095 ReadFilterSpec: query.ReadFilterSpec{3096 OrganizationID: reader.Org,3097 BucketID: reader.Bucket,3098 Bounds: reader.Bounds,3099 },3100 Aggregates: []plan.ProcedureKind{3101 storageflux.MeanKind,3102 },3103 TimeColumn: 
execute.DefaultStopColLabel,3104 Window: execute.Window{3105 Every: values.ConvertDurationNsecs(20 * time.Second),3106 Period: values.ConvertDurationNsecs(20 * time.Second),3107 },3108 }, mem)3109 },3110 },3111 } {3112 t.Run(tt.name, func(t *testing.T) {3113 // Read the table and learn what the maximum allocated3114 // value is. We don't want to exceed this.3115 mem := &memory.Allocator{}3116 tables, err := tt.read(reader, mem)3117 if err != nil {3118 t.Fatal(err)3119 }3120 if err := tables.Do(func(t flux.Table) error {3121 return t.Do(func(cr flux.ColReader) error {3122 return nil3123 })3124 }); err != nil {3125 t.Fatal(err)3126 }3127 // The total allocated should not be the same3128 // as the max allocated. If this is the case, we3129 // either had one buffer or did not correctly3130 // release memory for each buffer.3131 if mem.MaxAllocated() == mem.TotalAllocated() {3132 t.Fatal("max allocated is the same as total allocated, they must be different for this test to be meaningful")3133 }3134 // Recreate the memory allocator and set the limit3135 // to the max allocated. 
This will cause a panic3136 // if the next buffer attempts to be allocated3137 // before the first.3138 limit := mem.MaxAllocated()3139 mem = &memory.Allocator{Limit: &limit}3140 tables, err = tt.read(reader, mem)3141 if err != nil {3142 t.Fatal(err)3143 }3144 var wg sync.WaitGroup3145 _ = tables.Do(func(t flux.Table) error {3146 wg.Add(1)3147 go func() {3148 defer wg.Done()3149 _ = t.Do(func(cr flux.ColReader) error {3150 return nil3151 })3152 }()3153 return nil3154 })3155 wg.Wait()3156 })3157 }3158}3159func BenchmarkReadFilter(b *testing.B) {3160 setupFn := func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {3161 tagsSpec := &datagen.TagsSpec{3162 Tags: []*datagen.TagValuesSpec{3163 {3164 TagKey: "t0",3165 Values: func() datagen.CountableSequence {3166 return datagen.NewCounterByteSequence("a-%s", 0, 5)3167 },3168 },3169 {3170 TagKey: "t1",3171 Values: func() datagen.CountableSequence {3172 return datagen.NewCounterByteSequence("b-%s", 0, 1000)3173 },3174 },3175 },3176 }3177 spec := datagen.Spec{3178 Measurements: []datagen.MeasurementSpec{3179 {3180 Name: "m0",3181 TagsSpec: tagsSpec,3182 FieldValuesSpec: &datagen.FieldValuesSpec{3183 Name: "f0",3184 TimeSequenceSpec: datagen.TimeSequenceSpec{3185 Count: math.MaxInt32,3186 Delta: time.Minute,3187 },3188 DataType: models.Float,3189 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3190 r := rand.New(rand.NewSource(10))3191 return datagen.NewTimeFloatValuesSequence(3192 spec.Count,3193 datagen.NewTimestampSequenceFromSpec(spec),3194 datagen.NewFloatRandomValuesSequence(0, 90, r),3195 )3196 },3197 },3198 },3199 {3200 Name: "m0",3201 TagsSpec: tagsSpec,3202 FieldValuesSpec: &datagen.FieldValuesSpec{3203 Name: "f1",3204 TimeSequenceSpec: datagen.TimeSequenceSpec{3205 Count: math.MaxInt32,3206 Delta: time.Minute,3207 },3208 DataType: models.Float,3209 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3210 r := rand.New(rand.NewSource(11))3211 return 
datagen.NewTimeFloatValuesSequence(3212 spec.Count,3213 datagen.NewTimestampSequenceFromSpec(spec),3214 datagen.NewFloatRandomValuesSequence(0, 180, r),3215 )3216 },3217 },3218 },3219 {3220 Name: "m0",3221 TagsSpec: tagsSpec,3222 FieldValuesSpec: &datagen.FieldValuesSpec{3223 Name: "f1",3224 TimeSequenceSpec: datagen.TimeSequenceSpec{3225 Count: math.MaxInt32,3226 Delta: time.Minute,3227 },3228 DataType: models.Float,3229 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3230 r := rand.New(rand.NewSource(12))3231 return datagen.NewTimeFloatValuesSequence(3232 spec.Count,3233 datagen.NewTimestampSequenceFromSpec(spec),3234 datagen.NewFloatRandomValuesSequence(10, 10000, r),3235 )3236 },3237 },3238 },3239 },3240 }3241 tr := datagen.TimeRange{3242 Start: mustParseTime("2019-11-25T00:00:00Z"),3243 End: mustParseTime("2019-11-26T00:00:00Z"),3244 }3245 return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr3246 }3247 benchmarkRead(b, setupFn, func(r *StorageReader) error {3248 mem := &memory.Allocator{}3249 tables, err := r.ReadFilter(context.Background(), query.ReadFilterSpec{3250 OrganizationID: r.Org,3251 BucketID: r.Bucket,3252 Bounds: r.Bounds,3253 }, mem)3254 if err != nil {3255 return err3256 }3257 return tables.Do(func(table flux.Table) error {3258 table.Done()3259 return nil3260 })3261 })3262}3263func BenchmarkReadGroup(b *testing.B) {3264 setupFn := func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {3265 tagsSpec := &datagen.TagsSpec{3266 Tags: []*datagen.TagValuesSpec{3267 {3268 TagKey: "t0",3269 Values: func() datagen.CountableSequence {3270 return datagen.NewCounterByteSequence("a-%s", 0, 5)3271 },3272 },3273 {3274 TagKey: "t1",3275 Values: func() datagen.CountableSequence {3276 return datagen.NewCounterByteSequence("b-%s", 0, 1000)3277 },3278 },3279 },3280 }3281 spec := datagen.Spec{3282 Measurements: []datagen.MeasurementSpec{3283 {3284 Name: "m0",3285 TagsSpec: tagsSpec,3286 FieldValuesSpec: 
&datagen.FieldValuesSpec{3287 Name: "f0",3288 TimeSequenceSpec: datagen.TimeSequenceSpec{3289 Count: math.MaxInt32,3290 Delta: time.Minute,3291 },3292 DataType: models.Float,3293 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3294 r := rand.New(rand.NewSource(10))3295 return datagen.NewTimeFloatValuesSequence(3296 spec.Count,3297 datagen.NewTimestampSequenceFromSpec(spec),3298 datagen.NewFloatRandomValuesSequence(0, 90, r),3299 )3300 },3301 },3302 },3303 {3304 Name: "m0",3305 TagsSpec: tagsSpec,3306 FieldValuesSpec: &datagen.FieldValuesSpec{3307 Name: "f1",3308 TimeSequenceSpec: datagen.TimeSequenceSpec{3309 Count: math.MaxInt32,3310 Delta: time.Minute,3311 },3312 DataType: models.Float,3313 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3314 r := rand.New(rand.NewSource(11))3315 return datagen.NewTimeFloatValuesSequence(3316 spec.Count,3317 datagen.NewTimestampSequenceFromSpec(spec),3318 datagen.NewFloatRandomValuesSequence(0, 180, r),3319 )3320 },3321 },3322 },3323 {3324 Name: "m0",3325 TagsSpec: tagsSpec,3326 FieldValuesSpec: &datagen.FieldValuesSpec{3327 Name: "f1",3328 TimeSequenceSpec: datagen.TimeSequenceSpec{3329 Count: math.MaxInt32,3330 Delta: time.Minute,3331 },3332 DataType: models.Float,3333 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3334 r := rand.New(rand.NewSource(12))3335 return datagen.NewTimeFloatValuesSequence(3336 spec.Count,3337 datagen.NewTimestampSequenceFromSpec(spec),3338 datagen.NewFloatRandomValuesSequence(10, 100, r),3339 )3340 },3341 },3342 },3343 },3344 }3345 tr := datagen.TimeRange{3346 Start: mustParseTime("2019-11-25T00:00:00Z"),3347 End: mustParseTime("2019-11-25T00:10:00Z"),3348 }3349 return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr3350 }3351 benchmarkRead(b, setupFn, func(r *StorageReader) error {3352 mem := &memory.Allocator{}3353 tables, err := r.ReadGroup(context.Background(), query.ReadGroupSpec{3354 ReadFilterSpec: 
query.ReadFilterSpec{3355 OrganizationID: r.Org,3356 BucketID: r.Bucket,3357 Bounds: r.Bounds,3358 },3359 GroupMode: query.GroupModeBy,3360 GroupKeys: []string{"_start", "_stop", "t0"},3361 AggregateMethod: storageflux.MinKind,3362 }, mem)3363 if err != nil {3364 return err3365 }3366 err = tables.Do(func(table flux.Table) error {3367 table.Done()3368 return nil3369 })3370 return err3371 })3372}3373func benchmarkRead(b *testing.B, setupFn SetupFunc, f func(r *StorageReader) error) {3374 reader := NewStorageReader(b, setupFn)3375 defer reader.Close()3376 b.ResetTimer()3377 b.ReportAllocs()3378 for i := 0; i < b.N; i++ {3379 if err := f(reader); err != nil {3380 b.Fatal(err)3381 }3382 }3383}3384func Time(s string) execute.Time {3385 ts := mustParseTime(s)3386 return execute.Time(ts.UnixNano())3387}3388func mustParseTime(s string) time.Time {3389 ts, err := time.Parse(time.RFC3339, s)3390 if err != nil {3391 panic(err)3392 }3393 return ts3394}3395func Spec(org, bucket platform.ID, measurements ...datagen.MeasurementSpec) *datagen.Spec {3396 return &datagen.Spec{3397 Measurements: measurements,3398 }3399}3400func MeasurementSpec(name string, field *datagen.FieldValuesSpec, tags ...*datagen.TagValuesSpec) datagen.MeasurementSpec {3401 return datagen.MeasurementSpec{3402 Name: name,3403 TagsSpec: TagsSpec(tags...),3404 FieldValuesSpec: field,3405 }3406}3407func FloatArrayValuesSequence(name string, delta time.Duration, values []float64) *datagen.FieldValuesSpec {3408 return &datagen.FieldValuesSpec{3409 Name: name,3410 TimeSequenceSpec: datagen.TimeSequenceSpec{3411 Count: math.MaxInt32,3412 Delta: delta,3413 },3414 DataType: models.Float,3415 Values: func(spec datagen.TimeSequenceSpec) datagen.TimeValuesSequence {3416 return datagen.NewTimeFloatValuesSequence(3417 spec.Count,3418 datagen.NewTimestampSequenceFromSpec(spec),3419 datagen.NewFloatArrayValuesSequence(values),3420 )3421 },3422 }3423}3424func TagsSpec(specs ...*datagen.TagValuesSpec) *datagen.TagsSpec {3425 
return &datagen.TagsSpec{Tags: specs}3426}3427func TagValuesSequence(key, format string, start, stop int) *datagen.TagValuesSpec {3428 return &datagen.TagValuesSpec{3429 TagKey: key,3430 Values: func() datagen.CountableSequence {3431 return datagen.NewCounterByteSequence(format, start, stop)3432 },3433 }3434}3435func TimeRange(start, end string) datagen.TimeRange {3436 return datagen.TimeRange{3437 Start: mustParseTime(start),3438 End: mustParseTime(end),3439 }3440}3441// seriesBatchSize specifies the number of series keys passed to the index.3442const seriesBatchSize = 10003443func writeShard(sfile *tsdb.SeriesFile, sg datagen.SeriesGenerator, id uint64, path string) error {3444 sw := shard.NewWriter(id, path)3445 defer sw.Close()3446 var (3447 keys [][]byte3448 names [][]byte3449 tags []models.Tags3450 )3451 for sg.Next() {3452 seriesKey := sg.Key()3453 keys = append(keys, seriesKey)3454 names = append(names, sg.Name())3455 tags = append(tags, sg.Tags())3456 if len(keys) == seriesBatchSize {3457 if _, err := sfile.CreateSeriesListIfNotExists(names, tags); err != nil {3458 return err3459 }3460 keys = keys[:0]3461 names = names[:0]3462 tags = tags[:0]3463 }3464 vg := sg.TimeValuesGenerator()3465 key := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field()))3466 for vg.Next() {3467 sw.WriteV(key, vg.Values())3468 }3469 if err := sw.Err(); err != nil {3470 return err3471 }3472 }3473 if len(keys) > seriesBatchSize {3474 if _, err := sfile.CreateSeriesListIfNotExists(names, tags); err != nil {3475 return err3476 }3477 }3478 return nil3479}...
client_pact_test.go
Source:client_pact_test.go
...45 WithJSONBody(Like(pacticipant)).46 WillRespondWith(200).47 WithHeader("Content-Type", S("application/hal+json")).48 WithJSONBody(Like(pacticipant))49 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {50 client := clientForPact(config)51 res, e := client.CreatePacticipant(pacticipant)52 assert.NoError(t, e)53 assert.Equal(t, "terraform-client", res.Name)54 return e55 })56 assert.NoError(t, err)57 })58 t.Run("ReadPacticipant", func(t *testing.T) {59 mockProvider.60 AddInteraction().61 Given("a pacticipant with name terraform-client exists").62 UponReceiving("a request to get a pacticipant").63 WithRequest("GET", S("/pacticipants/terraform-client")).64 WithHeader("Authorization", Like("Bearer 1234")).65 WillRespondWith(200).66 WithHeader("Content-Type", S("application/hal+json")).67 WithBodyMatch(&broker.Pacticipant{})68 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {69 client := clientForPact(config)70 res, e := client.ReadPacticipant("terraform-client")71 assert.NoError(t, e)72 assert.Equal(t, "terraform-client", res.Name)73 assert.NotEmpty(t, res.RepositoryURL)74 return e75 })76 assert.NoError(t, err)77 })78 t.Run("UpdatePacticipant", func(t *testing.T) {79 mockProvider.80 AddInteraction().81 Given("a pacticipant with name terraform-client exists").82 UponReceiving("a request to update a pacticipant").83 WithRequest("PATCH", S("/pacticipants/terraform-client")).84 WithHeader("Content-Type", S("application/json")).85 WithHeader("Authorization", Like("Bearer 1234")).86 WithJSONBody(Like(pacticipant)).87 WillRespondWith(200).88 WithHeader("Content-Type", S("application/hal+json")).89 WithJSONBody(Like(pacticipant))90 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {91 client := clientForPact(config)92 res, e := client.UpdatePacticipant(pacticipant)93 assert.NoError(t, e)94 assert.Equal(t, "terraform-client", res.Name)95 return e96 })97 assert.NoError(t, err)98 })99 t.Run("DeletePacticipant", 
func(t *testing.T) {100 newPacticipant := broker.Pacticipant{101 Name: "terraform-client",102 RepositoryURL: "https://github.com/pactflow/new-terraform-provider-pact",103 }104 mockProvider.105 AddInteraction().106 Given("a pacticipant with name terraform-client exists").107 UponReceiving("a request to delete a pacticipant").108 WithRequest("DELETE", S("/pacticipants/terraform-client")).109 WithHeader("Authorization", Like("Bearer 1234")).110 WillRespondWith(200)111 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {112 client := clientForPact(config)113 return client.DeletePacticipant(newPacticipant)114 })115 assert.NoError(t, err)116 })117 })118 t.Run("Team", func(t *testing.T) {119 create := broker.TeamCreateOrUpdateRequest{120 Name: "terraform-team",121 PacticipantNames: []string{pacticipant.Name},122 }123 created := broker.Team{124 Name: create.Name,125 UUID: "99643109-adb0-4e68-b25f-7b14d6bcae16",126 }127 updated := broker.Team{128 Name: create.Name,129 UUID: created.UUID,130 Embedded: broker.TeamEmbeddedItems{131 Members: []broker.User{132 {133 UUID: "4c260344-b170-41eb-b01e-c0ff10c72f25",134 Active: true,135 },136 },137 Administrators: []broker.User{138 {139 UUID: "4c260344-b170-41eb-b01e-c0ff10c72f25",140 },141 },142 Environments: []broker.Environment{143 {144 UUID: "8000883c-abf0-4b4c-b993-426f607092a9",145 },146 },147 },148 }149 update := broker.TeamCreateOrUpdateRequest{150 UUID: updated.UUID,151 Name: updated.Name,152 PacticipantNames: []string{pacticipant.Name},153 AdministratorUUIDs: []string{"4c260344-b170-41eb-b01e-c0ff10c72f25"},154 EnvironmentUUIDs: []string{"8000883c-abf0-4b4c-b993-426f607092a9"},155 }156 t.Run("ReadTeam", func(t *testing.T) {157 mockProvider.158 AddInteraction().159 Given("a team with uuid 99643109-adb0-4e68-b25f-7b14d6bcae16 exists").160 UponReceiving("a request to get a team").161 WithRequest("GET", S("/admin/teams/99643109-adb0-4e68-b25f-7b14d6bcae16")).162 WithHeader("Authorization", Like("Bearer 
1234")).163 WillRespondWith(200).164 WithHeader("Content-Type", S("application/hal+json")).165 WithJSONBody(Like(updated))166 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {167 client := clientForPact(config)168 res, e := client.ReadTeam(updated)169 assert.NoError(t, e)170 assert.NotNil(t, res)171 assert.Equal(t, "terraform-team", res.Name)172 assert.Equal(t, "99643109-adb0-4e68-b25f-7b14d6bcae16", res.UUID)173 assert.Len(t, res.Embedded.Members, 1)174 return e175 })176 assert.NoError(t, err)177 })178 t.Run("CreateTeam", func(t *testing.T) {179 mockProvider.180 AddInteraction().181 Given("a pacticipant with name terraform-client exists").182 UponReceiving("a request to create a team").183 WithRequest("POST", S("/admin/teams")).184 WithHeader("Content-Type", S("application/json")).185 WithHeader("Authorization", Like("Bearer 1234")).186 WithJSONBody(Like(create)).187 WillRespondWith(200).188 WithHeader("Content-Type", S("application/hal+json")).189 WithJSONBody(Like(broker.TeamsResponse{190 Teams: []broker.Team{191 created,192 },193 }))194 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {195 client := clientForPact(config)196 res, e := client.CreateTeam(create)197 assert.NoError(t, e)198 assert.NotNil(t, res)199 assert.Equal(t, "terraform-team", res.Name)200 assert.Equal(t, updated.UUID, res.UUID)201 return e202 })203 assert.NoError(t, err)204 })205 t.Run("UpdateTeam", func(t *testing.T) {206 mockProvider.207 AddInteraction().208 Given("a team with uuid 99643109-adb0-4e68-b25f-7b14d6bcae16 exists").209 UponReceiving("a request to update a team").210 WithRequest("PUT", S("/admin/teams/99643109-adb0-4e68-b25f-7b14d6bcae16")).211 WithHeader("Content-Type", S("application/json")).212 WithHeader("Authorization", Like("Bearer 1234")).213 WithJSONBody(Like(update)).214 WillRespondWith(200).215 WithHeader("Content-Type", S("application/hal+json")).216 WithJSONBody(Like(updated))217 err = mockProvider.ExecuteTest(t, func(config 
MockServerConfig) error {218 client := clientForPact(config)219 res, e := client.UpdateTeam(update)220 assert.NoError(t, e)221 assert.Equal(t, "terraform-team", res.Name)222 assert.Len(t, res.Embedded.Administrators, 1)223 assert.Len(t, res.Embedded.Environments, 1)224 assert.Len(t, res.Embedded.Members, 1)225 return e226 })227 assert.NoError(t, err)228 })229 t.Run("DeleteTeam", func(t *testing.T) {230 mockProvider.231 AddInteraction().232 Given("a team with name terraform-team exists").233 UponReceiving("a request to delete a team").234 WithRequest("DELETE", S("/admin/teams/99643109-adb0-4e68-b25f-7b14d6bcae16")).235 WithHeader("Authorization", Like("Bearer 1234")).236 WillRespondWith(200)237 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {238 client := clientForPact(config)239 return client.DeleteTeam(updated)240 })241 assert.NoError(t, err)242 })243 t.Run("UpdateTeamAssignments", func(t *testing.T) {244 req := broker.TeamsAssignmentRequest{245 UUID: updated.UUID,246 Users: []string{247 "05064a18-229d-4dfd-b37c-f00ec9673a49",248 },249 }250 mockProvider.251 AddInteraction().252 Given("a team with name terraform-team and user with uuid 05064a18-229d-4dfd-b37c-f00ec9673a49 exists").253 UponReceiving("a request to update team assignments").254 WithRequest("PUT", S("/admin/teams/99643109-adb0-4e68-b25f-7b14d6bcae16/users")).255 WithHeader("Authorization", Like("Bearer 1234")).256 WithJSONBody(Like(req)).257 WillRespondWith(200).258 WithJSONBody(broker.TeamsAssignmentResponse{259 Embedded: broker.EmbeddedUsers{260 Users: []broker.User{261 {262 UUID: "4c260344-b170-41eb-b01e-c0ff10c72f25",263 Active: true,264 },265 },266 },267 })268 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {269 client := clientForPact(config)270 res, err := client.UpdateTeamAssignments(req)271 assert.Len(t, res.Embedded.Users, 1)272 return err273 })274 assert.NoError(t, err)275 })276 })277 t.Run("Secret", func(t *testing.T) {278 secret := 
broker.Secret{279 Name: "terraform-secret",280 Description: "terraform secret",281 Value: "supersecret",282 TeamUUID: "1da4bc0e-8031-473f-880b-3b3951683284",283 }284 created := broker.Secret{285 UUID: "b6af03cd-018c-4f1b-9546-c778d214f305",286 Name: secret.Name,287 Description: secret.Description,288 }289 update := broker.Secret{290 UUID: created.UUID,291 Name: secret.Name,292 Description: "updated description",293 Value: "supersecret",294 }295 t.Run("CreateSecret", func(t *testing.T) {296 mockProvider.297 AddInteraction().298 Given("a team with uuid 1da4bc0e-8031-473f-880b-3b3951683284 exists").299 UponReceiving("a request to create a secret").300 WithRequest("POST", S("/secrets")).301 WithHeader("Content-Type", S("application/json")).302 WithHeader("Authorization", Like("Bearer 1234")).303 WithJSONBody(Like(secret)).304 WillRespondWith(200).305 WithHeader("Content-Type", S("application/hal+json")).306 WithJSONBody(Like(created))307 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {308 client := clientForPact(config)309 res, e := client.CreateSecret(secret)310 assert.NoError(t, e)311 assert.Equal(t, "terraform-secret", res.Name)312 return e313 })314 assert.NoError(t, err)315 })316 t.Run("UpdateSecret", func(t *testing.T) {317 mockProvider.318 AddInteraction().319 Given("a secret with uuid b6af03cd-018c-4f1b-9546-c778d214f305 exists").320 UponReceiving("a request to update a secret").321 WithRequest("PUT", S("/secrets/b6af03cd-018c-4f1b-9546-c778d214f305")).322 WithHeader("Content-Type", S("application/json")).323 WithHeader("Authorization", Like("Bearer 1234")).324 WithJSONBody(Like(update)).325 WillRespondWith(200).326 WithHeader("Content-Type", S("application/hal+json")).327 WithJSONBody(Like(update))328 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {329 client := clientForPact(config)330 res, e := client.UpdateSecret(update)331 assert.NoError(t, e)332 assert.Equal(t, "terraform-secret", res.Name)333 return e334 })335 
assert.NoError(t, err)336 })337 t.Run("DeleteSecret", func(t *testing.T) {338 mockProvider.339 AddInteraction().340 Given("a secret with uuid b6af03cd-018c-4f1b-9546-c778d214f305 exists").341 UponReceiving("a request to delete a secret").342 WithRequest("DELETE", S("/secrets/b6af03cd-018c-4f1b-9546-c778d214f305")).343 WithHeader("Authorization", Like("Bearer 1234")).344 WillRespondWith(200)345 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {346 client := clientForPact(config)347 return client.DeleteSecret(created)348 })349 assert.NoError(t, err)350 })351 })352 t.Run("Role", func(t *testing.T) {353 role := broker.Role{354 Name: "terraform-role",355 Permissions: []broker.Permission{356 {357 Name: "role name",358 Scope: "user:manage:*",359 Label: "role label",360 Description: "role description",361 },362 },363 }364 created := broker.Role{365 UUID: "e1407277-2a25-4559-8fed-4214dd12a1e8",366 Name: role.Name,367 Permissions: role.Permissions,368 }369 update := broker.Role{370 UUID: created.UUID,371 Name: role.Name,372 Permissions: created.Permissions,373 }374 t.Run("CreateRole", func(t *testing.T) {375 mockProvider.376 AddInteraction().377 UponReceiving("a request to create a role").378 WithRequest("POST", S("/admin/roles")).379 WithHeader("Content-Type", S("application/json")).380 WithHeader("Authorization", Like("Bearer 1234")).381 WithJSONBody(Like(role)).382 WillRespondWith(200).383 WithHeader("Content-Type", S("application/hal+json")).384 WithJSONBody(Like(created))385 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {386 client := clientForPact(config)387 res, e := client.CreateRole(role)388 assert.NoError(t, e)389 assert.Equal(t, "terraform-role", res.Name)390 assert.Len(t, res.Permissions, 1)391 return e392 })393 assert.NoError(t, err)394 })395 t.Run("ReadRole", func(t *testing.T) {396 mockProvider.397 AddInteraction().398 Given("a role with uuid e1407277-2a25-4559-8fed-4214dd12a1e8 exists").399 UponReceiving("a request 
to get a role").400 WithRequest("GET", S("/admin/roles/e1407277-2a25-4559-8fed-4214dd12a1e8")).401 WithHeader("Authorization", Like("Bearer 1234")).402 WillRespondWith(200).403 WithHeader("Content-Type", S("application/hal+json")).404 WithJSONBody(Like(created))405 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {406 client := clientForPact(config)407 res, e := client.ReadRole(created.UUID)408 assert.NoError(t, e)409 assert.Equal(t, "terraform-role", res.Name)410 assert.Len(t, res.Permissions, 1)411 return e412 })413 assert.NoError(t, err)414 })415 t.Run("UpdateRole", func(t *testing.T) {416 mockProvider.417 AddInteraction().418 Given("a role with uuid e1407277-2a25-4559-8fed-4214dd12a1e8 exists").419 UponReceiving("a request to update a role").420 WithRequest("PUT", S("/admin/roles/e1407277-2a25-4559-8fed-4214dd12a1e8")).421 WithHeader("Content-Type", S("application/json")).422 WithHeader("Authorization", Like("Bearer 1234")).423 WithJSONBody(Like(update)).424 WillRespondWith(200).425 WithHeader("Content-Type", S("application/hal+json")).426 WithJSONBody(Like(update))427 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {428 client := clientForPact(config)429 res, e := client.UpdateRole(update)430 assert.NoError(t, e)431 assert.Equal(t, "terraform-role", res.Name)432 assert.Len(t, res.Permissions, 1)433 return e434 })435 assert.NoError(t, err)436 })437 t.Run("DeleteRole", func(t *testing.T) {438 mockProvider.439 AddInteraction().440 Given("a role with uuid e1407277-2a25-4559-8fed-4214dd12a1e8 exists").441 UponReceiving("a request to delete a role").442 WithRequest("DELETE", S("/admin/roles/e1407277-2a25-4559-8fed-4214dd12a1e8")).443 WithHeader("Authorization", Like("Bearer 1234")).444 WillRespondWith(200)445 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {446 client := clientForPact(config)447 return client.DeleteRole(created)448 })449 assert.NoError(t, err)450 })451 })452 t.Run("User", func(t 
*testing.T) {453 user := broker.User{454 Name: "terraform user",455 Email: "terraform.user@some.domain",456 Active: true,457 Type: broker.RegularUser,458 }459 created := broker.User{460 UUID: "819f6dbf-dd7a-47ff-b369-e3ed1d2578a0",461 Name: user.Name,462 Email: user.Email,463 Active: user.Active,464 Type: user.Type,465 Embedded: struct {466 Roles []broker.Role `json:"roles,omitempty"`467 Teams []broker.Team `json:"teams,omitempty"`468 }{469 Roles: []broker.Role{470 {471 Name: "terraform-role",472 UUID: "84f66fab-1c42-4351-96bf-88d3a09d7cd2",473 Permissions: []broker.Permission{474 {475 Name: "role name",476 Scope: "user:manage:*",477 Label: "role label",478 Description: "role description",479 },480 },481 },482 },483 },484 }485 update := created486 setUserRoles := broker.SetUserRolesRequest{487 Roles: []string{488 "84f66fab-1c42-4351-96bf-88d3a09d7cd2",489 },490 }491 t.Run("CreateUser", func(t *testing.T) {492 mockProvider.493 AddInteraction().494 UponReceiving("a request to create a user").495 WithRequest("POST", S("/admin/users/invite-user")).496 WithHeader("Content-Type", S("application/json")).497 WithHeader("Authorization", Like("Bearer 1234")).498 WithJSONBody(Like(user)).499 WillRespondWith(200).500 WithHeader("Content-Type", S("application/hal+json")).501 WithJSONBody(Like(created))502 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {503 client := clientForPact(config)504 res, e := client.CreateUser(user)505 assert.NoError(t, e)506 assert.Equal(t, "terraform user", res.Name)507 return e508 })509 assert.NoError(t, err)510 })511 t.Run("ReadUser", func(t *testing.T) {512 mockProvider.513 AddInteraction().514 Given("a user with uuid 819f6dbf-dd7a-47ff-b369-e3ed1d2578a0 exists").515 UponReceiving("a request to get a user").516 WithRequest("GET", S("/admin/users/819f6dbf-dd7a-47ff-b369-e3ed1d2578a0")).517 WithHeader("Authorization", Like("Bearer 1234")).518 WillRespondWith(200).519 WithHeader("Content-Type", S("application/hal+json")).520 
WithJSONBody(Like(created))521 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {522 client := clientForPact(config)523 res, e := client.ReadUser(created.UUID)524 assert.NoError(t, e)525 assert.Equal(t, "terraform user", res.Name)526 assert.Len(t, res.Embedded.Roles, 1)527 return e528 })529 assert.NoError(t, err)530 })531 t.Run("UpdateUser", func(t *testing.T) {532 mockProvider.533 AddInteraction().534 Given("a user with uuid 819f6dbf-dd7a-47ff-b369-e3ed1d2578a0 exists").535 UponReceiving("a request to update a user").536 WithRequest("PUT", S("/admin/users/819f6dbf-dd7a-47ff-b369-e3ed1d2578a0")).537 WithHeader("Content-Type", S("application/json")).538 WithHeader("Authorization", Like("Bearer 1234")).539 WithJSONBody(Like(update)).540 WillRespondWith(200).541 WithHeader("Content-Type", S("application/hal+json")).542 WithJSONBody(Like(update))543 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {544 client := clientForPact(config)545 res, e := client.UpdateUser(update)546 assert.NoError(t, e)547 assert.Equal(t, "terraform user", res.Name)548 assert.Len(t, res.Embedded.Roles, 1)549 return e550 })551 assert.NoError(t, err)552 })553 t.Run("DeleteUser", func(t *testing.T) {554 mockProvider.555 AddInteraction().556 Given("a user with uuid 819f6dbf-dd7a-47ff-b369-e3ed1d2578a0 exists").557 UponReceiving("a request to delete a user").558 WithRequest("PUT", S("/admin/users/819f6dbf-dd7a-47ff-b369-e3ed1d2578a0")).559 WithHeader("Content-Type", S("application/json")).560 WithHeader("Authorization", Like("Bearer 1234")).561 WithJSONBody(Like(update)).562 WillRespondWith(200).563 WithHeader("Content-Type", S("application/hal+json")).564 WithJSONBody(Like(update))565 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {566 client := clientForPact(config)567 return client.DeleteUser(created)568 })569 assert.NoError(t, err)570 })571 t.Run("SetUserRoles", func(t *testing.T) {572 mockProvider.573 AddInteraction().574 
Given("a user with uuid 819f6dbf-dd7a-47ff-b369-e3ed1d2578a0 exists").575 UponReceiving("a request to set user's roles").576 WithRequest("PUT", S("/admin/users/819f6dbf-dd7a-47ff-b369-e3ed1d2578a0/roles")).577 WithHeader("Authorization", Like("Bearer 1234")).578 WithJSONBody(Like(setUserRoles)).579 WillRespondWith(200)580 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {581 client := clientForPact(config)582 return client.SetUserRoles(created.UUID, setUserRoles)583 })584 assert.NoError(t, err)585 })586 })587 t.Run("SystemAccount", func(t *testing.T) {588 user := broker.User{589 Name: "system account",590 Active: true,591 Type: broker.SystemAccount,592 }593 created := broker.User{594 UUID: "71a5be7d-bb9c-427b-ba49-ee8f1df0ae58",595 Name: user.Name,596 Active: user.Active,597 Type: user.Type,598 Embedded: struct {599 Roles []broker.Role `json:"roles,omitempty"`600 Teams []broker.Team `json:"teams,omitempty"`601 }{602 Roles: []broker.Role{603 {604 Name: "terraform-role",605 UUID: "84f66fab-1c42-4351-96bf-88d3a09d7cd2",606 Permissions: []broker.Permission{607 {608 Name: "role name",609 Scope: "user:manage:*",610 Label: "role label",611 Description: "role description",612 },613 },614 },615 },616 },617 }618 update := created619 setUserRoles := broker.SetUserRolesRequest{620 Roles: []string{621 "84f66fab-1c42-4351-96bf-88d3a09d7cd2",622 },623 }624 t.Run("CreateUser", func(t *testing.T) {625 mockProvider.626 AddInteraction().627 UponReceiving("a request to create a system account").628 WithRequest("POST", S("/admin/system-accounts")).629 WithHeader("Content-Type", S("application/json")).630 WithHeader("Authorization", Like("Bearer 1234")).631 WithJSONBody(Like(user)).632 WillRespondWith(201).633 WithHeader("Content-Type", S("application/hal+json"))634 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {635 client := clientForPact(config)636 res, e := client.CreateSystemAccount(user)637 assert.NoError(t, e)638 assert.Equal(t, "system 
account", res.Name)639 return e640 })641 assert.NoError(t, err)642 })643 t.Run("ReadUser", func(t *testing.T) {644 mockProvider.645 AddInteraction().646 Given("a system account with uuid 71a5be7d-bb9c-427b-ba49-ee8f1df0ae58 exists").647 UponReceiving("a request to get a system account").648 WithRequest("GET", S("/admin/users/71a5be7d-bb9c-427b-ba49-ee8f1df0ae58")).649 WithHeader("Authorization", Like("Bearer 1234")).650 WillRespondWith(200).651 WithHeader("Content-Type", S("application/hal+json")).652 WithJSONBody(Like(created))653 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {654 client := clientForPact(config)655 res, e := client.ReadUser(created.UUID)656 assert.NoError(t, e)657 assert.Equal(t, "system account", res.Name)658 assert.Len(t, res.Embedded.Roles, 1)659 return e660 })661 assert.NoError(t, err)662 })663 t.Run("UpdateUser", func(t *testing.T) {664 mockProvider.665 AddInteraction().666 Given("a system account with uuid 71a5be7d-bb9c-427b-ba49-ee8f1df0ae58 exists").667 UponReceiving("a request to update a system account").668 WithRequest("PUT", S("/admin/users/71a5be7d-bb9c-427b-ba49-ee8f1df0ae58")).669 WithHeader("Content-Type", S("application/json")).670 WithHeader("Authorization", Like("Bearer 1234")).671 WithJSONBody(Like(update)).672 WillRespondWith(200).673 WithHeader("Content-Type", S("application/hal+json")).674 WithJSONBody(Like(update))675 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {676 client := clientForPact(config)677 res, e := client.UpdateUser(update)678 assert.NoError(t, e)679 assert.Equal(t, "system account", res.Name)680 assert.Len(t, res.Embedded.Roles, 1)681 return e682 })683 assert.NoError(t, err)684 })685 t.Run("DeleteUser", func(t *testing.T) {686 mockProvider.687 AddInteraction().688 Given("a system account with uuid 71a5be7d-bb9c-427b-ba49-ee8f1df0ae58 exists").689 UponReceiving("a request to delete a system account").690 WithRequest("PUT", 
S("/admin/users/71a5be7d-bb9c-427b-ba49-ee8f1df0ae58")).691 WithHeader("Content-Type", S("application/json")).692 WithHeader("Authorization", Like("Bearer 1234")).693 WithJSONBody(Like(update)).694 WillRespondWith(200).695 WithHeader("Content-Type", S("application/hal+json")).696 WithJSONBody(Like(update))697 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {698 client := clientForPact(config)699 return client.DeleteUser(created)700 })701 assert.NoError(t, err)702 })703 t.Run("SetUserRoles", func(t *testing.T) {704 mockProvider.705 AddInteraction().706 Given("a system account with uuid 71a5be7d-bb9c-427b-ba49-ee8f1df0ae58 exists").707 UponReceiving("a request to set a system account's roles").708 WithRequest("PUT", S("/admin/users/71a5be7d-bb9c-427b-ba49-ee8f1df0ae58/roles")).709 WithHeader("Authorization", Like("Bearer 1234")).710 WithJSONBody(Like(setUserRoles)).711 WillRespondWith(200)712 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {713 client := clientForPact(config)714 return client.SetUserRoles(created.UUID, setUserRoles)715 })716 assert.NoError(t, err)717 })718 })719 t.Run("Token", func(t *testing.T) {720 readOnlytoken := broker.APIToken{721 UUID: "e068b1ea-d064-4719-971f-98af49cdf3f7",722 Description: "Read only token (developer)",723 Value: "1234",724 }725 readWriteToken := broker.APIToken{726 UUID: "cb32752b-0f1b-4f6f-817f-4b12b7cc8592",727 Description: "Read/write token (CI)",728 Value: "5678",729 }730 t.Run("ReadTokens", func(t *testing.T) {731 mockProvider.732 AddInteraction().733 Given("a token with uuid e068b1ea-d064-4719-971f-98af49cdf3f7 exists").734 UponReceiving("a request to get the tokens for the current account").735 WithRequest("GET", S("/settings/tokens")).736 WithHeader("Authorization", Like("Bearer 1234")).737 WillRespondWith(200).738 WithHeader("Content-Type", S("application/hal+json")).739 WithJSONBody(map[string]interface{}{740 "_embedded": map[string]interface{}{741 "items": 
[]interface{}{742 map[string]interface{}{743 "uuid": Like(readOnlytoken.UUID),744 "description": readOnlytoken.Description,745 "value": Like(readOnlytoken.Value),746 },747 map[string]interface{}{748 "uuid": Like(readWriteToken.UUID),749 "description": readWriteToken.Description,750 "value": Like(readWriteToken.Value),751 },752 },753 },754 })755 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {756 client := clientForPact(config)757 res, e := client.ReadToken("e068b1ea-d064-4719-971f-98af49cdf3f7")758 assert.Equal(t, "1234", res.Value)759 return e760 })761 assert.NoError(t, err)762 })763 t.Run("RegenerateToken", func(t *testing.T) {764 mockProvider.765 AddInteraction().766 Given("a token with uuid e068b1ea-d064-4719-971f-98af49cdf3f7 exists").767 UponReceiving("a request to regenerate a token").768 WithRequest("POST", S("/settings/tokens/e068b1ea-d064-4719-971f-98af49cdf3f7/regenerate")).769 WithHeader("Authorization", Like("Bearer 1234")).770 WillRespondWith(200).771 WithHeader("Content-Type", S("application/hal+json")).772 WithJSONBody(Like(broker.APITokenResponse{773 APIToken: readOnlytoken,774 }))775 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {776 client := clientForPact(config)777 res, e := client.RegenerateToken(readOnlytoken)778 assert.Equal(t, "1234", res.Value)779 return e780 })781 assert.NoError(t, err)782 })783 })784 t.Run("Webhook", func(t *testing.T) {785 webhook := broker.Webhook{786 Description: "terraform webhook",787 TeamUUID: "607fba87-8209-4aff-a7d2-d8e9f92b94a2",788 Enabled: true,789 Events: []broker.WebhookEvent{790 {Name: "contract_content_changed"},791 {Name: "contract_published"},792 {Name: "provider_verification_failed"},793 {Name: "provider_verification_published"},794 {Name: "provider_verification_succeeded"},795 },796 Provider: &broker.Pacticipant{797 Name: "terraform-provider",798 },799 Consumer: &broker.Pacticipant{800 Name: "terraform-consumer",801 },802 Request: broker.Request{803 
Method: "POST",804 URL: "https://postman-echo.com/post",805 Username: "user",806 Password: "password",807 Headers: broker.Headers{808 "content-type": "application/json",809 },810 Body: map[string]string{811 "pact": "$${pactbroker.pactUrl}",812 },813 },814 }815 c, _ := copystructure.Copy(&webhook)816 created := c.(*broker.Webhook)817 created.ID = "2e4bf0e6-b0cf-451f-b05b-69048955f019"818 t.Run("CreateWebhook", func(t *testing.T) {819 mockProvider.820 AddInteraction().821 Given("a team with uuid 607fba87-8209-4aff-a7d2-d8e9f92b94a2 exists").822 UponReceiving("a request to create a webhook").823 WithRequest("POST", S("/webhooks")).824 WithHeader("Content-Type", S("application/json")).825 WithHeader("Authorization", Like("Bearer 1234")).826 WithJSONBody(Like(webhook)).827 WillRespondWith(200).828 WithHeader("Content-Type", S("application/hal+json")).829 WithJSONBody(Like(broker.WebhookResponse{830 Webhook: webhook,831 HalDoc: broker.HalDoc{832 Links: broker.HalLinks{833 "self": broker.Link{834 Href: "http://some-broker/webhooks/2e4bf0e6-b0cf-451f-b05b-69048955f019",835 },836 },837 },838 }))839 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {840 client := clientForPact(config)841 res, e := client.CreateWebhook(webhook)842 assert.NoError(t, e)843 assert.Equal(t, "terraform webhook", res.Description)844 items := strings.Split(res.Links["self"].Href, "/")845 assert.Equal(t, "2e4bf0e6-b0cf-451f-b05b-69048955f019", items[len(items)-1])846 return e847 })848 assert.NoError(t, err)849 })850 t.Run("ReadWebhook", func(t *testing.T) {851 mockProvider.852 AddInteraction().853 Given("a webhook with ID 2e4bf0e6-b0cf-451f-b05b-69048955f019 exists").854 UponReceiving("a request to get a webhook").855 WithRequest("GET", S("/webhooks/2e4bf0e6-b0cf-451f-b05b-69048955f019")).856 WithHeader("Authorization", Like("Bearer 1234")).857 WillRespondWith(200).858 WithHeader("Content-Type", S("application/hal+json")).859 WithJSONBody(Like(created))860 err = 
mockProvider.ExecuteTest(t, func(config MockServerConfig) error {861 client := clientForPact(config)862 res, e := client.ReadWebhook("2e4bf0e6-b0cf-451f-b05b-69048955f019")863 assert.NoError(t, e)864 assert.Equal(t, "terraform webhook", res.Description)865 return e866 })867 assert.NoError(t, err)868 })869 t.Run("UpdateWebhook", func(t *testing.T) {870 mockProvider.871 AddInteraction().872 Given("a webhook with ID 2e4bf0e6-b0cf-451f-b05b-69048955f019 exists").873 UponReceiving("a request to update a webhook").874 WithRequest("PUT", S("/webhooks/2e4bf0e6-b0cf-451f-b05b-69048955f019")).875 WithHeader("Content-Type", S("application/json")).876 WithHeader("Authorization", Like("Bearer 1234")).877 WithJSONBody(Like(created)).878 WillRespondWith(200).879 WithHeader("Content-Type", S("application/hal+json")).880 WithJSONBody(Like(created))881 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {882 client := clientForPact(config)883 res, e := client.UpdateWebhook(*created)884 assert.NoError(t, e)885 assert.Equal(t, "terraform webhook", res.Description)886 return e887 })888 assert.NoError(t, err)889 })890 t.Run("DeleteWebhook", func(t *testing.T) {891 mockProvider.892 AddInteraction().893 Given("a webhook with ID 2e4bf0e6-b0cf-451f-b05b-69048955f019 exists").894 UponReceiving("a request to delete a webhook").895 WithRequest("DELETE", S("/webhooks/2e4bf0e6-b0cf-451f-b05b-69048955f019")).896 WithHeader("Authorization", Like("Bearer 1234")).897 WillRespondWith(200)898 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {899 client := clientForPact(config)900 return client.DeleteWebhook(*created)901 })902 assert.NoError(t, err)903 })904 })905 t.Run("AuthenticationSettings", func(t *testing.T) {906 authSettings := broker.AuthenticationSettings{907 Providers: broker.AuthenticationProviders{908 Google: broker.GoogleAuthenticationSettings{909 EmailDomains: []string{"pactflow.io"},910 },911 Github: broker.GithubAuthenticationSettings{912 
Organizations: []string{"pactflow"},913 },914 },915 }916 t.Run("SetTenantAuthenticationSettings", func(t *testing.T) {917 mockProvider.918 AddInteraction().919 UponReceiving("a request to update authentication settings").920 WithRequest("PUT", S("/admin/tenant/authentication-settings")).921 WithHeader("Content-Type", S("application/json")).922 WithHeader("Authorization", Like("Bearer 1234")).923 WithJSONBody(Like(authSettings)).924 WillRespondWith(200).925 WithHeader("Content-Type", S("application/hal+json")).926 WithJSONBody(Like(authSettings))927 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {928 client := clientForPact(config)929 res, e := client.SetTenantAuthenticationSettings(authSettings)930 assert.NoError(t, e)931 assert.Contains(t, res.Providers.Google.EmailDomains, "pactflow.io")932 assert.Contains(t, res.Providers.Github.Organizations, "pactflow")933 return e934 })935 assert.NoError(t, err)936 })937 t.Run("ReadTenantAuthenticationSettings", func(t *testing.T) {938 mockProvider.939 AddInteraction().940 UponReceiving("a request to get authentication settings").941 WithRequest("GET", S("/admin/tenant/authentication-settings")).942 WillRespondWith(200).943 WithHeader("Content-Type", S("application/hal+json")).944 WithJSONBody(Like(authSettings))945 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {946 client := clientForPact(config)947 res, e := client.ReadTenantAuthenticationSettings()948 assert.NoError(t, e)949 assert.Contains(t, res.Providers.Google.EmailDomains, "pactflow.io")950 assert.Contains(t, res.Providers.Github.Organizations, "pactflow")951 return e952 })953 assert.NoError(t, err)954 })955 })956 t.Run("Environment", func(t *testing.T) {957 environment := broker.Environment{958 Name: "TerraformEnvironment",959 Production: true,960 DisplayName: "terraform environment",961 }962 create := broker.EnvironmentCreateOrUpdateRequest{963 DisplayName: environment.DisplayName,964 Name: environment.Name,965 
Production: environment.Production,966 Teams: []string{967 "99643109-adb0-4e68-b25f-7b14d6bcae16",968 },969 }970 created := broker.EnvironmentCreateOrUpdateResponse{971 UUID: "8000883c-abf0-4b4c-b993-426f607092a9",972 Name: environment.Name,973 DisplayName: environment.DisplayName,974 Embedded: broker.EnvironmentEmbeddedItems{975 Teams: []broker.Team{976 {977 Name: "terraform-team",978 UUID: "99643109-adb0-4e68-b25f-7b14d6bcae16",979 },980 },981 },982 }983 update := broker.EnvironmentCreateOrUpdateRequest{984 UUID: created.UUID,985 DisplayName: environment.DisplayName,986 Name: "terraform-updated-environment",987 Production: environment.Production,988 Teams: []string{989 "99643109-adb0-4e68-b25f-7b14d6bcae16",990 },991 }992 updated := broker.EnvironmentCreateOrUpdateResponse{993 UUID: created.UUID,994 Name: update.Name,995 DisplayName: environment.DisplayName,996 Embedded: broker.EnvironmentEmbeddedItems{997 Teams: []broker.Team{998 {999 Name: "terraform-team",1000 UUID: "99643109-adb0-4e68-b25f-7b14d6bcae16",1001 },1002 },1003 },1004 }1005 t.Run("CreateEnvironment", func(t *testing.T) {1006 mockProvider.1007 AddInteraction().1008 Given("a team with uuid 99643109-adb0-4e68-b25f-7b14d6bcae16 exists").1009 UponReceiving("a request to create an environment").1010 WithRequest("POST", S("/environments")).1011 WithHeader("Content-Type", S("application/json")).1012 WithHeader("Authorization", Like("Bearer 1234")).1013 WithJSONBody(Like(create)).1014 WillRespondWith(200).1015 WithHeader("Content-Type", S("application/hal+json")).1016 WithJSONBody(Like(created))1017 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {1018 client := clientForPact(config)1019 res, e := client.CreateEnvironment(create)1020 assert.NoError(t, e)1021 assert.Equal(t, "TerraformEnvironment", res.Name)1022 assert.Len(t, res.Embedded.Teams, 1)1023 return e1024 })1025 assert.NoError(t, err)1026 })1027 t.Run("ReadEnvironment", func(t *testing.T) {1028 mockProvider.1029 
AddInteraction().1030 Given("an environment with uuid 8000883c-abf0-4b4c-b993-426f607092a9 exists").1031 UponReceiving("a request to get an environment").1032 WithRequest("GET", S("/environments/8000883c-abf0-4b4c-b993-426f607092a9")).1033 WithHeader("Authorization", Like("Bearer 1234")).1034 WillRespondWith(200).1035 WithHeader("Content-Type", S("application/hal+json")).1036 WithJSONBody(Like(created))1037 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {1038 client := clientForPact(config)1039 res, e := client.ReadEnvironment(created.UUID)1040 assert.NoError(t, e)1041 assert.Equal(t, "TerraformEnvironment", res.Name)1042 assert.Len(t, res.Embedded.Teams, 1)1043 return e1044 })1045 assert.NoError(t, err)1046 })1047 t.Run("UpdateEnvironment", func(t *testing.T) {1048 mockProvider.1049 AddInteraction().1050 Given("an environment with uuid 8000883c-abf0-4b4c-b993-426f607092a9 exists").1051 UponReceiving("a request to update an environment").1052 WithRequest("PUT", S("/environments/8000883c-abf0-4b4c-b993-426f607092a9")).1053 WithHeader("Content-Type", S("application/json")).1054 WithHeader("Authorization", Like("Bearer 1234")).1055 WithJSONBody(Like(update)).1056 WillRespondWith(200).1057 WithHeader("Content-Type", S("application/hal+json")).1058 WithJSONBody(Like(updated))1059 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {1060 client := clientForPact(config)1061 res, e := client.UpdateEnvironment(update)1062 assert.NoError(t, e)1063 assert.Equal(t, "terraform-updated-environment", res.Name)1064 assert.Len(t, res.Embedded.Teams, 1)1065 return e1066 })1067 assert.NoError(t, err)1068 })1069 t.Run("DeleteEnvironment", func(t *testing.T) {1070 mockProvider.1071 AddInteraction().1072 Given("an environment with uuid 8000883c-abf0-4b4c-b993-426f607092a9 exists").1073 UponReceiving("a request to delete an environment").1074 WithRequest("DELETE", S("/environments/8000883c-abf0-4b4c-b993-426f607092a9")).1075 
WithHeader("Authorization", Like("Bearer 1234")).1076 WillRespondWith(200)1077 err = mockProvider.ExecuteTest(t, func(config MockServerConfig) error {1078 client := clientForPact(config)1079 return client.DeleteEnvironment(broker.Environment{1080 UUID: created.UUID,1081 })1082 })1083 assert.NoError(t, err)1084 })1085 })1086}1087func clientForPact(config MockServerConfig) *Client {1088 baseURL, err := url.Parse(fmt.Sprintf("http://%s:%d", config.Host, config.Port))1089 if err != nil {1090 panic(fmt.Sprintf("unable to create client for pact test: %s", err))1091 }...
header_match_test.go
Source:header_match_test.go
1package checkheadersplugin_test2import (3 "context"4 "net/http"5 "net/http/httptest"6 "testing"7 checkheaders "github.com/dkijkuit/checkheadersplugin"8)9var required = true10var not_required = false11var contains = true12var urlDecode = true13var testcert = `14Subject%3D%22C%3DNL%2CST%3DST-TEST%2CL%3DCity%2CO%3DOrganization%2CCN%3Dcommon-name%22%3BIssuer%3D%22DC%3Dnl%2CDC%3Ddomainpart1%2CDC%3Ddomainpart2%2CCN%3DSomeKindOfCa%22%3BNB%3D%221589744159%22%3BNA%3D%221765837153%22%3BSAN%3D%22somkindofdomain.domain.thing.test%2215`16func TestHeadersMatch(t *testing.T) {17 requestHeaders := map[string]string{18 "test1": "testvalue1",19 "test2": "testvalue2",20 "test3": "testvalue3",21 "test4": "value4",22 "X-Forwarded-Tls-Client-Cert-Info": testcert,23 "testMultipleContainsValues": "value5_or_value1_or_value_2_or_value_3",24 }25 executeTest(t, requestHeaders, http.StatusOK)26}27func TestHeadersOneMatch(t *testing.T) {28 requestHeaders := map[string]string{29 "test1": "testvalue1",30 "test2": "testvalue2",31 "test3": "testvalue3",32 "test4": "value4",33 "X-Forwarded-Tls-Client-Cert-Info": testcert,34 "testMultipleContainsValues": "test_or_value2",35 }36 executeTest(t, requestHeaders, http.StatusOK)37}38func TestHeadersNotMatch(t *testing.T) {39 requestHeaders := map[string]string{40 "test1": "wrongvalue1",41 "test2": "wrongvalue2",42 "test3": "wrongvalue3",43 "test4": "correctvalue4",44 "X-Forwarded-Tls-Client-Cert-Info": "wrongvalue",45 "testMultipleContainsValues": "wrongvalues",46 }47 executeTest(t, requestHeaders, http.StatusForbidden)48}49func TestHeadersNotRequired(t *testing.T) {50 requestHeaders := map[string]string{51 "test1": "testvalue1",52 "test2": "testvalue2",53 "test4": "ue4",54 "X-Forwarded-Tls-Client-Cert-Info": testcert,55 "testMultipleContainsValues": "value5_or_value1_or_value_2_or_value_3",56 }57 executeTest(t, requestHeaders, http.StatusOK)58}59func executeTest(t *testing.T, requestHeaders map[string]string, expectedResultCode int) {60 cfg := 
checkheaders.CreateConfig()61 cfg.Headers = []checkheaders.SingleHeader{62 {63 Name: "test1",64 MatchType: string(checkheaders.MatchOne),65 Values: []string{"testvalue1"},66 },67 {68 Name: "test2",69 MatchType: string(checkheaders.MatchOne),70 Values: []string{"testvalue2"},71 Required: &required,72 },73 {74 Name: "test3",75 MatchType: string(checkheaders.MatchOne),76 Values: []string{"testvalue3"},77 Required: &not_required,78 },79 {80 Name: "test4",81 MatchType: string(checkheaders.MatchOne),82 Values: []string{"ue4"},83 Required: &required,84 Contains: &contains,85 },86 {87 Name: "X-Forwarded-Tls-Client-Cert-Info",88 Values: []string{89 "CN=common-name",90 "SAN=\"somkindofdomain.domain.thing.test\"",91 },92 MatchType: string(checkheaders.MatchAll),93 Required: &required,94 Contains: &contains,95 URLDecode: &urlDecode,96 },97 {98 Name: "testMultipleContainsValues",99 Values: []string{100 "value1",101 "or_value2",102 },103 MatchType: string(checkheaders.MatchOne),104 Required: &required,105 Contains: &contains,106 URLDecode: &urlDecode,107 },108 {109 Name: "testContainsNotRequired",110 Values: []string{111 "value_not_important",112 "value_not_important_2",113 },114 MatchType: string(checkheaders.MatchOne),115 Required: &not_required,116 Contains: &contains,117 URLDecode: &urlDecode,118 },119 }120 ctx := context.Background()121 next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {})122 handler, err := checkheaders.New(ctx, next, cfg, "check-headers-plugin")123 if err != nil {124 t.Fatal(err)125 }126 req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost", nil)127 if err != nil {128 t.Fatal(err)129 }130 recorder := httptest.NewRecorder()131 for headerName, headerValue := range requestHeaders {132 req.Header.Add(headerName, headerValue)133 }134 handler.ServeHTTP(recorder, req)135 if recorder.Result().StatusCode != expectedResultCode {136 t.Errorf("Unexpected response status code: %d, expected: %d for incoming request headers: 
%s", recorder.Result().StatusCode, expectedResultCode, requestHeaders)137 }138}...
ExecuteTest
Using AI Code Generation
1import (2func main() {3 client := test.NewClient()4 client.ExecuteTest()5}6import (7func main() {8 client := test.NewClient()9 client.ExecuteTest()10}11import (12func main() {13 client := test.NewClient()14 client.ExecuteTest()15}16import (17func main() {18 client := test.NewClient()19 client.ExecuteTest()20}21import (22func main() {23 client := test.NewClient()24 client.ExecuteTest()25}26import (27func main() {28 client := test.NewClient()29 client.ExecuteTest()30}31import (32func main() {33 client := test.NewClient()34 client.ExecuteTest()35}36import (37func main() {38 client := test.NewClient()39 client.ExecuteTest()40}41import (42func main() {43 client := test.NewClient()44 client.ExecuteTest()45}46import (47func main() {48 client := test.NewClient()49 client.ExecuteTest()50}51import (52func main() {53 client := test.NewClient()54 client.ExecuteTest()55}
ExecuteTest
Using AI Code Generation
1import (2func main() {3 client := test.Client{}4 client.ExecuteTest()5 fmt.Println("Hello, playground")6}7import "fmt"8type Client struct {9}10func (c Client) ExecuteTest() {11 fmt.Println("Test")12}13import (14func main() {15 client := test.Client{}16 client.ExecuteTest()17 fmt.Println("Hello, playground")18}19import "fmt"20type Client struct {21}22func (c Client) ExecuteTest() {23 fmt.Println("Test")24}
ExecuteTest
Using AI Code Generation
1import "client"2func main(){3 client.ExecuteTest()4}5import "fmt"6func ExecuteTest(){7 fmt.Println("execute test")8}9import "testing"10func TestExecuteTest(t *testing.T){11 ExecuteTest()12}13import "testing"14func TestExecuteTest(t *testing.T){15 ExecuteTest()16}
ExecuteTest
Using AI Code Generation
1import (2func main() {3 c.ExecuteTest()4 fmt.Println("Hello, playground")5}6import (7func main() {8 c.ExecuteTest()9 fmt.Println("Hello, playground")10}11import (12func main() {13 c.ExecuteTest()14 fmt.Println("Hello, playground")15}16import (17func main() {18 c.ExecuteTest()19 fmt.Println("Hello, playground")20}21import (22func main() {23 c.ExecuteTest()24 fmt.Println("Hello, playground")25}26import (27func main() {28 c.ExecuteTest()29 fmt.Println("Hello, playground")30}31import (32func main() {33 c.ExecuteTest()34 fmt.Println("Hello, playground")35}36import (37func main() {38 c.ExecuteTest()39 fmt.Println("Hello, playground")40}41import (42func main() {43 c.ExecuteTest()44 fmt.Println("Hello, playground")45}46import (
ExecuteTest
Using AI Code Generation
1import (2func main() {3 client := client.Client{}4 client.ExecuteTest()5 fmt.Println("Test Executed")6}7import (8type Client struct {9}10func (c *Client) ExecuteTest() {11 fmt.Println("Executing test")12 if err != nil {13 fmt.Println("Error while executing test")14 } else {15 fmt.Println("Response:", res)16 }17}18Response: &{200 OK 200 HTTP/1.1 1 1 map[Cache-Control:[private, max-age=0] Content-Type:[text/html; charset=ISO-8859-1] Date:[Wed, 29 Jul 2015 07:53:34 GMT] Expires:[-1] P3p:[CP="This i
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!