Best Quick code snippet using D
store.go
Source:store.go
...66 // handle of the other ShareableStore.67 With(other basestore.ShareableStore) Store68 // QueuedCount returns the number of queued records matching the given conditions.69 QueuedCount(ctx context.Context, includeProcessing bool, conditions []*sqlf.Query) (int, error)70 // MaxDurationInQueue returns the maximum age of queued records in this store. Returns 0 if there are no queued records.71 MaxDurationInQueue(ctx context.Context) (time.Duration, error)72 // Dequeue selects the first queued record matching the given conditions and updates the state to processing. If there73 // is such a record, it is returned. If there is no such unclaimed record, a nil record and and a nil cancel function74 // will be returned along with a false-valued flag. This method must not be called from within a transaction.75 //76 // The supplied conditions may use the alias provided in `ViewName`, if one was supplied.77 Dequeue(ctx context.Context, workerHostname string, conditions []*sqlf.Query) (workerutil.Record, bool, error)78 // Heartbeat marks the given record as currently being processed.79 Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error)80 // Requeue updates the state of the record with the given identifier to queued and adds a processing delay before81 // the next dequeue of this record can be performed.82 Requeue(ctx context.Context, id int, after time.Time) error83 // AddExecutionLogEntry adds an executor log entry to the record and returns the ID of the new entry (which can be84 // used with UpdateExecutionLogEntry) and a possible error. When the record is not found (due to options not matching85 // or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.86 AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (entryID int, err error)87 // UpdateExecutionLogEntry updates the executor log entry with the given ID on the given record. 
When the record is not88 // found (due to options not matching or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.89 UpdateExecutionLogEntry(ctx context.Context, recordID, entryID int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) error90 // MarkComplete attempts to update the state of the record to complete. If this record has already been moved from91 // the processing state to a terminal state, this method will have no effect. This method returns a boolean flag92 // indicating if the record was updated.93 MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (bool, error)94 // MarkErrored attempts to update the state of the record to errored. This method will only have an effect95 // if the current state of the record is processing or completed. A requeued record or a record already marked96 // with an error will not be updated. This method returns a boolean flag indicating if the record was updated.97 MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)98 // MarkFailed attempts to update the state of the record to failed. This method will only have an effect99 // if the current state of the record is processing or completed. A requeued record or a record already marked100 // with an error will not be updated. This method returns a boolean flag indicating if the record was updated.101 MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)102 // ResetStalled moves all processing records that have not received a heartbeat within `StalledMaxAge` back to the103 // queued state. In order to prevent input that continually crashes worker instances, records that have been reset104 // more than `MaxNumResets` times will be marked as failed. 
This method returns a pair of maps from record105 // identifiers the age of the record's last heartbeat timestamp for each record reset to queued and failed states,106 // respectively.107 ResetStalled(ctx context.Context) (resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs map[int]time.Duration, err error)108}109type ExecutionLogEntry workerutil.ExecutionLogEntry110func (e *ExecutionLogEntry) Scan(value any) error {111 b, ok := value.([]byte)112 if !ok {113 return errors.Errorf("value is not []byte: %T", value)114 }115 return json.Unmarshal(b, &e)116}117func (e ExecutionLogEntry) Value() (driver.Value, error) {118 return json.Marshal(e)119}120func ExecutionLogEntries(raw []workerutil.ExecutionLogEntry) (entries []ExecutionLogEntry) {121 for _, entry := range raw {122 entries = append(entries, ExecutionLogEntry(entry))123 }124 return entries125}126type store struct {127 *basestore.Store128 options Options129 columnReplacer *strings.Replacer130 modifiedColumnExpressionMatches [][]MatchingColumnExpressions131 operations *operations132}133var _ Store = &store{}134// Options configure the behavior of Store over a particular set of tables, columns, and expressions.135type Options struct {136 // Name denotes the name of the store used to distinguish log messages and emitted metrics. 
The137 // store constructor will fail if this field is not supplied.138 Name string139 // TableName is the name of the table containing work records.140 //141 // The target table (and the target view referenced by `ViewName`) must have the following columns142 // and types:143 //144 // - id: integer primary key145 // - state: text (may be updated to `queued`, `processing`, `errored`, or `failed`)146 // - failure_message: text147 // - queued_at: timestamp with time zone148 // - started_at: timestamp with time zone149 // - last_heartbeat_at: timestamp with time zone150 // - finished_at: timestamp with time zone151 // - process_after: timestamp with time zone152 // - num_resets: integer not null153 // - num_failures: integer not null154 // - execution_logs: json[] (each entry has the form of `ExecutionLogEntry`)155 // - worker_hostname: text156 //157 // The names of these columns may be customized based on the table name by adding a replacement158 // pair in the AlternateColumnNames mapping.159 //160 // It's recommended to put an index or (or partial index) on the state column for more efficient161 // dequeue operations.162 TableName string163 // AlternateColumnNames is a map from expected column names to actual column names in the target164 // table. This allows existing tables to be more easily retrofitted into the expected record165 // shape.166 AlternateColumnNames map[string]string167 // ViewName is an optional name of a view on top of the table containing work records to query when168 // selecting a candidate. If this value is not supplied, `TableName` will be used. 
The value supplied169 // may also indicate a table alias, which can be referenced in `OrderByExpression`, `ColumnExpressions`,170 // and the conditions supplied to `Dequeue`.171 //172 // The target of this column must be a view on top of the configured table with the same column173 // requirements as the base table described above.174 //175 // Example use case:176 // The processor for LSIF uploads supplies `lsif_uploads_with_repository_name`, a view on top of the177 // `lsif_uploads` table that joins work records with the `repo` table and adds an additional repository178 // name column. This allows `Dequeue` to return a record with additional data so that a second query179 // is not necessary by the caller.180 ViewName string181 // Scan is the function used to convert a rows object into a record of the expected shape.182 Scan RecordScanFn183 // OrderByExpression is the SQL expression used to order candidate records when selecting the next184 // batch of work to perform. This expression may use the alias provided in `ViewName`, if one was185 // supplied.186 OrderByExpression *sqlf.Query187 // ColumnExpressions are the target columns provided to the query when selecting a job record. These188 // expressions may use the alias provided in `ViewName`, if one was supplied.189 ColumnExpressions []*sqlf.Query190 // StalledMaxAge is the maximum allowed duration between heartbeat updates of a job's last_heartbeat_at191 // field. An unmodified row that is marked as processing likely indicates that the worker that dequeued192 // the record has died.193 StalledMaxAge time.Duration194 // MaxNumResets is the maximum number of times a record can be implicitly reset back to the queued195 // state (via `ResetStalled`). 
If a record's reset attempts counter reaches this threshold, it will196 // be moved into the errored state rather than queued on its next reset to prevent an infinite retry197 // cycle of the same input.198 MaxNumResets int199 // ResetFailureMessage overrides the default failure message written to job records that have been200 // reset the maximum number of times.201 ResetFailureMessage string202 // RetryAfter determines whether the store dequeues jobs that have errored more than RetryAfter ago.203 // Setting this value to zero will disable retries entirely.204 //205 // If RetryAfter is a non-zero duration, the store dequeues records where:206 //207 // - the state is 'errored'208 // - the failed attempts counter hasn't reached MaxNumRetries209 // - the finished_at timestamp was more than RetryAfter ago210 RetryAfter time.Duration211 // MaxNumRetries is the maximum number of times a record can be retried after an explicit failure.212 // Setting this value to zero will disable retries entirely.213 MaxNumRetries int214 // clock is used to mock out the wall clock used for heartbeat updates.215 clock glock.Clock216}217// RecordScanFn is a function that interprets row values as a particular record. This function should218// return a false-valued flag if the given result set was empty. 
This function must close the rows219// value if the given error value is nil.220//221// See the `CloseRows` function in the store/base package for suggested implementation details.222type RecordScanFn func(rows *sql.Rows, err error) (workerutil.Record, bool, error)223// New creates a new store with the given database handle and options.224func New(handle basestore.TransactableHandle, options Options) Store {225 return NewWithMetrics(handle, options, &observation.TestContext)226}227func NewWithMetrics(handle basestore.TransactableHandle, options Options, observationContext *observation.Context) Store {228 return newStore(handle, options, observationContext)229}230func newStore(handle basestore.TransactableHandle, options Options, observationContext *observation.Context) *store {231 if options.Name == "" {232 panic("no name supplied to github.com/sourcegraph/sourcegraph/internal/dbworker/store:newStore")233 }234 if options.ViewName == "" {235 options.ViewName = options.TableName236 }237 if options.clock == nil {238 options.clock = glock.NewRealClock()239 }240 alternateColumnNames := map[string]string{}241 for _, column := range columnNames {242 alternateColumnNames[column] = column243 }244 for k, v := range options.AlternateColumnNames {245 alternateColumnNames[k] = v246 }247 var replacements []string248 for k, v := range alternateColumnNames {249 replacements = append(replacements, fmt.Sprintf("{%s}", k), v)250 }251 modifiedColumnExpressionMatches := matchModifiedColumnExpressions(options.ViewName, options.ColumnExpressions, alternateColumnNames)252 for i, expression := range options.ColumnExpressions {253 for _, match := range modifiedColumnExpressionMatches[i] {254 if match.exact {255 continue256 }257 log15.Error(``+258 `dbworker store: column expression refers to a column modified by dequeue in a complex expression. 
`+259 `The given expression will currently evaluate to the OLD value of the row, and the associated handler `+260 `will not have a completely up-to-date record. Please refer to this column without a transform.`,261 "index", i,262 "expression", expression.Query(sqlf.PostgresBindVar),263 "columnName", match.columnName,264 "storeName", options.Name,265 )266 }267 }268 return &store{269 Store: basestore.NewWithHandle(handle),270 options: options,271 columnReplacer: strings.NewReplacer(replacements...),272 modifiedColumnExpressionMatches: modifiedColumnExpressionMatches,273 operations: newOperations(options.Name, observationContext),274 }275}276// With creates a new Store with the given basestore.Shareable store as the277// underlying basestore.Store.278func (s *store) With(other basestore.ShareableStore) Store {279 return &store{280 Store: s.Store.With(other),281 options: s.options,282 columnReplacer: s.columnReplacer,283 modifiedColumnExpressionMatches: s.modifiedColumnExpressionMatches,284 operations: s.operations,285 }286}287// columnNames contain the names of the columns expected to be defined by the target table.288// Note: adding a new column to this list requires updating the worker documentation289// https://github.com/sourcegraph/sourcegraph/blob/main/doc/dev/background-information/workers.md#database-backed-stores290var columnNames = []string{291 "id",292 "state",293 "failure_message",294 "queued_at",295 "started_at",296 "last_heartbeat_at",297 "finished_at",298 "process_after",299 "num_resets",300 "num_failures",301 "execution_logs",302 "worker_hostname",303}304// QueuedCount returns the number of queued records matching the given conditions.305func (s *store) QueuedCount(ctx context.Context, includeProcessing bool, conditions []*sqlf.Query) (_ int, err error) {306 ctx, _, endObservation := s.operations.queuedCount.With(ctx, &err, observation.Args{})307 defer endObservation(1, observation.Args{})308 stateQueries := make([]*sqlf.Query, 0, 2)309 stateQueries = 
append(stateQueries, sqlf.Sprintf("%s", "queued"))310 if includeProcessing {311 stateQueries = append(stateQueries, sqlf.Sprintf("%s", "processing"))312 }313 count, _, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(314 queuedCountQuery,315 quote(s.options.ViewName),316 sqlf.Join(stateQueries, ","),317 s.options.MaxNumRetries,318 makeConditionSuffix(conditions),319 )))320 return count, err321}322const queuedCountQuery = `323-- source: internal/workerutil/store.go:QueuedCount324SELECT COUNT(*) FROM %s WHERE (325 {state} IN (%s) OR326 ({state} = 'errored' AND {num_failures} < %s)327) %s328`329// MaxDurationInQueue returns the longest duration for which a job associated with this store instance has330// been in the queued state (including errored records that can be retried in the future). This method returns331// an duration of zero if there are no jobs ready for processing.332//333// If records backed by this store do not have an initial state of 'queued', or if it is possible to requeue334// records outside of this package, manual care should be taken to set the queued_at column to the proper time.335// This method makes no guarantees otherwise.336//337// See https://github.com/sourcegraph/sourcegraph/issues/32624.338func (s *store) MaxDurationInQueue(ctx context.Context) (_ time.Duration, err error) {339 ctx, _, endObservation := s.operations.maxDurationInQueue.With(ctx, &err, observation.Args{})340 defer endObservation(1, observation.Args{})341 now := s.now()342 retryAfter := int(s.options.RetryAfter / time.Second)343 ageInSeconds, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(344 maxDurationInQueueQuery,345 // candidates346 quote(s.options.ViewName),347 // oldest_queued348 now,349 // oldest_retryable350 retryAfter,351 retryAfter,352 now,353 retryAfter,354 s.options.MaxNumRetries,355 )))356 if err != nil {357 return 0, err358 }359 if !ok {360 return 0, nil361 }362 return time.Duration(ageInSeconds) * time.Second, nil363}364const 
maxDurationInQueueQuery = `365-- source: internal/workerutil/store.go:MaxDurationInQueue366WITH367candidates AS (368 SELECT * FROM %s369),370oldest_queued AS (371 SELECT372 -- Select when the record was most recently dequeueable373 GREATEST({queued_at}, {process_after}) AS last_queued_at374 FROM candidates375 WHERE376 {state} = 'queued' AND377 ({process_after} IS NULL OR {process_after} <= %s)378),379oldest_retryable AS (380 SELECT381 -- Select when the record was most recently dequeueable382 {finished_at} + (%s * '1 second'::interval) AS last_queued_at383 FROM candidates384 WHERE385 %s > 0 AND386 {state} = 'errored' AND387 %s - {finished_at} > (%s * '1 second'::interval) AND388 {num_failures} < %s389),390oldest_record AS (391 (392 SELECT last_queued_at FROM oldest_queued393 UNION394 SELECT last_queued_at FROM oldest_retryable395 )396 ORDER BY last_queued_at397 LIMIT 1398)399SELECT EXTRACT(EPOCH FROM NOW() - last_queued_at)::integer AS age FROM oldest_record400`401// columnsUpdatedByDequeue are the unmapped column names modified by the dequeue method.402var columnsUpdatedByDequeue = []string{403 "state",404 "started_at",405 "last_heartbeat_at",406 "finished_at",407 "failure_message",408 "execution_logs",409 "worker_hostname",410}411// Dequeue selects the first queued record matching the given conditions and updates the state to processing. If there412// is such a record, it is returned. If there is no such unclaimed record, a nil record and and a nil cancel function413// will be returned along with a false-valued flag. This method must not be called from within a transaction.414//415// A background goroutine that continuously updates the record's last modified time will be started. 
The returned cancel416// function should be called once the record no longer needs to be locked from selection or reset by another process.417// Most often, this will be when the handler moves the record into a terminal state.418//419// The supplied conditions may use the alias provided in `ViewName`, if one was supplied.420func (s *store) Dequeue(ctx context.Context, workerHostname string, conditions []*sqlf.Query) (_ workerutil.Record, _ bool, err error) {421 ctx, trace, endObservation := s.operations.dequeue.With(ctx, &err, observation.Args{})422 defer endObservation(1, observation.Args{})423 if s.InTransaction() {424 return nil, false, ErrDequeueTransaction425 }426 now := s.now()427 retryAfter := int(s.options.RetryAfter / time.Second)428 var (429 processingExpr = sqlf.Sprintf("%s", "processing")430 nowTimestampExpr = sqlf.Sprintf("%s::timestamp", now)431 nullExpr = sqlf.Sprintf("NULL")432 workerHostnameExpr = sqlf.Sprintf("%s", workerHostname)433 )434 // NOTE: Changes to this mapping should be reflected in the package variable435 // columnsUpdatedByDequeue, also defined in this file.436 updatedColumns := map[string]*sqlf.Query{437 s.columnReplacer.Replace("{state}"): processingExpr,438 s.columnReplacer.Replace("{started_at}"): nowTimestampExpr,439 s.columnReplacer.Replace("{last_heartbeat_at}"): nowTimestampExpr,440 s.columnReplacer.Replace("{finished_at}"): nullExpr,441 s.columnReplacer.Replace("{failure_message}"): nullExpr,442 s.columnReplacer.Replace("{execution_logs}"): nullExpr,443 s.columnReplacer.Replace("{worker_hostname}"): workerHostnameExpr,444 }445 record, exists, err := s.options.Scan(s.Query(ctx, s.formatQuery(446 dequeueQuery,447 s.options.OrderByExpression,448 quote(s.options.ViewName),449 now,450 retryAfter,451 now,452 retryAfter,453 s.options.MaxNumRetries,454 makeConditionSuffix(conditions),455 s.options.OrderByExpression,456 quote(s.options.TableName),457 quote(s.options.TableName),458 quote(s.options.TableName),459 
sqlf.Join(s.makeDequeueUpdateStatements(updatedColumns), ", "),460 sqlf.Join(s.makeDequeueSelectExpressions(updatedColumns), ", "),461 quote(s.options.ViewName),462 )))463 if err != nil {464 return nil, false, err465 }466 if !exists {467 return nil, false, nil468 }469 trace.Log(log.Int("recordID", record.RecordID()))470 return record, true, nil471}472const dequeueQuery = `473-- source: internal/workerutil/store.go:Dequeue474WITH potential_candidates AS (475 SELECT476 {id} AS candidate_id,477 ROW_NUMBER() OVER (ORDER BY %s) AS order478 FROM %s479 WHERE480 (481 (482 {state} = 'queued' AND483 ({process_after} IS NULL OR {process_after} <= %s)484 ) OR (485 %s > 0 AND486 {state} = 'errored' AND487 %s - {finished_at} > (%s * '1 second'::interval) AND488 {num_failures} < %s489 )490 )491 %s492 ORDER BY %s493),494candidate AS (495 SELECT496 {id} FROM %s497 JOIN potential_candidates pc ON pc.candidate_id = {id}498 WHERE499 -- Recheck state.500 {state} IN ('queued', 'errored')501 ORDER BY pc.order502 FOR UPDATE OF %s SKIP LOCKED503 LIMIT 1504),505updated_record AS (506 UPDATE507 %s508 SET509 %s510 WHERE511 {id} IN (SELECT {id} FROM candidate)512)513SELECT514 %s515FROM516 %s517WHERE518 {id} IN (SELECT {id} FROM candidate)519`520// makeDequeueSelectExpressions constructs the ordered set of SQL expressions that are returned521// from the dequeue query. This method returns a copy of the configured column expressions slice522// where expressions referencing one of the column updated by dequeue are replaced by the updated523// value.524//525// Note that this method only considers select expressions like `alias.ColumnName` and not something526// more complex like `SomeFunction(alias.ColumnName) + 1`. 
We issue a warning on construction of a527// new store configured in this way to indicate this (probably) unwanted behavior.528func (s *store) makeDequeueSelectExpressions(updatedColumns map[string]*sqlf.Query) []*sqlf.Query {529 selectExpressions := make([]*sqlf.Query, len(s.options.ColumnExpressions))530 copy(selectExpressions, s.options.ColumnExpressions)531 for i := range selectExpressions {532 for _, match := range s.modifiedColumnExpressionMatches[i] {533 if match.exact {534 selectExpressions[i] = updatedColumns[match.columnName]535 break536 }537 }538 }539 return selectExpressions540}541// makeDequeueUpdateStatements constructs the set of SQL statements that update values of the target table542// in the dequeue query.543func (s *store) makeDequeueUpdateStatements(updatedColumns map[string]*sqlf.Query) []*sqlf.Query {544 updateStatements := make([]*sqlf.Query, 0, len(updatedColumns))545 for columnName, updateValue := range updatedColumns {546 updateStatements = append(updateStatements, sqlf.Sprintf(columnName+"=%s", updateValue))547 }548 return updateStatements549}550func (s *store) Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error) {551 ctx, _, endObservation := s.operations.heartbeat.With(ctx, &err, observation.Args{})552 defer endObservation(1, observation.Args{})553 if len(ids) == 0 {554 return []int{}, nil555 }556 sqlIDs := make([]*sqlf.Query, 0, len(ids))557 for _, id := range ids {558 sqlIDs = append(sqlIDs, sqlf.Sprintf("%s", id))559 }560 quotedTableName := quote(s.options.TableName)561 conds := []*sqlf.Query{562 s.formatQuery("{id} IN (%s)", sqlf.Join(sqlIDs, ",")),563 s.formatQuery("{state} = 'processing'"),564 }565 conds = append(conds, options.ToSQLConds(s.formatQuery)...)566 knownIDs, err = basestore.ScanInts(s.Query(ctx, s.formatQuery(updateCandidateQuery, quotedTableName, sqlf.Join(conds, "AND"), quotedTableName, s.now())))567 if err != nil {568 return nil, err569 }570 if len(knownIDs) != len(ids) 
{571 outer:572 for _, recordID := range ids {573 for _, test := range knownIDs {574 if test == recordID {575 continue outer576 }577 }578 debug, debugErr := s.fetchDebugInformationForJob(ctx, recordID)579 if debugErr != nil {580 log15.Error("failed to fetch debug information for job",581 "recordID", recordID,582 "err", debugErr,583 )584 }585 log15.Error("heartbeat lost a job",586 "recordID", recordID,587 "debug", debug,588 "options.workerHostname", options.WorkerHostname,589 )590 }591 }592 return knownIDs, nil593}594const updateCandidateQuery = `595-- source: internal/workerutil/store.go:Heartbeat596WITH alive_candidates AS (597 SELECT598 {id}599 FROM600 %s601 WHERE602 %s603 ORDER BY604 {id} ASC605 FOR UPDATE606)607UPDATE608 %s609SET610 {last_heartbeat_at} = %s611WHERE612 {id} IN (SELECT {id} FROM alive_candidates)613RETURNING {id}614`615// Requeue updates the state of the record with the given identifier to queued and adds a processing delay before616// the next dequeue of this record can be performed.617func (s *store) Requeue(ctx context.Context, id int, after time.Time) (err error) {618 ctx, _, endObservation := s.operations.requeue.With(ctx, &err, observation.Args{LogFields: []log.Field{619 log.Int("id", id),620 log.String("after", after.String()),621 }})622 defer endObservation(1, observation.Args{})623 return s.Exec(ctx, s.formatQuery(624 requeueQuery,625 quote(s.options.TableName),626 after,627 id,628 ))629}630const requeueQuery = `631-- source: internal/workerutil/store.go:Requeue632UPDATE %s633SET634 {state} = 'queued',635 {queued_at} = clock_timestamp(),636 {started_at} = null,637 {process_after} = %s638WHERE {id} = %s639`640// AddExecutionLogEntry adds an executor log entry to the record and returns the ID of the new entry (which can be641// used with UpdateExecutionLogEntry) and a possible error. 
When the record is not found (due to options not matching642// or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.643func (s *store) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (entryID int, err error) {644 ctx, _, endObservation := s.operations.addExecutionLogEntry.With(ctx, &err, observation.Args{LogFields: []log.Field{645 log.Int("id", id),646 }})647 defer endObservation(1, observation.Args{})648 conds := []*sqlf.Query{649 s.formatQuery("{id} = %s", id),650 }651 conds = append(conds, options.ToSQLConds(s.formatQuery)...)652 entryID, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(653 addExecutionLogEntryQuery,654 quote(s.options.TableName),655 ExecutionLogEntry(entry),656 sqlf.Join(conds, "AND"),657 )))658 if err != nil {659 return entryID, err660 }661 if !ok {662 debug, debugErr := s.fetchDebugInformationForJob(ctx, id)663 if debugErr != nil {664 log15.Error("failed to fetch debug information for job",665 "recordID", id,666 "err", debugErr,667 )668 }669 log15.Error("updateExecutionLogEntry failed and didn't match rows",670 "recordID", id,671 "debug", debug,672 "options.workerHostname", options.WorkerHostname,673 "options.state", options.State,674 )675 return entryID, ErrExecutionLogEntryNotUpdated676 }677 return entryID, nil678}679const addExecutionLogEntryQuery = `680-- source: internal/workerutil/store.go:AddExecutionLogEntry681UPDATE682 %s683SET {execution_logs} = {execution_logs} || %s::json684WHERE685 %s686RETURNING array_length({execution_logs}, 1)687`688// UpdateExecutionLogEntry updates the executor log entry with the given ID on the given record. 
When the record is not689// found (due to options not matching or the record being deleted), ErrExecutionLogEntryNotUpdated is returned.690func (s *store) UpdateExecutionLogEntry(ctx context.Context, recordID, entryID int, entry workerutil.ExecutionLogEntry, options ExecutionLogEntryOptions) (err error) {691 ctx, _, endObservation := s.operations.updateExecutionLogEntry.With(ctx, &err, observation.Args{LogFields: []log.Field{692 log.Int("recordID", recordID),693 log.Int("entryID", entryID),694 }})695 defer endObservation(1, observation.Args{})696 conds := []*sqlf.Query{697 s.formatQuery("{id} = %s", recordID),698 s.formatQuery("array_length({execution_logs}, 1) >= %s", entryID),699 }700 conds = append(conds, options.ToSQLConds(s.formatQuery)...)701 _, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(702 updateExecutionLogEntryQuery,703 quote(s.options.TableName),704 entryID,705 ExecutionLogEntry(entry),706 sqlf.Join(conds, "AND"),707 )))708 if err != nil {709 return err710 }711 if !ok {712 debug, debugErr := s.fetchDebugInformationForJob(ctx, recordID)713 if debugErr != nil {714 log15.Error("failed to fetch debug information for job",715 "recordID", recordID,716 "err", debugErr,717 )718 }719 log15.Error("updateExecutionLogEntry failed and didn't match rows",720 "recordID", recordID,721 "debug", debug,722 "options.workerHostname", options.WorkerHostname,723 "options.state", options.State,724 )725 return ErrExecutionLogEntryNotUpdated726 }727 return nil728}729const updateExecutionLogEntryQuery = `730-- source: internal/workerutil/store.go:UpdateExecutionLogEntry731UPDATE732 %s733SET {execution_logs}[%s] = %s::json734WHERE735 %s736RETURNING737 array_length({execution_logs}, 1)738`739// MarkComplete attempts to update the state of the record to complete. If this record has already been moved from740// the processing state to a terminal state, this method will have no effect. 
This method returns a boolean flag741// indicating if the record was updated.742func (s *store) MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (_ bool, err error) {743 ctx, _, endObservation := s.operations.markComplete.With(ctx, &err, observation.Args{LogFields: []log.Field{744 log.Int("id", id),745 }})746 defer endObservation(1, observation.Args{})747 conds := []*sqlf.Query{748 s.formatQuery("{id} = %s", id),749 s.formatQuery("{state} = 'processing'"),750 }751 conds = append(conds, options.ToSQLConds(s.formatQuery)...)752 _, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(markCompleteQuery, quote(s.options.TableName), sqlf.Join(conds, "AND"))))753 return ok, err754}755const markCompleteQuery = `756-- source: internal/workerutil/store.go:MarkComplete757UPDATE %s758SET {state} = 'completed', {finished_at} = clock_timestamp()759WHERE %s760RETURNING {id}761`762// MarkErrored attempts to update the state of the record to errored. This method will only have an effect763// if the current state of the record is processing. A requeued record or a record already marked with an764// error will not be updated. 
This method returns a boolean flag indicating if the record was updated.765func (s *store) MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {766 ctx, _, endObservation := s.operations.markErrored.With(ctx, &err, observation.Args{LogFields: []log.Field{767 log.Int("id", id),768 }})769 defer endObservation(1, observation.Args{})770 conds := []*sqlf.Query{771 s.formatQuery("{id} = %s", id),772 s.formatQuery("{state} = 'processing'"),773 }774 conds = append(conds, options.ToSQLConds(s.formatQuery)...)775 q := s.formatQuery(markErroredQuery, quote(s.options.TableName), s.options.MaxNumRetries, failureMessage, sqlf.Join(conds, "AND"))776 _, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))777 return ok, err778}779const markErroredQuery = `780-- source: internal/workerutil/store.go:MarkErrored781UPDATE %s782SET {state} = CASE WHEN {num_failures} + 1 >= %d THEN 'failed' ELSE 'errored' END,783 {finished_at} = clock_timestamp(),784 {failure_message} = %s,785 {num_failures} = {num_failures} + 1786WHERE %s787RETURNING {id}788`789// MarkFailed attempts to update the state of the record to failed. This method will only have an effect790// if the current state of the record is processing. A requeued record or a record already marked with an791// error will not be updated. 
This method returns a boolean flag indicating if the record was updated.792func (s *store) MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {793 ctx, _, endObservation := s.operations.markFailed.With(ctx, &err, observation.Args{LogFields: []log.Field{794 log.Int("id", id),795 }})796 defer endObservation(1, observation.Args{})797 conds := []*sqlf.Query{798 s.formatQuery("{id} = %s", id),799 s.formatQuery("{state} = 'processing'"),800 }801 conds = append(conds, options.ToSQLConds(s.formatQuery)...)802 q := s.formatQuery(markFailedQuery, quote(s.options.TableName), failureMessage, sqlf.Join(conds, "AND"))803 _, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))804 return ok, err805}806const markFailedQuery = `807-- source: internal/workerutil/store.go:MarkFailed808UPDATE %s809SET {state} = 'failed',810 {finished_at} = clock_timestamp(),811 {failure_message} = %s,812 {num_failures} = {num_failures} + 1813WHERE %s814RETURNING {id}815`816const defaultResetFailureMessage = "job processor died while handling this message too many times"817// ResetStalled moves all processing records that have not received a heartbeat within `StalledMaxAge` back to the818// queued state. In order to prevent input that continually crashes worker instances, records that have been reset819// more than `MaxNumResets` times will be marked as failed. 
This method returns a pair of maps from record820// identifiers the age of the record's last heartbeat timestamp for each record reset to queued and failed states,821// respectively.822func (s *store) ResetStalled(ctx context.Context) (resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs map[int]time.Duration, err error) {823 ctx, trace, endObservation := s.operations.resetStalled.With(ctx, &err, observation.Args{})824 defer endObservation(1, observation.Args{})825 now := s.now()826 scan := scanLastHeartbeatTimestampsFrom(now)827 resetLastHeartbeatsByIDs, err = scan(s.Query(828 ctx,829 s.formatQuery(830 resetStalledQuery,831 quote(s.options.TableName),832 now,833 int(s.options.StalledMaxAge/time.Second),834 s.options.MaxNumResets,835 quote(s.options.TableName),836 ),837 ))838 if err != nil {839 return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, err840 }841 trace.Log(log.Int("numResetIDs", len(resetLastHeartbeatsByIDs)))842 resetFailureMessage := s.options.ResetFailureMessage843 if resetFailureMessage == "" {844 resetFailureMessage = defaultResetFailureMessage845 }846 failedLastHeartbeatsByIDs, err = scan(s.Query(847 ctx,848 s.formatQuery(849 resetStalledMaxResetsQuery,850 quote(s.options.TableName),851 now,852 int(s.options.StalledMaxAge/time.Second),853 s.options.MaxNumResets,854 quote(s.options.TableName),855 resetFailureMessage,856 ),857 ))858 if err != nil {859 return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, err860 }861 trace.Log(log.Int("numErroredIDs", len(failedLastHeartbeatsByIDs)))862 return resetLastHeartbeatsByIDs, failedLastHeartbeatsByIDs, nil863}864func scanLastHeartbeatTimestampsFrom(now time.Time) func(rows *sql.Rows, queryErr error) (_ map[int]time.Duration, err error) {865 return func(rows *sql.Rows, queryErr error) (_ map[int]time.Duration, err error) {866 if queryErr != nil {867 return nil, queryErr868 }869 defer func() { err = basestore.CloseRows(rows, err) }()870 m := map[int]time.Duration{}871 for rows.Next() {872 var id 
int873 var lastHeartbeat time.Time874 if err := rows.Scan(&id, &lastHeartbeat); err != nil {875 return nil, err876 }877 m[id] = now.Sub(lastHeartbeat)878 }879 return m, nil880 }881}882const resetStalledQuery = `883-- source: internal/workerutil/store.go:ResetStalled884WITH stalled AS (885 SELECT {id} FROM %s886 WHERE887 {state} = 'processing' AND888 %s - {last_heartbeat_at} > (%s * '1 second'::interval) AND889 {num_resets} < %s890 FOR UPDATE SKIP LOCKED891)892UPDATE %s893SET894 {state} = 'queued',895 {queued_at} = clock_timestamp(),896 {started_at} = null,897 {num_resets} = {num_resets} + 1898WHERE {id} IN (SELECT {id} FROM stalled)899RETURNING {id}, {last_heartbeat_at}900`901const resetStalledMaxResetsQuery = `902-- source: internal/workerutil/store.go:ResetStalled903WITH stalled AS (904 SELECT {id} FROM %s905 WHERE906 {state} = 'processing' AND907 %s - {last_heartbeat_at} > (%s * '1 second'::interval) AND908 {num_resets} >= %s909 FOR UPDATE SKIP LOCKED910)911UPDATE %s912SET913 {state} = 'failed',914 {finished_at} = clock_timestamp(),915 {failure_message} = %s916WHERE {id} IN (SELECT {id} FROM stalled)917RETURNING {id}, {last_heartbeat_at}918`919func (s *store) formatQuery(query string, args ...any) *sqlf.Query {920 return sqlf.Sprintf(s.columnReplacer.Replace(query), args...)921}922func (s *store) now() time.Time {923 return s.options.clock.Now().UTC()924}925const fetchDebugInformationForJob = `926-- source: internal/workerutil/store.go:UpdateExecutionLogEntry927SELECT928 row_to_json(%s)929FROM930 %s931WHERE932 {id} = %s933`934func (s *store) fetchDebugInformationForJob(ctx context.Context, recordID int) (debug string, err error) {935 debug, ok, err := basestore.ScanFirstNullString(s.Query(ctx, s.formatQuery(936 fetchDebugInformationForJob,937 quote(extractTableName(s.options.TableName)),938 quote(s.options.TableName),939 recordID,940 )))941 if err != nil {942 return "", err943 }944 if !ok {945 return "", errors.Newf("fetching debug information for record %d 
didn't return rows")946 }947 return debug, nil948}949// quote wraps the given string in a *sqlf.Query so that it is not passed to the database950// as a parameter. It is necessary to quote things such as table names, columns, and other951// expressions that are not simple values.952func quote(s string) *sqlf.Query {953 return sqlf.Sprintf(s)954}955// makeConditionSuffix returns a *sqlf.Query containing "AND {c1 AND c2 AND ...}" when the956// given set of conditions is non-empty, and an empty string otherwise.957func makeConditionSuffix(conditions []*sqlf.Query) *sqlf.Query {958 if len(conditions) == 0 {959 return sqlf.Sprintf("")960 }961 var quotedConditions []*sqlf.Query962 for _, condition := range conditions {963 // Ensure everything is quoted in case the condition has an OR964 quotedConditions = append(quotedConditions, sqlf.Sprintf("(%s)", condition))965 }966 return sqlf.Sprintf("AND %s", sqlf.Join(quotedConditions, " AND "))967}968type MatchingColumnExpressions struct {969 columnName string970 exact bool971}972// matchModifiedColumnExpressions returns a slice of columns to which each of the973// given column expressions refers. Column references that do not refere to a member974// of the columnsUpdatedByDequeue slice are ignored. 
Each match indicates the column975// name and whether or not the expression is an exact reference or a reference within976// a more complex expression (arithmetic, function call argument, etc).977//978// The output slice has the same number of elements as the input column expressions979// and the results are ordered in parallel with the given column expressions.980func matchModifiedColumnExpressions(viewName string, columnExpressions []*sqlf.Query, alternateColumnNames map[string]string) [][]MatchingColumnExpressions {981 matches := make([][]MatchingColumnExpressions, len(columnExpressions))982 columnPrefixes := makeColumnPrefixes(viewName)983 for i, columnExpression := range columnExpressions {984 columnExpressionText := columnExpression.Query(sqlf.PostgresBindVar)985 for _, columnName := range columnsUpdatedByDequeue {986 match := false987 exact := false988 if name, ok := alternateColumnNames[columnName]; ok {989 columnName = name990 }991 for _, columnPrefix := range columnPrefixes {992 if regexp.MustCompile(fmt.Sprintf(`^%s%s$`, columnPrefix, columnName)).MatchString(columnExpressionText) {993 match = true994 exact = true995 break996 }997 if !match && regexp.MustCompile(fmt.Sprintf(`\b%s%s\b`, columnPrefix, columnName)).MatchString(columnExpressionText) {998 match = true999 }...
store_test.go
Source:store_test.go
...97 if count != 2 {98 t.Errorf("unexpected count. want=%d have=%d", 2, count)99 }100}101func TestStoreMaxDurationInQueue(t *testing.T) {102 db := setupStoreTest(t)103 if _, err := db.ExecContext(context.Background(), `104 INSERT INTO workerutil_test (id, state, created_at)105 VALUES106 (1, 'queued', NOW() - '20 minutes'::interval), -- young107 (2, 'queued', NOW() - '30 minutes'::interval), -- oldest queued108 (3, 'state2', NOW() - '40 minutes'::interval), -- wrong state109 (4, 'queued', NOW() - '10 minutes'::interval), -- young110 (5, 'state3', NOW() - '50 minutes'::interval) -- wrong state111 `); err != nil {112 t.Fatalf("unexpected error inserting records: %s", err)113 }114 age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())115 if err != nil {116 t.Fatalf("unexpected error getting max duration in queue: %s", err)117 }118 if age.Round(time.Second) != 30*time.Minute {119 t.Fatalf("unexpected max age. want=%s have=%s", 30*time.Minute, age)120 }121}122func TestStoreMaxDurationInQueueProcessAfter(t *testing.T) {123 db := setupStoreTest(t)124 if _, err := db.ExecContext(context.Background(), `125 INSERT INTO workerutil_test (id, state, created_at, process_after)126 VALUES127 (1, 'queued', NOW() - '90 minutes'::interval, NOW() + '10 minutes'::interval), -- oldest queued, waiting for process_after128 (2, 'queued', NOW() - '70 minutes'::interval, NOW() - '30 minutes'::interval), -- oldest queued129 (3, 'state2', NOW() - '40 minutes'::interval, NULL), -- wrong state130 (4, 'queued', NOW() - '10 minutes'::interval, NULL), -- young131 (5, 'state3', NOW() - '50 minutes'::interval, NULL) -- wrong state132 `); err != nil {133 t.Fatalf("unexpected error inserting records: %s", err)134 }135 age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())136 if err != nil {137 t.Fatalf("unexpected error getting max duration in queue: %s", err)138 }139 if age.Round(time.Second) != 30*time.Minute {140 
t.Fatalf("unexpected max age. want=%s have=%s", 30*time.Minute, age)141 }142}143func TestStoreMaxDurationInQueueFailed(t *testing.T) {144 db := setupStoreTest(t)145 if _, err := db.ExecContext(context.Background(), `146 INSERT INTO workerutil_test (id, state, created_at, finished_at, num_failures)147 VALUES148 (1, 'queued', NOW() - '10 minutes'::interval, NULL, 0), -- young149 (2, 'errored', NOW(), NOW() - '30 minutes'::interval, 2), -- oldest retryable error'd150 (3, 'errored', NOW(), NOW() - '10 minutes'::interval, 2), -- retryable, but too young to be queued151 (4, 'state2', NOW() - '40 minutes'::interval, NULL, 0), -- wrong state152 (5, 'errored', NOW(), NOW() - '50 minutes'::interval, 3), -- non-retryable (max attempts exceeded)153 (6, 'queued', NOW() - '20 minutes'::interval, NULL, 0), -- oldest queued154 (7, 'failed', NOW(), NOW() - '60 minutes'::interval, 1) -- wrong state155 `); err != nil {156 t.Fatalf("unexpected error inserting records: %s", err)157 }158 options := defaultTestStoreOptions(nil)159 options.RetryAfter = 5 * time.Minute160 age, err := testStore(db, options).MaxDurationInQueue(context.Background())161 if err != nil {162 t.Fatalf("unexpected error getting max duration in queue: %s", err)163 }164 if age.Round(time.Second) != 25*time.Minute {165 t.Fatalf("unexpected max age. want=%s have=%s", 25*time.Minute, age)166 }167}168func TestStoreMaxDurationInQueueEmpty(t *testing.T) {169 db := setupStoreTest(t)170 age, err := testStore(db, defaultTestStoreOptions(nil)).MaxDurationInQueue(context.Background())171 if err != nil {172 t.Fatalf("unexpected error getting max duration in queue: %s", err)173 }174 if age.Round(time.Second) != 0*time.Minute {175 t.Fatalf("unexpected max age. 
want=%s have=%s", 0*time.Minute, age)176 }177}178func TestStoreDequeueState(t *testing.T) {179 db := setupStoreTest(t)180 if _, err := db.ExecContext(context.Background(), `181 INSERT INTO workerutil_test (id, state, created_at)182 VALUES183 (1, 'queued', NOW() - '1 minute'::interval),184 (2, 'queued', NOW() - '2 minute'::interval),185 (3, 'state2', NOW() - '3 minute'::interval),186 (4, 'queued', NOW() - '4 minute'::interval),187 (5, 'state2', NOW() - '5 minute'::interval)188 `); err != nil {189 t.Fatalf("unexpected error inserting records: %s", err)190 }191 record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)192 assertDequeueRecordResult(t, 4, record, ok, err)193}194func TestStoreDequeueOrder(t *testing.T) {195 db := setupStoreTest(t)196 if _, err := db.ExecContext(context.Background(), `197 INSERT INTO workerutil_test (id, state, created_at)198 VALUES199 (1, 'queued', NOW() - '2 minute'::interval),200 (2, 'queued', NOW() - '5 minute'::interval),201 (3, 'queued', NOW() - '3 minute'::interval),202 (4, 'queued', NOW() - '1 minute'::interval),203 (5, 'queued', NOW() - '4 minute'::interval)204 `); err != nil {205 t.Fatalf("unexpected error inserting records: %s", err)206 }207 record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)208 assertDequeueRecordResult(t, 2, record, ok, err)209}210func TestStoreDequeueConditions(t *testing.T) {211 db := setupStoreTest(t)212 if _, err := db.ExecContext(context.Background(), `213 INSERT INTO workerutil_test (id, state, created_at)214 VALUES215 (1, 'queued', NOW() - '1 minute'::interval),216 (2, 'queued', NOW() - '2 minute'::interval),217 (3, 'queued', NOW() - '3 minute'::interval),218 (4, 'queued', NOW() - '4 minute'::interval),219 (5, 'queued', NOW() - '5 minute'::interval)220 `); err != nil {221 t.Fatalf("unexpected error inserting records: %s", err)222 }223 conditions := []*sqlf.Query{sqlf.Sprintf("workerutil_test.id < 
4")}224 record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", conditions)225 assertDequeueRecordResult(t, 3, record, ok, err)226}227func TestStoreDequeueResetExecutionLogs(t *testing.T) {228 db := setupStoreTest(t)229 if _, err := db.ExecContext(context.Background(), `230 INSERT INTO workerutil_test (id, state, execution_logs, created_at)231 VALUES232 (1, 'queued', E'{"{\\"key\\": \\"test\\"}"}', NOW() - '1 minute'::interval)233 `); err != nil {234 t.Fatalf("unexpected error inserting records: %s", err)235 }236 record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)237 assertDequeueRecordResult(t, 1, record, ok, err)238 assertDequeueRecordResultLogCount(t, 0, record)239}240func TestStoreDequeueDelay(t *testing.T) {241 db := setupStoreTest(t)242 if _, err := db.ExecContext(context.Background(), `243 INSERT INTO workerutil_test (id, state, created_at, process_after)244 VALUES245 (1, 'queued', NOW() - '1 minute'::interval, NULL),246 (2, 'queued', NOW() - '2 minute'::interval, NULL),247 (3, 'queued', NOW() - '3 minute'::interval, NOW() + '2 minute'::interval),248 (4, 'queued', NOW() - '4 minute'::interval, NULL),249 (5, 'queued', NOW() - '5 minute'::interval, NOW() + '1 minute'::interval)250 `); err != nil {251 t.Fatalf("unexpected error inserting records: %s", err)252 }253 record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)254 assertDequeueRecordResult(t, 4, record, ok, err)255}256func TestStoreDequeueView(t *testing.T) {257 db := setupStoreTest(t)258 if _, err := db.ExecContext(context.Background(), `259 INSERT INTO workerutil_test (id, state, created_at)260 VALUES261 (1, 'queued', NOW() - '1 minute'::interval),262 (2, 'queued', NOW() - '2 minute'::interval),263 (3, 'queued', NOW() - '3 minute'::interval),264 (4, 'queued', NOW() - '4 minute'::interval),265 (5, 'queued', NOW() - '5 minute'::interval)266 `); err != nil 
{267 t.Fatalf("unexpected error inserting records: %s", err)268 }269 options := defaultTestStoreOptions(nil)270 options.ViewName = "workerutil_test_view v"271 options.Scan = testScanFirstRecordView272 options.OrderByExpression = sqlf.Sprintf("v.created_at")273 options.ColumnExpressions = []*sqlf.Query{274 sqlf.Sprintf("v.id"),275 sqlf.Sprintf("v.state"),276 sqlf.Sprintf("v.new_field"),277 }278 conditions := []*sqlf.Query{sqlf.Sprintf("v.new_field < 15")}279 record, ok, err := testStore(db, options).Dequeue(context.Background(), "test", conditions)280 assertDequeueRecordViewResult(t, 2, 14, record, ok, err)281}282func TestStoreDequeueConcurrent(t *testing.T) {283 db := setupStoreTest(t)284 if _, err := db.ExecContext(context.Background(), `285 INSERT INTO workerutil_test (id, state, created_at)286 VALUES287 (1, 'queued', NOW() - '2 minute'::interval),288 (2, 'queued', NOW() - '1 minute'::interval)289 `); err != nil {290 t.Fatalf("unexpected error inserting records: %s", err)291 }292 store := testStore(db, defaultTestStoreOptions(nil))293 // Worker A294 record1, ok, err := store.Dequeue(context.Background(), "test", nil)295 if err != nil {296 t.Fatalf("unexpected error: %s", err)297 }298 if !ok {299 t.Fatalf("expected a dequeueable record")300 }301 // Worker B302 record2, ok, err := store.Dequeue(context.Background(), "test", nil)303 if err != nil {304 t.Fatalf("unexpected error: %s", err)305 }306 if !ok {307 t.Fatalf("expected a second dequeueable record")308 }309 if val := record1.(TestRecord).ID; val != 1 {310 t.Errorf("unexpected id. want=%d have=%d", 1, val)311 }312 if val := record2.(TestRecord).ID; val != 2 {313 t.Errorf("unexpected id. 
want=%d have=%d", 2, val)314 }315 // Worker C316 _, ok, err = store.Dequeue(context.Background(), "test", nil)317 if err != nil {318 t.Fatalf("unexpected error: %s", err)319 }320 if ok {321 t.Fatalf("did not expect a third dequeueable record")322 }323}324func TestStoreDequeueRetryAfter(t *testing.T) {325 db := setupStoreTest(t)326 if _, err := db.ExecContext(context.Background(), `327 INSERT INTO workerutil_test (id, state, finished_at, failure_message, num_failures, created_at)328 VALUES329 (1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval),330 (2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval),331 (3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval),332 (4, 'queued', NULL, NULL, 0, NOW() - '1 minutes'::interval)333 `); err != nil {334 t.Fatalf("unexpected error inserting records: %s", err)335 }336 options := defaultTestStoreOptions(nil)337 options.Scan = testScanFirstRecordRetry338 options.MaxNumRetries = 5339 options.RetryAfter = 5 * time.Minute340 options.ColumnExpressions = []*sqlf.Query{341 sqlf.Sprintf("workerutil_test.id"),342 sqlf.Sprintf("workerutil_test.state"),343 sqlf.Sprintf("workerutil_test.num_resets"),344 }345 store := testStore(db, options)346 // Dequeue errored record347 record1, ok, err := store.Dequeue(context.Background(), "test", nil)348 assertDequeueRecordRetryResult(t, 1, record1, ok, err)349 // Dequeue non-errored record350 record2, ok, err := store.Dequeue(context.Background(), "test", nil)351 assertDequeueRecordRetryResult(t, 4, record2, ok, err)352 // Does not dequeue old or max retried errored353 if _, ok, _ := store.Dequeue(context.Background(), "test", nil); ok {354 t.Fatalf("did not expect a third dequeueable record")355 }356}357func TestStoreDequeueRetryAfterDisabled(t *testing.T) {358 db := setupStoreTest(t)359 if _, err := db.ExecContext(context.Background(), `360 INSERT INTO workerutil_test (id, state, finished_at, 
failure_message, num_failures, created_at)361 VALUES362 (1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval),363 (2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval),364 (3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval),365 (4, 'queued', NULL, NULL, 0, NOW() - '1 minutes'::interval)366 `); err != nil {367 t.Fatalf("unexpected error inserting records: %s", err)368 }369 options := defaultTestStoreOptions(nil)370 options.Scan = testScanFirstRecordRetry371 options.MaxNumRetries = 5372 options.RetryAfter = 0373 options.ColumnExpressions = []*sqlf.Query{374 sqlf.Sprintf("workerutil_test.id"),375 sqlf.Sprintf("workerutil_test.state"),376 sqlf.Sprintf("workerutil_test.num_resets"),377 }378 store := testStore(db, options)379 // Dequeue non-errored record only380 record2, ok, err := store.Dequeue(context.Background(), "test", nil)381 assertDequeueRecordRetryResult(t, 4, record2, ok, err)382 // Does not dequeue errored383 if _, ok, _ := store.Dequeue(context.Background(), "test", nil); ok {384 t.Fatalf("did not expect a second dequeueable record")385 }386}387func TestStoreRequeue(t *testing.T) {388 db := setupStoreTest(t)389 if _, err := db.ExecContext(context.Background(), `390 INSERT INTO workerutil_test (id, state)391 VALUES392 (1, 'processing')393 `); err != nil {394 t.Fatalf("unexpected error inserting records: %s", err)395 }396 after := testNow().Add(time.Hour)397 if err := testStore(db, defaultTestStoreOptions(nil)).Requeue(context.Background(), 1, after); err != nil {398 t.Fatalf("unexpected error requeueing record: %s", err)399 }400 rows, err := db.QueryContext(context.Background(), `SELECT state, process_after FROM workerutil_test WHERE id = 1`)401 if err != nil {402 t.Fatalf("unexpected error querying record: %s", err)403 }404 defer func() { _ = basestore.CloseRows(rows, nil) }()405 if !rows.Next() {406 t.Fatal("expected record to exist")407 }408 var state 
string409 var processAfter *time.Time410 if err := rows.Scan(&state, &processAfter); err != nil {411 t.Fatalf("unexpected error scanning record: %s", err)412 }413 if state != "queued" {414 t.Errorf("unexpected state. want=%q have=%q", "queued", state)415 }416 if processAfter == nil || !processAfter.Equal(after) {417 t.Errorf("unexpected process after. want=%s have=%s", after, processAfter)418 }419}420func TestStoreAddExecutionLogEntry(t *testing.T) {421 db := setupStoreTest(t)422 if _, err := db.ExecContext(context.Background(), `423 INSERT INTO workerutil_test (id, state)424 VALUES425 (1, 'processing')426 `); err != nil {427 t.Fatalf("unexpected error inserting records: %s", err)428 }429 numEntries := 5430 for i := 0; i < numEntries; i++ {431 command := []string{"ls", "-a", fmt.Sprintf("%d", i+1)}432 payload := fmt.Sprintf("<load payload %d>", i+1)433 entry := workerutil.ExecutionLogEntry{434 Command: command,435 Out: payload,436 }437 entryID, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})438 if err != nil {439 t.Fatalf("unexpected error adding executor log entry: %s", err)440 }441 // PostgreSQL's arrays use 1-based indexing, so the first entry is at 1442 if entryID != i+1 {443 t.Fatalf("executor log entry has wrong entry id. want=%d, have=%d", i+1, entryID)444 }445 }446 contents, err := basestore.ScanStrings(db.QueryContext(context.Background(), `SELECT unnest(execution_logs)::text FROM workerutil_test WHERE id = 1`))447 if err != nil {448 t.Fatalf("unexpected error scanning record: %s", err)449 }450 if len(contents) != numEntries {451 t.Fatalf("unexpected number of payloads. 
want=%d have=%d", numEntries, len(contents))452 }453 for i := 0; i < numEntries; i++ {454 var entry workerutil.ExecutionLogEntry455 if err := json.Unmarshal([]byte(contents[i]), &entry); err != nil {456 t.Fatalf("unexpected error decoding entry: %s", err)457 }458 expected := workerutil.ExecutionLogEntry{459 Command: []string{"ls", "-a", fmt.Sprintf("%d", i+1)},460 Out: fmt.Sprintf("<load payload %d>", i+1),461 }462 if diff := cmp.Diff(expected, entry); diff != "" {463 t.Errorf("unexpected entry (-want +got):\n%s", diff)464 }465 }466}467func TestStoreAddExecutionLogEntryNoRecord(t *testing.T) {468 db := setupStoreTest(t)469 entry := workerutil.ExecutionLogEntry{470 Command: []string{"ls", "-a"},471 Out: "output",472 }473 _, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})474 if err == nil {475 t.Fatalf("expected error but got none")476 }477}478func TestStoreUpdateExecutionLogEntry(t *testing.T) {479 db := setupStoreTest(t)480 if _, err := db.ExecContext(context.Background(), `481 INSERT INTO workerutil_test (id, state)482 VALUES483 (1, 'processing')484 `); err != nil {485 t.Fatalf("unexpected error inserting records: %s", err)486 }487 numEntries := 5488 for i := 0; i < numEntries; i++ {489 command := []string{"ls", "-a", fmt.Sprintf("%d", i+1)}490 payload := fmt.Sprintf("<load payload %d>", i+1)491 entry := workerutil.ExecutionLogEntry{492 Command: command,493 Out: payload,494 }495 entryID, err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, ExecutionLogEntryOptions{})496 if err != nil {497 t.Fatalf("unexpected error adding executor log entry: %s", err)498 }499 // PostgreSQL's arrays use 1-based indexing, so the first entry is at 1500 if entryID != i+1 {501 t.Fatalf("executor log entry has wrong entry id. 
want=%d, have=%d", i+1, entryID)502 }503 entry.Out += fmt.Sprintf("\n<load payload %d again, nobody was at home>", i+1)504 if err := testStore(db, defaultTestStoreOptions(nil)).UpdateExecutionLogEntry(context.Background(), 1, entryID, entry, ExecutionLogEntryOptions{}); err != nil {505 t.Fatalf("unexpected error updating executor log entry: %s", err)506 }507 }508 contents, err := basestore.ScanStrings(db.QueryContext(context.Background(), `SELECT unnest(execution_logs)::text FROM workerutil_test WHERE id = 1`))509 if err != nil {510 t.Fatalf("unexpected error scanning record: %s", err)511 }512 if len(contents) != numEntries {513 t.Fatalf("unexpected number of payloads. want=%d have=%d", numEntries, len(contents))514 }515 for i := 0; i < numEntries; i++ {516 var entry workerutil.ExecutionLogEntry517 if err := json.Unmarshal([]byte(contents[i]), &entry); err != nil {518 t.Fatalf("unexpected error decoding entry: %s", err)519 }520 expected := workerutil.ExecutionLogEntry{521 Command: []string{"ls", "-a", fmt.Sprintf("%d", i+1)},522 Out: fmt.Sprintf("<load payload %d>\n<load payload %d again, nobody was at home>", i+1, i+1),523 }524 if diff := cmp.Diff(expected, entry); diff != "" {525 t.Errorf("unexpected entry (-want +got):\n%s", diff)526 }527 }528}529func TestStoreUpdateExecutionLogEntryUnknownEntry(t *testing.T) {530 db := setupStoreTest(t)531 if _, err := db.ExecContext(context.Background(), `532 INSERT INTO workerutil_test (id, state)533 VALUES534 (1, 'processing')535 `); err != nil {536 t.Fatalf("unexpected error inserting records: %s", err)537 }538 entry := workerutil.ExecutionLogEntry{539 Command: []string{"ls", "-a"},540 Out: "<load payload>",541 }542 for unknownEntryID := 0; unknownEntryID < 2; unknownEntryID++ {543 err := testStore(db, defaultTestStoreOptions(nil)).UpdateExecutionLogEntry(context.Background(), 1, unknownEntryID, entry, ExecutionLogEntryOptions{})544 if err == nil {545 t.Fatal("expected error but got none")546 }547 }548}549func 
TestStoreMarkComplete(t *testing.T) {550 db := setupStoreTest(t)551 if _, err := db.ExecContext(context.Background(), `552 INSERT INTO workerutil_test (id, state)553 VALUES554 (1, 'processing')555 `); err != nil {556 t.Fatalf("unexpected error inserting records: %s", err)557 }558 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})559 if err != nil {560 t.Fatalf("unexpected error marking record as completed: %s", err)561 }562 if !marked {563 t.Fatalf("expected record to be marked")564 }565 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)566 if err != nil {567 t.Fatalf("unexpected error querying record: %s", err)568 }569 defer func() { _ = basestore.CloseRows(rows, nil) }()570 if !rows.Next() {571 t.Fatal("expected record to exist")572 }573 var state string574 var failureMessage *string575 if err := rows.Scan(&state, &failureMessage); err != nil {576 t.Fatalf("unexpected error scanning record: %s", err)577 }578 if state != "completed" {579 t.Errorf("unexpected state. want=%q have=%q", "completed", state)580 }581 if failureMessage != nil {582 t.Errorf("unexpected failure message. 
want=%v have=%v", nil, failureMessage)583 }584}585func TestStoreMarkCompleteNotProcessing(t *testing.T) {586 db := setupStoreTest(t)587 if _, err := db.ExecContext(context.Background(), `588 INSERT INTO workerutil_test (id, state, failure_message)589 VALUES590 (1, 'errored', 'old message')591 `); err != nil {592 t.Fatalf("unexpected error inserting records: %s", err)593 }594 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})595 if err != nil {596 t.Fatalf("unexpected error marking record as completed: %s", err)597 }598 if marked {599 t.Fatalf("expected record not to be marked")600 }601 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)602 if err != nil {603 t.Fatalf("unexpected error querying record: %s", err)604 }605 defer func() { _ = basestore.CloseRows(rows, nil) }()606 if !rows.Next() {607 t.Fatal("expected record to exist")608 }609 var state string610 var failureMessage *string611 if err := rows.Scan(&state, &failureMessage); err != nil {612 t.Fatalf("unexpected error scanning record: %s", err)613 }614 if state != "errored" {615 t.Errorf("unexpected state. want=%q have=%q", "errored", state)616 }617 if failureMessage == nil || *failureMessage != "old message" {618 t.Errorf("unexpected failure message. 
want=%v have=%v", "old message", failureMessage)619 }620}621func TestStoreMarkErrored(t *testing.T) {622 db := setupStoreTest(t)623 if _, err := db.ExecContext(context.Background(), `624 INSERT INTO workerutil_test (id, state)625 VALUES626 (1, 'processing')627 `); err != nil {628 t.Fatalf("unexpected error inserting records: %s", err)629 }630 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})631 if err != nil {632 t.Fatalf("unexpected error marking record as errored: %s", err)633 }634 if !marked {635 t.Fatalf("expected record to be marked")636 }637 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)638 if err != nil {639 t.Fatalf("unexpected error querying record: %s", err)640 }641 defer func() { _ = basestore.CloseRows(rows, nil) }()642 if !rows.Next() {643 t.Fatal("expected record to exist")644 }645 var state string646 var failureMessage *string647 if err := rows.Scan(&state, &failureMessage); err != nil {648 t.Fatalf("unexpected error scanning record: %s", err)649 }650 if state != "errored" {651 t.Errorf("unexpected state. want=%q have=%q", "errored", state)652 }653 if failureMessage == nil || *failureMessage != "new message" {654 t.Errorf("unexpected failure message. 
want=%v have=%v", "new message", failureMessage)655 }656}657func TestStoreMarkFailed(t *testing.T) {658 db := setupStoreTest(t)659 if _, err := db.ExecContext(context.Background(), `660 INSERT INTO workerutil_test (id, state)661 VALUES662 (1, 'processing')663 `); err != nil {664 t.Fatalf("unexpected error inserting records: %s", err)665 }666 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkFailed(context.Background(), 1, "new message", MarkFinalOptions{})667 if err != nil {668 t.Fatalf("unexpected error marking upload as completed: %s", err)669 }670 if !marked {671 t.Fatalf("expected record to be marked")672 }673 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)674 if err != nil {675 t.Fatalf("unexpected error querying record: %s", err)676 }677 defer func() { _ = basestore.CloseRows(rows, nil) }()678 if !rows.Next() {679 t.Fatal("expected record to exist")680 }681 var state string682 var failureMessage *string683 if err := rows.Scan(&state, &failureMessage); err != nil {684 t.Fatalf("unexpected error scanning record: %s", err)685 }686 if state != "failed" {687 t.Errorf("unexpected state. want=%q have=%q", "failed", state)688 }689 if failureMessage == nil || *failureMessage != "new message" {690 t.Errorf("unexpected failure message. 
want=%v have=%v", "new message", failureMessage)691 }692}693func TestStoreMarkErroredAlreadyCompleted(t *testing.T) {694 db := setupStoreTest(t)695 if _, err := db.ExecContext(context.Background(), `696 INSERT INTO workerutil_test (id, state)697 VALUES698 (1, 'completed')699 `); err != nil {700 t.Fatalf("unexpected error inserting records: %s", err)701 }702 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})703 if err != nil {704 t.Fatalf("unexpected error marking record as errored: %s", err)705 }706 if marked {707 t.Fatalf("expected record not to be marked errired")708 }709 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)710 if err != nil {711 t.Fatalf("unexpected error querying record: %s", err)712 }713 defer func() { _ = basestore.CloseRows(rows, nil) }()714 if !rows.Next() {715 t.Fatal("expected record to exist")716 }717 var state string718 var failureMessage *string719 if err := rows.Scan(&state, &failureMessage); err != nil {720 t.Fatalf("unexpected error scanning record: %s", err)721 }722 if state != "completed" {723 t.Errorf("unexpected state. 
want=%q have=%q", "completed", state)724 }725 if failureMessage != nil {726 t.Errorf("unexpected non-empty failure message")727 }728}729func TestStoreMarkErroredAlreadyErrored(t *testing.T) {730 db := setupStoreTest(t)731 if _, err := db.ExecContext(context.Background(), `732 INSERT INTO workerutil_test (id, state, failure_message)733 VALUES734 (1, 'errored', 'old message')735 `); err != nil {736 t.Fatalf("unexpected error inserting records: %s", err)737 }738 marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})739 if err != nil {740 t.Fatalf("unexpected error marking record as errored: %s", err)741 }742 if marked {743 t.Fatalf("expected record not to be marked")744 }745 rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)746 if err != nil {747 t.Fatalf("unexpected error querying record: %s", err)748 }749 defer func() { _ = basestore.CloseRows(rows, nil) }()750 if !rows.Next() {751 t.Fatal("expected record to exist")752 }753 var state string754 var failureMessage *string755 if err := rows.Scan(&state, &failureMessage); err != nil {756 t.Fatalf("unexpected error scanning record: %s", err)757 }758 if state != "errored" {759 t.Errorf("unexpected state. want=%q have=%q", "errored", state)760 }761 if failureMessage == nil || *failureMessage != "old message" {762 t.Errorf("unexpected failure message. 
want=%v have=%v", "old message", failureMessage)763 }764}765func TestStoreMarkErroredRetriesExhausted(t *testing.T) {766 db := setupStoreTest(t)767 if _, err := db.ExecContext(context.Background(), `768 INSERT INTO workerutil_test (id, state, num_failures)769 VALUES770 (1, 'processing', 0),771 (2, 'processing', 1)772 `); err != nil {773 t.Fatalf("unexpected error inserting records: %s", err)774 }775 options := defaultTestStoreOptions(nil)776 options.MaxNumRetries = 2777 store := testStore(db, options)778 for i := 1; i < 3; i++ {779 marked, err := store.MarkErrored(context.Background(), i, "new message", MarkFinalOptions{})780 if err != nil {781 t.Fatalf("unexpected error marking record as errored: %s", err)782 }783 if !marked {784 t.Fatalf("expected record to be marked")785 }786 }787 assertState := func(id int, wantState string) {788 q := fmt.Sprintf(`SELECT state FROM workerutil_test WHERE id = %d`, id)789 rows, err := db.QueryContext(context.Background(), q)790 if err != nil {791 t.Fatalf("unexpected error querying record: %s", err)792 }793 defer func() { _ = basestore.CloseRows(rows, nil) }()794 if !rows.Next() {795 t.Fatal("expected record to exist")796 }797 var state string798 if err := rows.Scan(&state); err != nil {799 t.Fatalf("unexpected error scanning record: %s", err)800 }801 if state != wantState {802 t.Errorf("record %d unexpected state. 
want=%q have=%q", id, wantState, state)803 }804 }805 assertState(1, "errored")806 assertState(2, "failed")807}808func TestStoreResetStalled(t *testing.T) {809 db := setupStoreTest(t)810 if _, err := db.ExecContext(context.Background(), `811 INSERT INTO workerutil_test (id, state, last_heartbeat_at, num_resets)812 VALUES813 (1, 'processing', NOW() - '6 second'::interval, 1),814 (2, 'processing', NOW() - '2 second'::interval, 0),815 (3, 'processing', NOW() - '3 second'::interval, 0),816 (4, 'processing', NOW() - '8 second'::interval, 0),817 (5, 'processing', NOW() - '8 second'::interval, 0),818 (6, 'processing', NOW() - '6 second'::interval, 5),819 (7, 'processing', NOW() - '8 second'::interval, 5)820 `); err != nil {821 t.Fatalf("unexpected error inserting records: %s", err)822 }823 tx, err := db.BeginTx(context.Background(), nil)824 if err != nil {825 t.Fatal(err)826 }827 defer func() { _ = tx.Rollback() }()828 // Row lock record 5 in a transaction which should be skipped by ResetStalled829 if _, err := tx.Exec(`SELECT * FROM workerutil_test WHERE id = 5 FOR UPDATE`); err != nil {830 t.Fatal(err)831 }832 resetLastHeartbeatsByIDs, erroredLastHeartbeatsByIDs, err := testStore(db, defaultTestStoreOptions(nil)).ResetStalled(context.Background())833 if err != nil {834 t.Fatalf("unexpected error resetting stalled records: %s", err)835 }836 var resetIDs []int837 for id := range resetLastHeartbeatsByIDs {838 resetIDs = append(resetIDs, id)839 }840 sort.Ints(resetIDs)841 var erroredIDs []int842 for id := range erroredLastHeartbeatsByIDs {843 erroredIDs = append(erroredIDs, id)844 }845 sort.Ints(erroredIDs)846 if diff := cmp.Diff([]int{1, 4}, resetIDs); diff != "" {847 t.Errorf("unexpected reset ids (-want +got):\n%s", diff)848 }849 if diff := cmp.Diff([]int{6, 7}, erroredIDs); diff != "" {850 t.Errorf("unexpected errored ids (-want +got):\n%s", diff)851 }852 rows, err := db.QueryContext(context.Background(), `SELECT state, num_resets FROM workerutil_test WHERE id = 1`)853 
if err != nil {854 t.Fatalf("unexpected error querying record: %s", err)855 }856 defer func() { _ = basestore.CloseRows(rows, nil) }()857 if !rows.Next() {858 t.Fatal("expected record to exist")859 }860 var state string861 var numResets int862 if err := rows.Scan(&state, &numResets); err != nil {863 t.Fatalf("unexpected error scanning record: %s", err)864 }865 if state != "queued" {866 t.Errorf("unexpected state. want=%q have=%q", "queued", state)867 }868 if numResets != 2 {869 t.Errorf("unexpected num resets. want=%d have=%d", 2, numResets)870 }871 rows, err = db.QueryContext(context.Background(), `SELECT state FROM workerutil_test WHERE id = 6`)872 if err != nil {873 t.Fatalf("unexpected error querying record: %s", err)874 }875 defer func() { _ = basestore.CloseRows(rows, nil) }()876 if !rows.Next() {877 t.Fatal("expected record to exist")878 }879 if err := rows.Scan(&state); err != nil {880 t.Fatalf("unexpected error scanning record: %s", err)881 }882 if state != "failed" {883 t.Errorf("unexpected state. 
want=%q have=%q", "failed", state)884 }885}886func TestStoreHeartbeat(t *testing.T) {887 db := setupStoreTest(t)888 now := time.Unix(1587396557, 0).UTC()889 clock := glock.NewMockClockAt(now)890 store := testStore(db, defaultTestStoreOptions(clock))891 if err := store.Exec(context.Background(), sqlf.Sprintf(`892 INSERT INTO workerutil_test (id, state, worker_hostname, last_heartbeat_at)893 VALUES894 (1, 'queued', 'worker1', %s),895 (2, 'queued', 'worker1', %s),896 (3, 'queued', 'worker2', %s)897 `, now, now, now)); err != nil {898 t.Fatalf("unexpected error inserting records: %s", err)899 }900 readAndCompareTimes := func(expected map[int]time.Duration) {901 times, err := scanLastHeartbeatTimestampsFrom(clock.Now())(store.Query(context.Background(), sqlf.Sprintf(`902 SELECT id, last_heartbeat_at FROM workerutil_test903 `)))904 if err != nil {905 t.Fatalf("unexpected error scanning heartbeats: %s", err)906 }907 if diff := cmp.Diff(expected, times); diff != "" {908 t.Errorf("unexpected times (-want +got):\n%s", diff)909 }910 }911 clock.Advance(5 * time.Second)912 if _, err := store.Heartbeat(context.Background(), []int{1, 2, 3}, HeartbeatOptions{}); err != nil {913 t.Fatalf("unexpected error updating heartbeat: %s", err)914 }915 readAndCompareTimes(map[int]time.Duration{916 1: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'917 2: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'918 3: 5 * time.Second, // not updated, clock advanced 5s from start; note state='queued'919 })920 // Now update state to processing and expect it to update properly.921 if _, err := db.ExecContext(context.Background(), `UPDATE workerutil_test SET state = 'processing'`); err != nil {922 t.Fatalf("unexpected error updating records: %s", err)923 }924 clock.Advance(5 * time.Second)925 // Only one worker926 if _, err := store.Heartbeat(context.Background(), []int{1, 2, 3}, HeartbeatOptions{WorkerHostname: "worker1"}); err != nil 
{927 t.Fatalf("unexpected error updating heartbeat: %s", err)928 }929 readAndCompareTimes(map[int]time.Duration{930 1: 0, // updated931 2: 0, // updated932 3: 10 * time.Second, // not updated, clock advanced 10s from start; note worker_hostname=worker2933 })934 clock.Advance(5 * time.Second)935 // Multiple workers936 if _, err := store.Heartbeat(context.Background(), []int{1, 3}, HeartbeatOptions{}); err != nil {937 t.Fatalf("unexpected error updating heartbeat: %s", err)938 }939 readAndCompareTimes(map[int]time.Duration{940 1: 0, // updated941 2: 5 * time.Second, // not in known ID list942 3: 0, // updated943 })944}...
bulk_processor_test.go
Source:bulk_processor_test.go
...19)20func TestBulkProcessor(t *testing.T) {21 t.Parallel()22 ctx := context.Background()23 sqlDB := dbtest.NewDB(t)24 tx := dbtest.NewTx(t, sqlDB)25 db := database.NewDB(sqlDB)26 bstore := store.New(database.NewDBWith(basestore.NewWithHandle(basestore.NewHandleWithTx(tx, sql.TxOptions{}))), &observation.TestContext, nil)27 user := ct.CreateTestUser(t, db, true)28 repo, _ := ct.CreateTestRepo(t, ctx, db)29 ct.CreateTestSiteCredential(t, bstore, repo)30 batchSpec := ct.CreateBatchSpec(t, ctx, bstore, "test-bulk", user.ID)31 batchChange := ct.CreateBatchChange(t, ctx, bstore, "test-bulk", user.ID, batchSpec.ID)32 changesetSpec := ct.CreateChangesetSpec(t, ctx, bstore, ct.TestSpecOpts{33 User: user.ID,34 Repo: repo.ID,35 BatchSpec: batchSpec.ID,36 HeadRef: "main",37 })38 changeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{39 Repo: repo.ID,40 BatchChanges: []types.BatchChangeAssoc{{BatchChangeID: batchChange.ID}},41 Metadata: &github.PullRequest{},42 ExternalServiceType: extsvc.TypeGitHub,43 CurrentSpec: changesetSpec.ID,44 })45 t.Run("Unknown job type", func(t *testing.T) {46 fake := &sources.FakeChangesetSource{}47 bp := &bulkProcessor{48 tx: bstore,49 sourcer: sources.NewFakeSourcer(nil, fake),50 }51 job := &types.ChangesetJob{JobType: types.ChangesetJobType("UNKNOWN")}52 err := bp.Process(ctx, job)53 if err == nil || err.Error() != `invalid job type "UNKNOWN"` {54 t.Fatalf("unexpected error returned %s", err)55 }56 })57 t.Run("changeset is processing", func(t *testing.T) {58 processingChangeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{59 Repo: repo.ID,60 BatchChanges: []types.BatchChangeAssoc{{BatchChangeID: batchChange.ID}},61 Metadata: &github.PullRequest{},62 ExternalServiceType: extsvc.TypeGitHub,63 CurrentSpec: changesetSpec.ID,64 ReconcilerState: btypes.ReconcilerStateProcessing,65 })66 job := &types.ChangesetJob{67 // JobType doesn't matter but we need one for database validation68 JobType: 
types.ChangesetJobTypeComment,69 ChangesetID: processingChangeset.ID,70 UserID: user.ID,71 }72 if err := bstore.CreateChangesetJob(ctx, job); err != nil {73 t.Fatal(err)74 }75 bp := &bulkProcessor{tx: bstore}76 err := bp.Process(ctx, job)77 if err != changesetIsProcessingErr {78 t.Fatalf("unexpected error. want=%s, got=%s", changesetIsProcessingErr, err)79 }80 })81 t.Run("Comment job", func(t *testing.T) {82 fake := &sources.FakeChangesetSource{}83 bp := &bulkProcessor{84 tx: bstore,85 sourcer: sources.NewFakeSourcer(nil, fake),86 }87 job := &types.ChangesetJob{88 JobType: types.ChangesetJobTypeComment,89 ChangesetID: changeset.ID,90 UserID: user.ID,91 Payload: &btypes.ChangesetJobCommentPayload{},92 }93 if err := bstore.CreateChangesetJob(ctx, job); err != nil {94 t.Fatal(err)95 }96 err := bp.Process(ctx, job)97 if err != nil {98 t.Fatal(err)99 }100 if !fake.CreateCommentCalled {101 t.Fatal("expected CreateComment to be called but wasn't")102 }103 })104 t.Run("Detach job", func(t *testing.T) {105 fake := &sources.FakeChangesetSource{}106 bp := &bulkProcessor{107 tx: bstore,108 sourcer: sources.NewFakeSourcer(nil, fake),109 }110 job := &types.ChangesetJob{111 JobType: types.ChangesetJobTypeDetach,112 ChangesetID: changeset.ID,113 UserID: user.ID,114 BatchChangeID: batchChange.ID,115 Payload: &btypes.ChangesetJobDetachPayload{},116 }117 err := bp.Process(ctx, job)118 if err != nil {119 t.Fatal(err)120 }121 ch, err := bstore.GetChangesetByID(ctx, changeset.ID)122 if err != nil {123 t.Fatal(err)124 }125 if len(ch.BatchChanges) != 1 {126 t.Fatalf("invalid batch changes associated, expected one, got=%+v", ch.BatchChanges)127 }128 if !ch.BatchChanges[0].Detach {129 t.Fatal("not marked as to be detached")130 }131 if ch.ReconcilerState != btypes.ReconcilerStateQueued {132 t.Fatalf("invalid reconciler state, got=%q", ch.ReconcilerState)133 }134 })135 t.Run("Reenqueue job", func(t *testing.T) {136 fake := &sources.FakeChangesetSource{}137 bp := &bulkProcessor{138 tx: 
bstore,139 sourcer: sources.NewFakeSourcer(nil, fake),140 }141 job := &types.ChangesetJob{142 JobType: types.ChangesetJobTypeReenqueue,143 ChangesetID: changeset.ID,144 UserID: user.ID,145 Payload: &btypes.ChangesetJobReenqueuePayload{},146 }147 changeset.ReconcilerState = btypes.ReconcilerStateFailed148 if err := bstore.UpdateChangeset(ctx, changeset); err != nil {149 t.Fatal(err)150 }151 err := bp.Process(ctx, job)152 if err != nil {153 t.Fatal(err)154 }155 changeset, err = bstore.GetChangesetByID(ctx, changeset.ID)156 if err != nil {157 t.Fatal(err)158 }159 if have, want := changeset.ReconcilerState, btypes.ReconcilerStateQueued; have != want {160 t.Fatalf("unexpected reconciler state, have=%q want=%q", have, want)161 }162 })163 t.Run("Merge job", func(t *testing.T) {164 fake := &sources.FakeChangesetSource{}165 bp := &bulkProcessor{166 tx: bstore,167 sourcer: sources.NewFakeSourcer(nil, fake),168 }169 job := &types.ChangesetJob{170 JobType: types.ChangesetJobTypeMerge,171 ChangesetID: changeset.ID,172 UserID: user.ID,173 Payload: &btypes.ChangesetJobMergePayload{},174 }175 err := bp.Process(ctx, job)176 if err != nil {177 t.Fatal(err)178 }179 if !fake.MergeChangesetCalled {180 t.Fatal("expected MergeChangeset to be called but wasn't")181 }182 })183 t.Run("Close job", func(t *testing.T) {184 fake := &sources.FakeChangesetSource{FakeMetadata: &github.PullRequest{}}185 bp := &bulkProcessor{186 tx: bstore,187 sourcer: sources.NewFakeSourcer(nil, fake),188 }189 job := &types.ChangesetJob{190 JobType: types.ChangesetJobTypeClose,191 ChangesetID: changeset.ID,192 UserID: user.ID,193 Payload: &btypes.ChangesetJobClosePayload{},194 }195 err := bp.Process(ctx, job)196 if err != nil {197 t.Fatal(err)198 }199 if !fake.CloseChangesetCalled {200 t.Fatal("expected CloseChangeset to be called but wasn't")201 }202 })203 t.Run("Publish job", func(t *testing.T) {204 fake := &sources.FakeChangesetSource{FakeMetadata: &github.PullRequest{}}205 bp := &bulkProcessor{206 tx: 
bstore,207 sourcer: sources.NewFakeSourcer(nil, fake),208 }209 t.Run("errors", func(t *testing.T) {210 for name, tc := range map[string]struct {211 spec *ct.TestSpecOpts212 changeset ct.TestChangesetOpts213 wantRetryable bool214 }{215 "imported changeset": {216 spec: nil,217 changeset: ct.TestChangesetOpts{218 Repo: repo.ID,219 BatchChange: batchChange.ID,220 CurrentSpec: 0,221 ReconcilerState: btypes.ReconcilerStateCompleted,222 },223 wantRetryable: false,224 },225 "bogus changeset spec ID, dude": {226 spec: nil,227 changeset: ct.TestChangesetOpts{228 Repo: repo.ID,229 BatchChange: batchChange.ID,230 CurrentSpec: -1,231 ReconcilerState: btypes.ReconcilerStateCompleted,232 },233 wantRetryable: false,234 },235 "publication state set": {236 spec: &ct.TestSpecOpts{237 User: user.ID,238 Repo: repo.ID,239 BatchSpec: batchSpec.ID,240 HeadRef: "main",241 Published: false,242 },243 changeset: ct.TestChangesetOpts{244 Repo: repo.ID,245 BatchChange: batchChange.ID,246 ReconcilerState: btypes.ReconcilerStateCompleted,247 },248 wantRetryable: false,249 },250 } {251 t.Run(name, func(t *testing.T) {252 var changesetSpec *btypes.ChangesetSpec253 if tc.spec != nil {254 changesetSpec = ct.CreateChangesetSpec(t, ctx, bstore, *tc.spec)255 }256 if changesetSpec != nil {257 tc.changeset.CurrentSpec = changesetSpec.ID258 }259 changeset := ct.CreateChangeset(t, ctx, bstore, tc.changeset)260 job := &types.ChangesetJob{261 JobType: types.ChangesetJobTypePublish,262 BatchChangeID: batchChange.ID,263 ChangesetID: changeset.ID,264 UserID: user.ID,265 Payload: &types.ChangesetJobPublishPayload{266 Draft: false,267 },268 }269 if err := bp.Process(ctx, job); err == nil {270 t.Error("unexpected nil error")271 } else if tc.wantRetryable && errcode.IsNonRetryable(err) {272 t.Errorf("error is not retryable: %v", err)273 } else if !tc.wantRetryable && !errcode.IsNonRetryable(err) {274 t.Errorf("error is retryable: %v", err)275 }276 })277 }278 })279 t.Run("success", func(t *testing.T) {280 for _, 
reconcilerState := range []btypes.ReconcilerState{281 btypes.ReconcilerStateCompleted,282 btypes.ReconcilerStateErrored,283 btypes.ReconcilerStateFailed,284 btypes.ReconcilerStateQueued,285 btypes.ReconcilerStateScheduled,286 } {287 t.Run(string(reconcilerState), func(t *testing.T) {288 for name, draft := range map[string]bool{289 "draft": true,290 "published": false,291 } {292 t.Run(name, func(t *testing.T) {293 changesetSpec := ct.CreateChangesetSpec(t, ctx, bstore, ct.TestSpecOpts{294 User: user.ID,295 Repo: repo.ID,296 BatchSpec: batchSpec.ID,297 HeadRef: "main",298 })299 changeset := ct.CreateChangeset(t, ctx, bstore, ct.TestChangesetOpts{300 Repo: repo.ID,301 BatchChange: batchChange.ID,302 CurrentSpec: changesetSpec.ID,303 ReconcilerState: reconcilerState,304 })305 job := &types.ChangesetJob{306 JobType: types.ChangesetJobTypePublish,307 BatchChangeID: batchChange.ID,308 ChangesetID: changeset.ID,309 UserID: user.ID,310 Payload: &types.ChangesetJobPublishPayload{311 Draft: draft,312 },313 }314 if err := bp.Process(ctx, job); err != nil {315 t.Errorf("unexpected error: %v", err)316 }317 changeset, err := bstore.GetChangesetByID(ctx, changeset.ID)318 if err != nil {319 t.Fatal(err)320 }321 var want btypes.ChangesetUiPublicationState322 if draft {323 want = btypes.ChangesetUiPublicationStateDraft324 } else {325 want = btypes.ChangesetUiPublicationStatePublished326 }327 if have := changeset.UiPublicationState; have == nil || *have != want {328 t.Fatalf("unexpected UI publication state: have=%v want=%q", have, want)329 }330 if have, want := changeset.ReconcilerState, global.DefaultReconcilerEnqueueState(); have != want {331 t.Fatalf("unexpected reconciler state, have=%q want=%q", have, want)332 }333 })334 }335 })336 }337 })338 })339}...
D
Using AI Code Generation
1import Quick2import Nimble3class D: QuickSpec {4 override func spec() {5 describe("D") {6 it("D") {7 expect(1).to(equal(1))8 }9 }10 }11}12import Quick13import Nimble14class D: QuickSpec {15 override func spec() {16 describe("D") {17 it("D") {18 expect(1).to(equal(1))19 }20 }21 }22}23import Quick24import Nimble25class D: QuickSpec {26 override func spec() {27 describe("D") {28 it("D") {29 expect(1).to(equal(1))30 }31 }32 }33}34import Quick35import Nimble36class D: QuickSpec {37 override func spec() {38 describe("D") {39 it("D") {40 expect(1).to(equal(1))41 }42 }43 }44}45import Quick46import Nimble47class D: QuickSpec {48 override func spec() {49 describe("D") {50 it("D") {51 expect(1).to(equal(1))52 }53 }54 }55}56import Quick57import Nimble58class D: QuickSpec {59 override func spec() {60 describe("D") {61 it("D") {62 expect(1).to(equal(1))63 }64 }65 }66}67import Quick68import Nimble69class D: QuickSpec {70 override func spec() {71 describe("D") {72 it("D") {73 expect(1).to(equal(1))74 }75 }76 }77}78import Quick79import Nimble80class D: QuickSpec {81 override func spec() {82 describe("D") {83 it("D") {84 expect(1).to(equal(1))85 }86 }87 }88}
D
Using AI Code Generation
1import QuickStart2var d = D()3d.doSomething()4import QuickStart5var e = E()6e.doSomething()7import QuickStart8var f = F()9f.doSomething()10import QuickStart11var g = G()12g.doSomething()13import QuickStart14var h = H()15h.doSomething()16import QuickStart17var i = I()18i.doSomething()19import QuickStart20var j = J()21j.doSomething()22import QuickStart23var k = K()24k.doSomething()25import QuickStart26var l = L()27l.doSomething()28import QuickStart29var m = M()30m.doSomething()31import QuickStart32var n = N()33n.doSomething()34import QuickStart35var o = O()36o.doSomething()37import QuickStart38var p = P()39p.doSomething()40import QuickStart41var q = Q()42q.doSomething()43import QuickStart44var r = R()45r.doSomething()46import QuickStart47var s = S()48s.doSomething()49import QuickStart50var t = T()51t.doSomething()
D
Using AI Code Generation
1import Quick2import Nimble3import Foundation4class D: QuickSpec {5 override func spec() {6 describe("D") {7 it("has a test") {8 expect(true).to(beTrue())9 }10 }11 }12}13import Quick14import Nimble15import Foundation16class C: QuickSpec {17 override func spec() {18 describe("C") {19 it("has a test") {20 expect(true).to(beTrue())21 }22 }23 }24}25import Quick26import Nimble27import Foundation28class B: QuickSpec {29 override func spec() {30 describe("B") {31 it("has a test") {32 expect(true).to(beTrue())33 }34 }35 }36}37import Quick38import Nimble39import Foundation40class A: QuickSpec {41 override func spec() {42 describe("A") {43 it("has a test") {44 expect(true).to(beTrue())45 }46 }47 }48}49import Quick50import Nimble51import Foundation52class E: QuickSpec {53 override func spec() {54 describe("E") {55 it("has a test") {56 expect(true).to(beTrue())57 }58 }59 }60}61import Quick62import Nimble63import Foundation64class F: QuickSpec {65 override func spec() {66 describe("F") {67 it("has a test") {68 expect(true).to(beTrue())69 }70 }71 }72}73import Quick74import Nimble75import Foundation76class G: QuickSpec {77 override func spec() {78 describe("G") {79 it("has a test") {80 expect(true).to(beTrue())81 }82 }83 }84}85import Quick86import Nimble87import Foundation88class H: QuickSpec {89 override func spec() {90 describe("H") {
D
Using AI Code Generation
1import QuickLook2var d = D()3import QuickLookUI4var d = D()5import QuickLookUI6var d = D()7import QuickLook8var d = D()9error: ambiguous use of 'D()'10var d = D()11clang: error: linker command failed with exit code 1 (use -v to see invocation)12I'm trying to use the QuickLook framework in my iOS app. I've added the framework to my project and I've added the #import <QuickLook/QuickLook.h> to my header file. However, when I try to build my project, I get the following error
D
Using AI Code Generation
1import QuickLook2let d = D()3d.doSomething()4import QuickLook5let d = D()6d.doSomething()7import QuickLook8import QuickLook9class MyTableViewCell: UITableViewCell {10 var myString: String? {11 didSet {12 }13 }14 override func awakeFromNib() {15 super.awakeFromNib()16 }17 override func setSelected(_ selected: Bool, animated: Bool) {18 super.setSelected(selected, animated: animated)19 }20}21class ViewController: UIViewController, UITableViewDataSource, UITableViewDelegate {22 override func viewDidLoad() {23 super.viewDidLoad()24 tableView.register(UINib(nibName: "MyTableViewCell", bundle: nil), forCellReuseIdentifier: "MyTableViewCell")25 }26 override func didReceiveMemoryWarning() {27 super.didReceiveMemoryWarning()28 }29 func tableView(_ tableView: UITableView, numberOfRowsInSection section: Int) -> Int {30 }31 func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {32 let cell = tableView.dequeueReusableCell(withIdentifier: "MyTableViewCell", for: indexPath) as! MyTableViewCell33 }34}
D
Using AI Code Generation
1import QuickType2let d = D()3d.test()4import QuickType5let d = D()6d.test()7import QuickType8let d = D()9d.test()10import QuickType11let d = D()12d.test()13import QuickType14let d = D()15d.test()16import QuickType
D
Using AI Code Generation
1let d = D()2print("d.property1 = \(d.property1)")3print("d.property2 = \(d.property2)")4print("d.property3 = \(d.property3)")5print("d.property4 = \(d.property4)")6print("d.property5 = \(d.property5)")7print("d.property6 = \(d.property6)")8print("d.property7 = \(d.property7)")9print("d.property8 = \(d.property8)")10print("d.property9 = \(d.property9)")11print("d.property10 = \(d.property10)")12print("d.property11 = \(d.property11)")13print("d.property12 = \(d.property12)")14print("d.property13 = \(d.property13)")15print("d.property14 = \(d.property14)")16print("d.property15 = \(d.property15)")17print("d.property16 = \(d.property16)")18print("d.property17 = \(d.property17)")19print("d.property18 = \(d.property18)")20print("d.property19 = \(d.property19)")21print("d.property20 = \(d.property20)")22print("d.property21 = \(d.property21)")23print("d.property22 = \(d.property22)")24print("d.property23 = \(d.property23)")25print("d.property24 = \(d.property24)")26print("d.property25 = \(d.property25)")27print("d.property26 = \(d.property26)")28print("d.property27 = \(d.property27)")29print("d.property28 = \(d.property28)")30print("d.property29 = \(d.property29)")31print("d.property30 = \(d.property30)")32print("d.property31 = \(d.property31)")33print("d.property32 = \(d.property32)")34print("d.property33 = \(d.property33)")35print("d.property34 = \(d.property34)")36print("d.property35 = \(d.property35)")37print("d.property36 = \(d.property36)")38print("d.property37 = \(d.property37)")39print("d.property38 = \(d.property38)")40print("d.property39 = \(d.property39)")41print("d.property40 = \(d.property40)")42print("d.property41 = \(d.property41)")43print("d.property42 = \(d.property42
D
Using AI Code Generation
1import Foundation2let sorted = D.quickSort(list)3print(sorted)4import Foundation5let sorted = C.quickSort(list)6print(sorted)7import Foundation8let sorted = B.quickSort(list)9print(sorted)10import Foundation11let sorted = A.quickSort(list)12print(sorted)13import Foundation14let sorted = QuickSort.quickSort(list)15print(sorted)16import Foundation17let sorted = QuickSort.quickSort(list)18print(sorted)19import Foundation20let sorted = QuickSort.quickSort(list)21print(sorted)22import Foundation23let sorted = QuickSort.quickSort(list)24print(sorted)25import Foundation
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!