Best Syzkaller code snippet using state.pendingInputs
sweeper.go
Source:sweeper.go
...77 // lastFeeRate is the most recent fee rate used for this input within a78 // transaction broadcast to the network.79 lastFeeRate lnwallet.SatPerKWeight80}81// pendingInputs is a type alias for a set of pending inputs.82type pendingInputs = map[wire.OutPoint]*pendingInput83// inputCluster is a helper struct to gather a set of pending inputs that should84// be swept with the specified fee rate.85type inputCluster struct {86 sweepFeeRate lnwallet.SatPerKWeight87 inputs pendingInputs88}89// pendingSweepsReq is an internal message we'll use to represent an external90// caller's intent to retrieve all of the pending inputs the UtxoSweeper is91// attempting to sweep.92type pendingSweepsReq struct {93 respChan chan map[wire.OutPoint]*PendingInput94}95// PendingInput contains information about an input that is currently being96// swept by the UtxoSweeper.97type PendingInput struct {98 // OutPoint is the identify outpoint of the input being swept.99 OutPoint wire.OutPoint100 // WitnessType is the witness type of the input being swept.101 WitnessType input.WitnessType102 // Amount is the amount of the input being swept.103 Amount btcutil.Amount104 // LastFeeRate is the most recent fee rate used for the input being105 // swept within a transaction broadcast to the network.106 LastFeeRate lnwallet.SatPerKWeight107 // BroadcastAttempts is the number of attempts we've made to sweept the108 // input.109 BroadcastAttempts int110 // NextBroadcastHeight is the next height of the chain at which we'll111 // attempt to broadcast a transaction sweeping the input.112 NextBroadcastHeight uint32113}114// bumpFeeReq is an internal message we'll use to represent an external caller's115// intent to bump the fee rate of a given input.116type bumpFeeReq struct {117 input wire.OutPoint118 feePreference FeePreference119 responseChan chan *bumpFeeResp120}121// bumpFeeResp is an internal message we'll use to hand off the response of a122// bumpFeeReq from the UtxoSweeper's main event loop back to the caller.123type bumpFeeResp struct {124 resultChan chan Result125 err error126}127// UtxoSweeper is responsible for sweeping outputs back into the wallet128type UtxoSweeper struct {129 started uint32 // To be used atomically.130 stopped uint32 // To be used atomically.131 cfg *UtxoSweeperConfig132 newInputs chan *sweepInputMessage133 spendChan chan *chainntnfs.SpendDetail134 // pendingSweepsReq is a channel that will be sent requests by external135 // callers in order to retrieve the set of pending inputs the136 // UtxoSweeper is attempting to sweep.137 pendingSweepsReqs chan *pendingSweepsReq138 // bumpFeeReqs is a channel that will be sent requests by external139 // callers who wish to bump the fee rate of a given input.140 bumpFeeReqs chan *bumpFeeReq141 // pendingInputs is the total set of inputs the UtxoSweeper has been142 // requested to sweep.143 pendingInputs pendingInputs144 // timer is the channel that signals expiry of the sweep batch timer.145 timer <-chan time.Time146 testSpendChan chan wire.OutPoint147 currentOutputScript []byte148 relayFeeRate lnwallet.SatPerKWeight149 quit chan struct{}150 wg sync.WaitGroup151}152// UtxoSweeperConfig contains dependencies of UtxoSweeper.153type UtxoSweeperConfig struct {154 // GenSweepScript generates a P2WKH script belonging to the wallet where155 // funds can be swept.156 GenSweepScript func() ([]byte, error)157 // FeeEstimator is used when crafting sweep transactions to estimate158 // the necessary fee relative to the expected size of the sweep159 // transaction.160 FeeEstimator lnwallet.FeeEstimator161 // PublishTransaction facilitates the process of broadcasting a signed162 // transaction to the appropriate network.163 PublishTransaction func(*wire.MsgTx) error164 // NewBatchTimer creates a channel that will be sent on when a certain165 // time window has passed. During this time window, new inputs can still166 // be added to the sweep tx that is about to be generated.167 NewBatchTimer func() <-chan time.Time168 // Notifier is an instance of a chain notifier we'll use to watch for169 // certain on-chain events.170 Notifier chainntnfs.ChainNotifier171 // Store stores the published sweeper txes.172 Store SweeperStore173 // Signer is used by the sweeper to generate valid witnesses at the174 // time the incubated outputs need to be spent.175 Signer input.Signer176 // MaxInputsPerTx specifies the default maximum number of inputs allowed177 // in a single sweep tx. If more need to be swept, multiple txes are178 // created and published.179 MaxInputsPerTx int180 // MaxSweepAttempts specifies the maximum number of times an input is181 // included in a publish attempt before giving up and returning an error182 // to the caller.183 MaxSweepAttempts int184 // NextAttemptDeltaFunc returns given the number of already attempted185 // sweeps, how many blocks to wait before retrying to sweep.186 NextAttemptDeltaFunc func(int) int32187 // MaxFeeRate is the the maximum fee rate allowed within the188 // UtxoSweeper.189 MaxFeeRate lnwallet.SatPerKWeight190 // FeeRateBucketSize is the default size of fee rate buckets we'll use191 // when clustering inputs into buckets with similar fee rates within the192 // UtxoSweeper.193 //194 // Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size195 // of 10 would result in the following fee rate buckets up to the196 // maximum fee rate:197 //198 // #1: min = 1 sat/vbyte, max = 10 sat/vbyte199 // #2: min = 11 sat/vbyte, max = 20 sat/vbyte...200 FeeRateBucketSize int201}202// Result is the struct that is pushed through the result channel. Callers can203// use this to be informed of the final sweep result. In case of a remote204// spend, Err will be ErrRemoteSpend.205type Result struct {206 // Err is the final result of the sweep. It is nil when the input is207 // swept successfully by us. ErrRemoteSpend is returned when another208 // party took the input.209 Err error210 // Tx is the transaction that spent the input.211 Tx *wire.MsgTx212}213// sweepInputMessage structs are used in the internal channel between the214// SweepInput call and the sweeper main loop.215type sweepInputMessage struct {216 input input.Input217 feePreference FeePreference218 resultChan chan Result219}220// New returns a new Sweeper instance.221func New(cfg *UtxoSweeperConfig) *UtxoSweeper {222 return &UtxoSweeper{223 cfg: cfg,224 newInputs: make(chan *sweepInputMessage),225 spendChan: make(chan *chainntnfs.SpendDetail),226 bumpFeeReqs: make(chan *bumpFeeReq),227 pendingSweepsReqs: make(chan *pendingSweepsReq),228 quit: make(chan struct{}),229 pendingInputs: make(pendingInputs),230 }231}232// Start starts the process of constructing and publish sweep txes.233func (s *UtxoSweeper) Start() error {234 if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {235 return nil236 }237 log.Tracef("Sweeper starting")238 // Retrieve last published tx from database.239 lastTx, err := s.cfg.Store.GetLastPublishedTx()240 if err != nil {241 return fmt.Errorf("get last published tx: %v", err)242 }243 // Republish in case the previous call crashed lnd. We don't care about244 // the return value, because inputs will be re-offered and retried245 // anyway. The only reason we republish here is to prevent the corner246 // case where lnd goes into a restart loop because of a crashing publish247 // tx where we keep deriving new output script. By publishing and248 // possibly crashing already now, we haven't derived a new output script249 // yet.250 if lastTx != nil {251 log.Debugf("Publishing last tx %v", lastTx.TxHash())252 // Error can be ignored. Because we are starting up, there are253 // no pending inputs to update based on the publish result.254 err := s.cfg.PublishTransaction(lastTx)255 if err != nil && err != lnwallet.ErrDoubleSpend {256 log.Errorf("last tx publish: %v", err)257 }258 }259 // Retrieve relay fee for dust limit calculation. Assume that this will260 // not change from here on.261 s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()262 // We need to register for block epochs and retry sweeping every block.263 // We should get a notification with the current best block immediately264 // if we don't provide any epoch. We'll wait for that in the collector.265 blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)266 if err != nil {267 return fmt.Errorf("register block epoch ntfn: %v", err)268 }269 // Start sweeper main loop.270 s.wg.Add(1)271 go func() {272 defer blockEpochs.Cancel()273 defer s.wg.Done()274 s.collector(blockEpochs.Epochs)275 }()276 return nil277}278// Stop stops sweeper from listening to block epochs and constructing sweep279// txes.280func (s *UtxoSweeper) Stop() error {281 if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {282 return nil283 }284 log.Debugf("Sweeper shutting down")285 close(s.quit)286 s.wg.Wait()287 log.Debugf("Sweeper shut down")288 return nil289}290// SweepInput sweeps inputs back into the wallet. The inputs will be batched and291// swept after the batch time window ends. A custom fee preference can be292// provided to determine what fee rate should be used for the input. Note that293// the input may not always be swept with this exact value, as its possible for294// it to be batched under the same transaction with other similar fee rate295// inputs.296//297// NOTE: Extreme care needs to be taken that input isn't changed externally.298// Because it is an interface and we don't know what is exactly behind it, we299// cannot make a local copy in sweeper.300func (s *UtxoSweeper) SweepInput(input input.Input,301 feePreference FeePreference) (chan Result, error) {302 if input == nil || input.OutPoint() == nil || input.SignDesc() == nil {303 return nil, errors.New("nil input received")304 }305 // Ensure the client provided a sane fee preference.306 if _, err := s.feeRateForPreference(feePreference); err != nil {307 return nil, err308 }309 log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+310 "time_lock=%v, amount=%v, fee_preference=%v", input.OutPoint(),311 input.WitnessType(), input.BlocksToMaturity(),312 btcutil.Amount(input.SignDesc().Output.Value), feePreference)313 sweeperInput := &sweepInputMessage{314 input: input,315 feePreference: feePreference,316 resultChan: make(chan Result, 1),317 }318 // Deliver input to main event loop.319 select {320 case s.newInputs <- sweeperInput:321 case <-s.quit:322 return nil, ErrSweeperShuttingDown323 }324 return sweeperInput.resultChan, nil325}326// feeRateForPreference returns a fee rate for the given fee preference. It327// ensures that the fee rate respects the bounds of the UtxoSweeper.328func (s *UtxoSweeper) feeRateForPreference(329 feePreference FeePreference) (lnwallet.SatPerKWeight, error) {330 // Ensure a type of fee preference is specified to prevent using a331 // default below.332 if feePreference.FeeRate == 0 && feePreference.ConfTarget == 0 {333 return 0, ErrNoFeePreference334 }335 feeRate, err := DetermineFeePerKw(s.cfg.FeeEstimator, feePreference)336 if err != nil {337 return 0, err338 }339 if feeRate < s.relayFeeRate {340 return 0, fmt.Errorf("fee preference resulted in invalid fee "+341 "rate %v, mininum is %v", feeRate, s.relayFeeRate)342 }343 if feeRate > s.cfg.MaxFeeRate {344 return 0, fmt.Errorf("fee preference resulted in invalid fee "+345 "rate %v, maximum is %v", feeRate, s.cfg.MaxFeeRate)346 }347 return feeRate, nil348}349// collector is the sweeper main loop. It processes new inputs, spend350// notifications and counts down to publication of the sweep tx.351func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {352 // We registered for the block epochs with a nil request. The notifier353 // should send us the current best block immediately. So we need to wait354 // for it here because we need to know the current best height.355 var bestHeight int32356 select {357 case bestBlock := <-blockEpochs:358 bestHeight = bestBlock.Height359 case <-s.quit:360 return361 }362 for {363 select {364 // A new inputs is offered to the sweeper. We check to see if we365 // are already trying to sweep this input and if not, set up a366 // listener for spend and schedule a sweep.367 case input := <-s.newInputs:368 outpoint := *input.input.OutPoint()369 pendInput, pending := s.pendingInputs[outpoint]370 if pending {371 log.Debugf("Already pending input %v received",372 outpoint)373 // Add additional result channel to signal374 // spend of this input.375 pendInput.listeners = append(376 pendInput.listeners, input.resultChan,377 )378 continue379 }380 // Create a new pendingInput and initialize the381 // listeners slice with the passed in result channel. If382 // this input is offered for sweep again, the result383 // channel will be appended to this slice.384 pendInput = &pendingInput{385 listeners: []chan Result{input.resultChan},386 input: input.input,387 minPublishHeight: bestHeight,388 feePreference: input.feePreference,389 }390 s.pendingInputs[outpoint] = pendInput391 // Start watching for spend of this input, either by us392 // or the remote party.393 cancel, err := s.waitForSpend(394 outpoint,395 input.input.SignDesc().Output.PkScript,396 input.input.HeightHint(),397 )398 if err != nil {399 err := fmt.Errorf("wait for spend: %v", err)400 s.signalAndRemove(&outpoint, Result{Err: err})401 continue402 }403 pendInput.ntfnRegCancel = cancel404 // Check to see if with this new input a sweep tx can be405 // formed.406 if err := s.scheduleSweep(bestHeight); err != nil {407 log.Errorf("schedule sweep: %v", err)408 }409 // A spend of one of our inputs is detected. Signal sweep410 // results to the caller(s).411 case spend := <-s.spendChan:412 // For testing purposes.413 if s.testSpendChan != nil {414 s.testSpendChan <- *spend.SpentOutPoint415 }416 // Query store to find out if we every published this417 // tx.418 spendHash := *spend.SpenderTxHash419 isOurTx, err := s.cfg.Store.IsOurTx(spendHash)420 if err != nil {421 log.Errorf("cannot determine if tx %v "+422 "is ours: %v", spendHash, err,423 )424 continue425 }426 log.Debugf("Detected spend related to in flight inputs "+427 "(is_ours=%v): %v",428 newLogClosure(func() string {429 return spew.Sdump(spend.SpendingTx)430 }), isOurTx,431 )432 // Signal sweep results for inputs in this confirmed433 // tx.434 for _, txIn := range spend.SpendingTx.TxIn {435 outpoint := txIn.PreviousOutPoint436 // Check if this input is known to us. It could437 // probably be unknown if we canceled the438 // registration, deleted from pendingInputs but439 // the ntfn was in-flight already. Or this could440 // be not one of our inputs.441 _, ok := s.pendingInputs[outpoint]442 if !ok {443 continue444 }445 // Return either a nil or a remote spend result.446 var err error447 if !isOurTx {448 err = ErrRemoteSpend449 }450 // Signal result channels.451 s.signalAndRemove(&outpoint, Result{452 Tx: spend.SpendingTx,453 Err: err,454 })455 }456 // Now that an input of ours is spent, we can try to457 // resweep the remaining inputs.458 if err := s.scheduleSweep(bestHeight); err != nil {459 log.Errorf("schedule sweep: %v", err)460 }461 // A new external request has been received to retrieve all of462 // the inputs we're currently attempting to sweep.463 case req := <-s.pendingSweepsReqs:464 req.respChan <- s.handlePendingSweepsReq(req)465 // A new external request has been received to bump the fee rate466 // of a given input.467 case req := <-s.bumpFeeReqs:468 resultChan, err := s.handleBumpFeeReq(req, bestHeight)469 req.responseChan <- &bumpFeeResp{470 resultChan: resultChan,471 err: err,472 }473 // The timer expires and we are going to (re)sweep.474 case <-s.timer:475 log.Debugf("Sweep timer expired")476 // Set timer to nil so we know that a new timer needs to477 // be started when new inputs arrive.478 s.timer = nil479 // We'll attempt to cluster all of our inputs with480 // similar fee rates. Before attempting to sweep them,481 // we'll sort them in descending fee rate order. We do482 // this to ensure any inputs which have had their fee483 // rate bumped are broadcast first in order enforce the484 // RBF policy.485 inputClusters := s.clusterBySweepFeeRate()486 sort.Slice(inputClusters, func(i, j int) bool {487 return inputClusters[i].sweepFeeRate >488 inputClusters[j].sweepFeeRate489 })490 for _, cluster := range inputClusters {491 // Examine pending inputs and try to construct492 // lists of inputs.493 inputLists, err := s.getInputLists(494 cluster, bestHeight,495 )496 if err != nil {497 log.Errorf("Unable to examine pending "+498 "inputs: %v", err)499 continue500 }501 // Sweep selected inputs.502 for _, inputs := range inputLists {503 err := s.sweep(504 inputs, cluster.sweepFeeRate,505 bestHeight,506 )507 if err != nil {508 log.Errorf("Unable to sweep "+509 "inputs: %v", err)510 }511 }512 }513 // A new block comes in. Things may have changed, so we retry a514 // sweep.515 case epoch, ok := <-blockEpochs:516 if !ok {517 return518 }519 bestHeight = epoch.Height520 log.Debugf("New block: height=%v, sha=%v",521 epoch.Height, epoch.Hash)522 if err := s.scheduleSweep(bestHeight); err != nil {523 log.Errorf("schedule sweep: %v", err)524 }525 case <-s.quit:526 return527 }528 }529}530// bucketForFeeReate determines the proper bucket for a fee rate. This is done531// in order to batch inputs with similar fee rates together.532func (s *UtxoSweeper) bucketForFeeRate(533 feeRate lnwallet.SatPerKWeight) lnwallet.SatPerKWeight {534 minBucket := s.relayFeeRate + lnwallet.SatPerKWeight(s.cfg.FeeRateBucketSize)535 return lnwallet.SatPerKWeight(536 math.Ceil(float64(feeRate) / float64(minBucket)),537 )538}539// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper540// and clusters those together with similar fee rates. Each cluster contains a541// sweep fee rate, which is determined by calculating the average fee rate of542// all inputs within that cluster.543func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {544 bucketInputs := make(map[lnwallet.SatPerKWeight]pendingInputs)545 inputFeeRates := make(map[wire.OutPoint]lnwallet.SatPerKWeight)546 // First, we'll group together all inputs with similar fee rates. This547 // is done by determining the fee rate bucket they should belong in.548 for op, input := range s.pendingInputs {549 feeRate, err := s.feeRateForPreference(input.feePreference)550 if err != nil {551 log.Warnf("Skipping input %v: %v", op, err)552 continue553 }554 bucket := s.bucketForFeeRate(feeRate)555 inputs, ok := bucketInputs[bucket]556 if !ok {557 inputs = make(pendingInputs)558 bucketInputs[bucket] = inputs559 }560 input.lastFeeRate = feeRate561 inputs[op] = input562 inputFeeRates[op] = feeRate563 }564 // We'll then determine the sweep fee rate for each set of inputs by565 // calculating the average fee rate of the inputs within each set.566 inputClusters := make([]inputCluster, 0, len(bucketInputs))567 for _, inputs := range bucketInputs {568 var sweepFeeRate lnwallet.SatPerKWeight569 for op := range inputs {570 sweepFeeRate += inputFeeRates[op]571 }572 sweepFeeRate /= lnwallet.SatPerKWeight(len(inputs))573 inputClusters = append(inputClusters, inputCluster{574 sweepFeeRate: sweepFeeRate,575 inputs: inputs,576 })577 }578 return inputClusters579}580// scheduleSweep starts the sweep timer to create an opportunity for more inputs581// to be added.582func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {583 // The timer is already ticking, no action needed for the sweep to584 // happen.585 if s.timer != nil {586 log.Debugf("Timer still ticking")587 return nil588 }589 // We'll only start our timer once we have inputs we're able to sweep.590 startTimer := false591 for _, cluster := range s.clusterBySweepFeeRate() {592 // Examine pending inputs and try to construct lists of inputs.593 inputLists, err := s.getInputLists(cluster, currentHeight)594 if err != nil {595 return fmt.Errorf("get input lists: %v", err)596 }597 log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+598 "yield %v distinct txns", currentHeight,599 cluster.sweepFeeRate, len(inputLists))600 if len(inputLists) != 0 {601 startTimer = true602 break603 }604 }605 if !startTimer {606 return nil607 }608 // Start sweep timer to create opportunity for more inputs to be added609 // before a tx is constructed.610 s.timer = s.cfg.NewBatchTimer()611 log.Debugf("Sweep timer started")612 return nil613}614// signalAndRemove notifies the listeners of the final result of the input615// sweep. It cancels any pending spend notification and removes the input from616// the list of pending inputs. When this function returns, the sweeper has617// completely forgotten about the input.618func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {619 pendInput := s.pendingInputs[*outpoint]620 listeners := pendInput.listeners621 if result.Err == nil {622 log.Debugf("Dispatching sweep success for %v to %v listeners",623 outpoint, len(listeners),624 )625 } else {626 log.Debugf("Dispatching sweep error for %v to %v listeners: %v",627 outpoint, len(listeners), result.Err,628 )629 }630 // Signal all listeners. Channel is buffered. Because we only send once631 // on every channel, it should never block.632 for _, resultChan := range listeners {633 resultChan <- result634 }635 // Cancel spend notification with chain notifier. This is not necessary636 // in case of a success, except for that a reorg could still happen.637 if pendInput.ntfnRegCancel != nil {638 log.Debugf("Canceling spend ntfn for %v", outpoint)639 pendInput.ntfnRegCancel()640 }641 // Inputs are no longer pending after result has been sent.642 delete(s.pendingInputs, *outpoint)643}644// getInputLists goes through the given inputs and constructs multiple distinct645// sweep lists with the given fee rate, each up to the configured maximum number646// of inputs. Negative yield inputs are skipped. Transactions with an output647// below the dust limit are not published. Those inputs remain pending and will648// be bundled with future inputs if possible.649func (s *UtxoSweeper) getInputLists(cluster inputCluster,650 currentHeight int32) ([]inputSet, error) {651 // Filter for inputs that need to be swept. Create two lists: all652 // sweepable inputs and a list containing only the new, never tried653 // inputs.654 //655 // We want to create as large a tx as possible, so we return a final set656 // list that starts with sets created from all inputs. However, there is657 // a chance that those txes will not publish, because they already658 // contain inputs that failed before. Therefore we also add sets659 // consisting of only new inputs to the list, to make sure that new660 // inputs are given a good, isolated chance of being published.661 var newInputs, retryInputs []input.Input662 for _, input := range cluster.inputs {663 // Skip inputs that have a minimum publish height that is not664 // yet reached.665 if input.minPublishHeight > currentHeight {666 continue667 }668 // Add input to the either one of the lists.669 if input.publishAttempts == 0 {670 newInputs = append(newInputs, input.input)671 } else {672 retryInputs = append(retryInputs, input.input)673 }674 }675 // If there is anything to retry, combine it with the new inputs and676 // form input sets.677 var allSets []inputSet678 if len(retryInputs) > 0 {679 var err error680 allSets, err = generateInputPartitionings(681 append(retryInputs, newInputs...), s.relayFeeRate,682 cluster.sweepFeeRate, s.cfg.MaxInputsPerTx,683 )684 if err != nil {685 return nil, fmt.Errorf("input partitionings: %v", err)686 }687 }688 // Create sets for just the new inputs.689 newSets, err := generateInputPartitionings(690 newInputs, s.relayFeeRate, cluster.sweepFeeRate,691 s.cfg.MaxInputsPerTx,692 )693 if err != nil {694 return nil, fmt.Errorf("input partitionings: %v", err)695 }696 log.Debugf("Sweep candidates at height=%v: total_num_pending=%v, "+697 "total_num_new=%v", currentHeight, len(allSets), len(newSets))698 // Append the new sets at the end of the list, because those tx likely699 // have a higher fee per input.700 return append(allSets, newSets...), nil701}702// sweep takes a set of preselected inputs, creates a sweep tx and publishes the703// tx. The output address is only marked as used if the publish succeeds.704func (s *UtxoSweeper) sweep(inputs inputSet, feeRate lnwallet.SatPerKWeight,705 currentHeight int32) error {706 // Generate an output script if there isn't an unused script available.707 if s.currentOutputScript == nil {708 pkScript, err := s.cfg.GenSweepScript()709 if err != nil {710 return fmt.Errorf("gen sweep script: %v", err)711 }712 s.currentOutputScript = pkScript713 }714 // Create sweep tx.715 tx, err := createSweepTx(716 inputs, s.currentOutputScript, uint32(currentHeight), feeRate,717 s.cfg.Signer,718 )719 if err != nil {720 return fmt.Errorf("create sweep tx: %v", err)721 }722 // Add tx before publication, so that we will always know that a spend723 // by this tx is ours. Otherwise if the publish doesn't return, but did724 // publish, we loose track of this tx. Even republication on startup725 // doesn't prevent this, because that call returns a double spend error726 // then and would also not add the hash to the store.727 err = s.cfg.Store.NotifyPublishTx(tx)728 if err != nil {729 return fmt.Errorf("notify publish tx: %v", err)730 }731 // Publish sweep tx.732 log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",733 tx.TxHash(), len(tx.TxIn), currentHeight)734 log.Tracef("Sweep tx at height=%v: %v", currentHeight,735 newLogClosure(func() string {736 return spew.Sdump(tx)737 }),738 )739 err = s.cfg.PublishTransaction(tx)740 // In case of an unexpected error, don't try to recover.741 if err != nil && err != lnwallet.ErrDoubleSpend {742 return fmt.Errorf("publish tx: %v", err)743 }744 // Keep the output script in case of an error, so that it can be reused745 // for the next transaction and causes no address inflation.746 if err == nil {747 s.currentOutputScript = nil748 }749 // Reschedule sweep.750 for _, input := range tx.TxIn {751 pi, ok := s.pendingInputs[input.PreviousOutPoint]752 if !ok {753 // It can be that the input has been removed because it754 // exceed the maximum number of attempts in a previous755 // input set.756 continue757 }758 // Record another publish attempt.759 pi.publishAttempts++760 // We don't care what the result of the publish call was. Even761 // if it is published successfully, it can still be that it762 // needs to be retried. Call NextAttemptDeltaFunc to calculate763 // when to resweep this input.764 nextAttemptDelta := s.cfg.NextAttemptDeltaFunc(765 pi.publishAttempts,766 )767 pi.minPublishHeight = currentHeight + nextAttemptDelta768 log.Debugf("Rescheduling input %v after %v attempts at "+769 "height %v (delta %v)", input.PreviousOutPoint,770 pi.publishAttempts, pi.minPublishHeight,771 nextAttemptDelta)772 if pi.publishAttempts >= s.cfg.MaxSweepAttempts {773 // Signal result channels sweep result.774 s.signalAndRemove(&input.PreviousOutPoint, Result{775 Err: ErrTooManyAttempts,776 })777 }778 }779 return nil780}781// waitForSpend registers a spend notification with the chain notifier. It782// returns a cancel function that can be used to cancel the registration.783func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint,784 script []byte, heightHint uint32) (func(), error) {785 log.Debugf("Wait for spend of %v", outpoint)786 spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(787 &outpoint, script, heightHint,788 )789 if err != nil {790 return nil, fmt.Errorf("register spend ntfn: %v", err)791 }792 s.wg.Add(1)793 go func() {794 defer s.wg.Done()795 select {796 case spend, ok := <-spendEvent.Spend:797 if !ok {798 log.Debugf("Spend ntfn for %v canceled",799 outpoint)800 return801 }802 log.Debugf("Delivering spend ntfn for %v",803 outpoint)804 select {805 case s.spendChan <- spend:806 log.Debugf("Delivered spend ntfn for %v",807 outpoint)808 case <-s.quit:809 }810 case <-s.quit:811 }812 }()813 return spendEvent.Cancel, nil814}815// PendingInputs returns the set of inputs that the UtxoSweeper is currently816// attempting to sweep.817func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {818 respChan := make(chan map[wire.OutPoint]*PendingInput, 1)819 select {820 case s.pendingSweepsReqs <- &pendingSweepsReq{821 respChan: respChan,822 }:823 case <-s.quit:824 return nil, ErrSweeperShuttingDown825 }826 select {827 case pendingSweeps := <-respChan:828 return pendingSweeps, nil829 case <-s.quit:830 return nil, ErrSweeperShuttingDown831 }832}833// handlePendingSweepsReq handles a request to retrieve all pending inputs the834// UtxoSweeper is attempting to sweep.835func (s *UtxoSweeper) handlePendingSweepsReq(836 req *pendingSweepsReq) map[wire.OutPoint]*PendingInput {837 pendingInputs := make(map[wire.OutPoint]*PendingInput, len(s.pendingInputs))838 for _, pendingInput := range s.pendingInputs {839 // Only the exported fields are set, as we expect the response840 // to only be consumed externally.841 op := *pendingInput.input.OutPoint()842 pendingInputs[op] = &PendingInput{843 OutPoint: op,844 WitnessType: pendingInput.input.WitnessType(),845 Amount: btcutil.Amount(846 pendingInput.input.SignDesc().Output.Value,847 ),848 LastFeeRate: pendingInput.lastFeeRate,849 BroadcastAttempts: pendingInput.publishAttempts,850 NextBroadcastHeight: uint32(pendingInput.minPublishHeight),851 }852 }853 return pendingInputs854}855// BumpFee allows bumping the fee of an input being swept by the UtxoSweeper856// according to the provided fee preference. The new fee preference will be used857// for a new sweep transaction of the input that will act as a replacement858// transaction (RBF) of the original sweeping transaction, if any.859//860// NOTE: This currently doesn't do any fee rate validation to ensure that a bump861// is actually successful. The responsibility of doing so should be handled by862// the caller.863func (s *UtxoSweeper) BumpFee(input wire.OutPoint,864 feePreference FeePreference) (chan Result, error) {865 // Ensure the client provided a sane fee preference.866 if _, err := s.feeRateForPreference(feePreference); err != nil {867 return nil, err868 }869 responseChan := make(chan *bumpFeeResp, 1)870 select {871 case s.bumpFeeReqs <- &bumpFeeReq{872 input: input,873 feePreference: feePreference,874 responseChan: responseChan,875 }:876 case <-s.quit:877 return nil, ErrSweeperShuttingDown878 }879 select {880 case response := <-responseChan:881 return response.resultChan, response.err882 case <-s.quit:883 return nil, ErrSweeperShuttingDown884 }885}886// handleBumpFeeReq handles a bump fee request by simply updating the inputs fee887// preference. Currently, no validation is done on the new fee preference to888// ensure it will properly create a replacement transaction.889//890// TODO(wilmer):891// * Validate fee preference to ensure we'll create a valid replacement892// transaction to allow the new fee rate to propagate throughout the893// network.894// * Ensure we don't combine this input with any other unconfirmed inputs that895// did not exist in the original sweep transaction, resulting in an invalid896// replacement transaction.897func (s *UtxoSweeper) handleBumpFeeReq(req *bumpFeeReq,898 bestHeight int32) (chan Result, error) {899 // If the UtxoSweeper is already trying to sweep this input, then we can900 // simply just increase its fee rate. This will allow the input to be901 // batched with others which also have a similar fee rate, creating a902 // higher fee rate transaction that replaces the original input's903 // sweeping transaction.904 pendingInput, ok := s.pendingInputs[req.input]905 if !ok {906 return nil, lnwallet.ErrNotMine907 }908 log.Debugf("Updating fee preference for %v from %v to %v", req.input,909 pendingInput.feePreference, req.feePreference)910 pendingInput.feePreference = req.feePreference911 // We'll reset the input's publish height to the current so that a new912 // transaction can be created that replaces the transaction currently913 // spending the input. We only do this for inputs that have been914 // broadcast at least once to ensure we don't spend an input before its915 // maturity height.916 //917 // NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the918 // check for broadcast attempts is redundant at the moment....
state.go
Source:state.go
...15 Players: make(map[uint32]*entity.Player),16 Projectiles: make(map[uint32]*entity.Projectile),17 // Zombies: make(map[int]*entity.Zombie),18 EventQ: e,19 pendingInputs: make(chan *message.NetworkInput, 1000000),20 DeltaTime: 0,21 Before: time.Now(),22 }23}24//Start the broadcast timer25func (g *GameState) Start() {26 println("Game State Initialized")27 // seconds := time.Duration(1000 / 30)28 // // ticker := time.Tick(seconds * time.Millisecond)29}30// func (g *GameState) broadcastState(t <-chan time.Time) {31// for {32// select {33// case <-t:34// g.EventQueue.FireGameState(message.SendState())35// }36// }37// }38//GameState Whole game state39type GameState struct {40 requests chan message.UserInput41 Players map[uint32]*entity.Player42 Projectiles map[uint32]*entity.Projectile43 EventQ *events.EventQueue44 Before time.Time45 DeltaTime float6446 pendingInputs chan *message.NetworkInput47}48//HandleInput request49func (g *GameState) HandleInput(m *message.NetworkInput) {50 // lag := time.Duration(100 * time.Microsecond)51 // time.Sleep(lag)52 g.pendingInputs <- m53}54//HandleTimeStep runs physics step55func (g *GameState) HandleTimeStep(frame int) {56 g.UpdatePlayers()57 g.UpdatePhysics(frame)58 before := g.Before59 now := time.Now().Sub(before)60 g.Before = time.Now()61 t := float64(now/time.Millisecond) / 1000.062 g.DeltaTime = t63}64//UpdatePhysics s65func (g *GameState) UpdatePhysics(frame int) {66 g.updateShooting()67 g.UpdatePlayers()68 g.checkHits(g.Players)69 f := int(math.Floor(60 / 10))70 if frame%f == 0 {71 g.EventQ.FireGameState(g.CopyState())72 }73 g.updateProjectiles()74}75// func (g *GameState) HandleStartBroadcast() {76// g.EventQ.FireGameState(g.CopyState())77// }78//Broadcast state79func (g *GameState) Broadcast() {80 tick := time.Tick(100 * time.Millisecond)81 for {82 select {83 case <-tick:84 g.EventQ.FireGameState(g.CopyState())85 default:86 time.Sleep(5 * time.Millisecond)87 }88 }89}90//UpdatePlayers update players each server tick91func (g *GameState) UpdatePlayers() {92 applied := 093 for len(g.pendingInputs) > 0 {94 input := <-g.pendingInputs95 player, found := g.Players[input.ID]96 if found {97 player.UpdatePlayer(input)98 g.updatePlayerState(player)99 }100 applied++101 }102}103func (g *GameState) updatePlayerState(player *entity.Player) {104 // player.Update(g.DeltaTime)105 before := player.LastShot106 now := time.Now()107 diff := now.Sub(before) / time.Millisecond108 // println(diff)...
pendingInputs
Using AI Code Generation
1import (2func main() {3 db, _ := ethdb.NewMemDatabase()4 statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))5 tx := types.NewTransaction(0, common.HexToAddress("0x1"), new(big.Int), 0, new(big.Int), nil)6 receipt := types.NewReceipt(nil, false, 0)7 statedb.AddTxLookupEntry(tx.Hash(), common.Hash{})8 statedb.AddReceipt(tx.Hash(), receipt)9 pendingInputs := statedb.PendingInputs()10 fmt.Println(pendingInputs)11}12import (13func main() {14 db, _ := ethdb.NewMemDatabase()15 statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))16 tx := types.NewTransaction(0, common.HexToAddress("0x1"), new(big.Int), 0, new(big.Int), nil)17 receipt := types.NewReceipt(nil, false, 0)18 statedb.AddTxLookupEntry(tx.Hash(), common.Hash{})19 statedb.AddReceipt(tx.Hash(), receipt)
pendingInputs
Using AI Code Generation
1import (2func main() {3 db, _ := ethdb.NewMemDatabase()4 state, _ := state.New(common.Hash{}, state.NewDatabase(db))5 state.AddBalance(common.Address{1}, common.Big1)6 state.AddBalance(common.Address{2}, common.Big2)7 state.AddBalance(common.Address{3}, common.Big3)8 state.AddBalance(common.Address{4}, common.Big4)9 state.AddBalance(common.Address{5}, common.Big5)10 state.AddBalance(common.Address{6}, common.Big6)11 state.AddBalance(common.Address{7}, common.Big7)12 state.AddBalance(common.Address{8}, common.Big8)13 state.AddBalance(common.Address{9}, common.Big9)14 state.AddBalance(common.Address{10}, common.Big10)15 state.AddBalance(common.Address{11}, common.Big11)16 state.AddBalance(common.Address{12}, common.Big12)17 state.AddBalance(common.Address{13}, common.Big13)18 state.AddBalance(common.Address{14}, common.Big14)19 state.AddBalance(common.Address{15}, common.Big15)20 state.AddBalance(common.Address{16}, common.Big16)21 state.AddBalance(common.Address{17}, common.Big17)22 state.AddBalance(common.Address{18}, common.Big18)23 state.AddBalance(common.Address{19}, common.Big19)24 state.AddBalance(common.Address{20}, common.Big20)25 state.AddBalance(common.Address{21}, common.Big21)26 state.AddBalance(common.Address{22}, common.Big22)27 state.AddBalance(common.Address{23}, common.Big23)28 state.AddBalance(common.Address{24}, common.Big24)29 state.AddBalance(common.Address{25}, common.Big25)30 state.AddBalance(common.Address{26}, common.Big26)31 state.AddBalance(common.Address{27}, common.Big27)32 state.AddBalance(common.Address{28}, common.Big28)33 state.AddBalance(common.Address{29}, common.Big29)34 state.AddBalance(common.Address{30}, common.Big30)35 state.AddBalance(common.Address{31}, common.Big31)36 state.AddBalance(common.Address
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!