From a6bc10fb571a04e1a2a1c8a18b5449e128e8e238 Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 15:26:06 +0800 Subject: [PATCH 01/16] Decentralize parachain relayer --- relayer/relays/parachain/beefy-listener.go | 84 ++++++++++++------- relayer/relays/parachain/config.go | 27 +++++- relayer/relays/parachain/main.go | 3 + relayer/relays/parachain/scanner.go | 13 +++ relayer/relays/parachain/types.go | 2 + web/packages/test/config/parachain-relay.json | 5 ++ web/packages/test/scripts/start-relayer.sh | 46 ++++++++-- 7 files changed, 142 insertions(+), 38 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index dd9bc0f98..2c39f9dcf 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -2,8 +2,8 @@ package parachain import ( "context" - "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -22,6 +22,7 @@ import ( type BeefyListener struct { config *SourceConfig + relayConfig *RelayerConfig ethereumConn *ethereum.Connection beefyClientContract *contracts.BeefyClient relaychainConn *relaychain.Connection @@ -33,6 +34,7 @@ type BeefyListener struct { func NewBeefyListener( config *SourceConfig, + relayConfig *RelayerConfig, ethereumConn *ethereum.Connection, relaychainConn *relaychain.Connection, parachainConnection *parachain.Connection, @@ -40,6 +42,7 @@ func NewBeefyListener( ) *BeefyListener { return &BeefyListener{ config: config, + relayConfig: relayConfig, ethereumConn: ethereumConn, relaychainConn: relaychainConn, parachainConnection: parachainConnection, @@ -82,27 +85,24 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { eg.Go(func() error { defer close(li.tasks) - // Subscribe NewMMRRoot event logs and fetch parachain message commitments - // since latest beefy block - beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) - if err != nil { - return fmt.Errorf("fetch latest beefy block: %w", err) - } - - err = li.doScan(ctx, beefyBlockNumber) - if err != nil { - return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) - } + ticker := time.NewTicker(time.Second * 30) + for { + beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) + if err != nil { + return fmt.Errorf("fetch latest beefy block: %w", err) + } - err = li.subscribeNewMMRRoots(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { + err = li.doScan(ctx, beefyBlockNumber) + if err != nil { + return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) + } + select { + case <-ctx.Done(): return nil + case <-ticker.C: + continue } - return err } - - return nil }) return nil @@ -154,18 +154,27 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if err != nil { return err } - - for _, task := range tasks { - // do final proof generation right before sending. The proof needs to be fresh. - task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - case li.tasks <- task: - log.Info("Beefy Listener emitted new task") + latestBlockNumber, err := li.scanner.findLatestBlockNumber() + if err != nil { + return err + } + if len(tasks) > 0 { + task := tasks[0] + if li.isAssigned(task) || li.isTimeout(task, latestBlockNumber) { + log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) + task.RelayID = li.relayConfig.ID + task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case li.tasks <- task: + log.Info("Beefy Listener emitted new task") + } + } else { + log.Info(fmt.Sprintf("Nonce %d does not belong to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) } } @@ -285,3 +294,18 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h return &output, nil } + +func (li *BeefyListener) isAssigned(task *Task) bool { + proofs := *task.MessageProofs + if len(proofs) == 0 { + return false + } + if proofs[0].Message.Nonce%li.relayConfig.Num == li.relayConfig.ID { + return true + } + return false +} + +func (li *BeefyListener) isTimeout(task *Task, latestBlock uint64) bool { + return uint64(task.Header.Number)+li.relayConfig.Timeout < latestBlock +} diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index fcd8a32e4..79ecbf6cb 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -1,13 +1,16 @@ package parachain import ( + "errors" "fmt" + "github.com/snowfork/snowbridge/relayer/config" ) type Config struct { - Source SourceConfig `mapstructure:"source"` - Sink SinkConfig `mapstructure:"sink"` + Source SourceConfig `mapstructure:"source"` + Sink SinkConfig `mapstructure:"sink"` + Relay RelayerConfig `mapstructure:"relay"` } type SourceConfig struct { @@ -32,6 +35,19 @@ type SinkContractsConfig struct { Gateway string `mapstructure:"Gateway"` } +type RelayerConfig struct { + ID uint64 `mapstructure:"id"` + Num uint64 `mapstructure:"num"` + Timeout uint64 `mapstructure:"timeout"` +} + +func (r RelayerConfig) Validate() error { + if r.Num < 1 { + return errors.New("Number of relayer is not set") + } + return nil +} + type ChannelID [32]byte func (c Config) Validate() error { @@ -66,5 +82,12 @@ func (c Config) Validate() error { if c.Sink.Contracts.Gateway == "" { return fmt.Errorf("sink contracts setting [Gateway] is not set") } + + // Relay + err = c.Relay.Validate() + if err != nil { + return fmt.Errorf("relay config: %w", err) + } + return nil } diff --git a/relayer/relays/parachain/main.go b/relayer/relays/parachain/main.go index 50bb95dcd..c2173bbff 100644 --- a/relayer/relays/parachain/main.go +++ b/relayer/relays/parachain/main.go @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { beefyListener := NewBeefyListener( &config.Source, + &config.Relay, ethereumConnBeefy, relaychainConn, parachainConn, @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error { return err } + log.Info("Current relay's ID:", relay.config.Relay.ID) + return nil } diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index b71687309..6c534efd1 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/snowfork/go-substrate-rpc-client/v4/scale" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -457,3 +458,15 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } + +func (s *Scanner) findLatestBlockNumber() (uint64, error) { + headerHash, err := s.paraConn.API().RPC.Chain.GetFinalizedHead() + if err != nil { + return 0, fmt.Errorf("fetch parachain hash: %w", err) + } + header, err := s.paraConn.API().RPC.Chain.GetHeader(headerHash) + if err != nil { + return 0, fmt.Errorf("fetch parachain header: %w", err) + } + return uint64(header.Number), nil +} diff --git a/relayer/relays/parachain/types.go b/relayer/relays/parachain/types.go index 632b7544d..ac97173a8 100644 --- a/relayer/relays/parachain/types.go +++ b/relayer/relays/parachain/types.go @@ -20,6 +20,8 @@ type Task struct { ProofOutput *ProofOutput // Proofs for messages from outbound channel on Polkadot MessageProofs *[]MessageProof + // ID of the relayer for the message + RelayID uint64 } // A ProofInput is data needed to generate a proof of parachain header inclusion diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index 8af630607..e3da0c953 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -24,5 +24,10 @@ "contracts": { "Gateway": null } + }, + "relay": { + "id": null, + "num": 2, + "timeout": 20 } } diff --git a/web/packages/test/scripts/start-relayer.sh b/web/packages/test/scripts/start-relayer.sh index 5977de9f5..eb32ac49f 100755 --- a/web/packages/test/scripts/start-relayer.sh +++ b/web/packages/test/scripts/start-relayer.sh @@ -54,7 +54,7 @@ config_relayer() { ' \ config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json - # Configure parachain relay (asset hub) + # Configure parachain relay (asset hub)-0 jq \ --arg k1 "$(address_for GatewayProxy)" \ --arg k2 "$(address_for BeefyClient)" \ @@ -70,8 +70,29 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID + | .relay.id = 0 ' \ - config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json + + # Configure parachain relay (asset hub)-1 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .relay.id = 1 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json # Configure parachain relay (penpal) jq \ @@ -172,15 +193,28 @@ start_relayer() { done ) & - # Launch parachain relay for assethub + # Launch parachain relay 0 for assethub ( - : >"$output_dir"/parachain-relay-asset-hub.log + : >"$output_dir"/parachain-relay-asset-hub-0.log while :; do echo "Starting parachain relay (asset-hub) at $(date)" "${relay_bin}" run parachain \ - --config "$output_dir/parachain-relay-asset-hub.json" \ + --config "$output_dir/parachain-relay-asset-hub-0.json" \ --ethereum.private-key $parachain_relay_assethub_eth_key \ - >>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true + >>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true + sleep 20 + done + ) & + + # Launch parachain relay 1 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-1.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-1.json" \ + --ethereum.private-key $parachain_relay_secondary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true sleep 20 done ) & From 2fe3c30403156117ef2013c676c05b2943e63390 Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 16:06:08 +0800 Subject: [PATCH 02/16] Cleanup --- relayer/relays/parachain/beefy-listener.go | 6 +----- relayer/relays/parachain/scanner.go | 12 ------------ 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 2c39f9dcf..3708c1639 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -154,13 +154,9 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if err != nil { return err } - latestBlockNumber, err := li.scanner.findLatestBlockNumber() - if err != nil { - return err - } if len(tasks) > 0 { task := tasks[0] - if li.isAssigned(task) || li.isTimeout(task, latestBlockNumber) { + if li.isAssigned(task) || li.isTimeout(task, beefyBlockNumber) { log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) task.RelayID = li.relayConfig.ID task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index 6c534efd1..cb450785b 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -458,15 +458,3 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } - -func (s *Scanner) findLatestBlockNumber() (uint64, error) { - headerHash, err := s.paraConn.API().RPC.Chain.GetFinalizedHead() - if err != nil { - return 0, fmt.Errorf("fetch parachain hash: %w", err) - } - header, err := s.paraConn.API().RPC.Chain.GetHeader(headerHash) - if err != nil { - return 0, fmt.Errorf("fetch parachain header: %w", err) - } - return uint64(header.Number), nil -} From d9f601d87989220eb468da035ca086e433dfc1b9 Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 16:23:10 +0800 Subject: [PATCH 03/16] Cleanup --- relayer/relays/parachain/beefy-listener.go | 1 - relayer/relays/parachain/types.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 3708c1639..b41ad0f10 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -158,7 +158,6 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er task := tasks[0] if li.isAssigned(task) || li.isTimeout(task, beefyBlockNumber) { log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) - task.RelayID = li.relayConfig.ID task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) if err != nil { return err diff --git a/relayer/relays/parachain/types.go b/relayer/relays/parachain/types.go index ac97173a8..632b7544d 100644 --- a/relayer/relays/parachain/types.go +++ b/relayer/relays/parachain/types.go @@ -20,8 +20,6 @@ type Task struct { ProofOutput *ProofOutput // Proofs for messages from outbound channel on Polkadot MessageProofs *[]MessageProof - // ID of the relayer for the message - RelayID uint64 } // A ProofInput is data needed to generate a proof of parachain header inclusion From fb3c4e007bd08262532826c27aa417b0ffb846ee Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 20:27:38 +0800 Subject: [PATCH 04/16] Add some randomness --- relayer/relays/parachain/beefy-listener.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index b41ad0f10..c049ab10e 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" gethTypes "github.com/ethereum/go-ethereum/core/types" + "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" "github.com/snowfork/go-substrate-rpc-client/v4/types" @@ -85,7 +86,7 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { eg.Go(func() error { defer close(li.tasks) - ticker := time.NewTicker(time.Second * 30) + ticker := time.NewTicker(time.Second*60 + time.Duration(rand.Intn(30))*time.Second) for { beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) if err != nil { From 929e101ccc15b4eeb94f689ee91a2357e3b2ab52 Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 20:36:32 +0800 Subject: [PATCH 05/16] Revert "Cleanup" This reverts commit 2fe3c30403156117ef2013c676c05b2943e63390. --- relayer/relays/parachain/beefy-listener.go | 6 +++++- relayer/relays/parachain/scanner.go | 12 ++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index c049ab10e..3cb4ea92c 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -155,9 +155,13 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if err != nil { return err } + latestBlockNumber, err := li.scanner.findLatestBlockNumber() + if err != nil { + return err + } if len(tasks) > 0 { task := tasks[0] - if li.isAssigned(task) || li.isTimeout(task, beefyBlockNumber) { + if li.isAssigned(task) || li.isTimeout(task, latestBlockNumber) { log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) if err != nil { diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index cb450785b..6c534efd1 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -458,3 +458,15 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } + +func (s *Scanner) findLatestBlockNumber() (uint64, error) { + headerHash, err := s.paraConn.API().RPC.Chain.GetFinalizedHead() + if err != nil { + return 0, fmt.Errorf("fetch parachain hash: %w", err) + } + header, err := s.paraConn.API().RPC.Chain.GetHeader(headerHash) + if err != nil { + return 0, fmt.Errorf("fetch parachain header: %w", err) + } + return uint64(header.Number), nil +} From a6b2529985398bf7335291a459ddc52e87218a8c Mon Sep 17 00:00:00 2001 From: ron Date: Thu, 8 Aug 2024 23:00:13 +0800 Subject: [PATCH 06/16] Wait timeout --- relayer/relays/parachain/beefy-listener.go | 99 +++++++------------ relayer/relays/parachain/config.go | 5 +- relayer/relays/parachain/scanner.go | 41 ++++---- web/packages/test/config/parachain-relay.json | 3 +- 4 files changed, 53 insertions(+), 95 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 3cb4ea92c..3397cef85 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - gethTypes "github.com/ethereum/go-ethereum/core/types" "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" @@ -86,7 +85,7 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { eg.Go(func() error { defer close(li.tasks) - ticker := time.NewTicker(time.Second*60 + time.Duration(rand.Intn(30))*time.Second) + ticker := time.NewTicker(time.Second*30 + time.Duration(rand.Intn(10))*time.Second) for { beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) if err != nil { @@ -109,72 +108,55 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { return nil } -func (li *BeefyListener) subscribeNewMMRRoots(ctx context.Context) error { - headers := make(chan *gethTypes.Header, 5) - - sub, err := li.ethereumConn.Client().SubscribeNewHead(ctx, headers) - if err != nil { - return fmt.Errorf("creating ethereum header subscription: %w", err) - } - defer sub.Unsubscribe() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-sub.Err(): - return fmt.Errorf("header subscription: %w", err) - case gethheader := <-headers: - blockNumber := gethheader.Number.Uint64() - contractEvents, err := li.queryBeefyClientEvents(ctx, blockNumber, &blockNumber) - if err != nil { - return fmt.Errorf("query NewMMRRoot event logs in block %v: %w", blockNumber, err) - } - - if len(contractEvents) > 0 { - log.Info(fmt.Sprintf("Found %d BeefyLightClient.NewMMRRoot events in block %d", len(contractEvents), blockNumber)) - // Only process the last emitted event in the block - event := contractEvents[len(contractEvents)-1] - log.WithFields(log.Fields{ - "beefyBlockNumber": event.BlockNumber, - "ethereumBlockNumber": event.Raw.BlockNumber, - "ethereumTxHash": event.Raw.TxHash.Hex(), - }).Info("Witnessed a new MMRRoot event") - - err = li.doScan(ctx, event.BlockNumber) - if err != nil { - return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", event.BlockNumber, err) - } - } - } - } -} - func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) error { tasks, err := li.scanner.Scan(ctx, beefyBlockNumber) if err != nil { return err } - latestBlockNumber, err := li.scanner.findLatestBlockNumber() - if err != nil { - return err - } if len(tasks) > 0 { task := tasks[0] - if li.isAssigned(task) || li.isTimeout(task, latestBlockNumber) { - log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) + task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + if err != nil { + return err + } + paraNonce := (*task.MessageProofs)[0].Message.Nonce + if paraNonce%li.relayConfig.Num == li.relayConfig.ID { + log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", paraNonce, li.relayConfig.ID)) + select { + case <-ctx.Done(): + return ctx.Err() + case li.tasks <- task: + log.Info("Beefy Listener emitted new task") + } + } else { + cnt := 0 + for { + ethInboundNonce, err := li.scanner.findLatestNonce(ctx) + if err != nil { + return err + } + // Message already picked up by other relayers, just return + if ethInboundNonce >= paraNonce { + return nil + } + // Wait for the timeout + time.Sleep(10 * time.Second) + if cnt == 12 { + break + } + cnt++ + } task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) if err != nil { return err } + log.Warn(fmt.Sprintf("Nonce %d is time out, picked up by current relay:%d", paraNonce, li.relayConfig.ID)) select { case <-ctx.Done(): return ctx.Err() case li.tasks <- task: log.Info("Beefy Listener emitted new task") } - } else { - log.Info(fmt.Sprintf("Nonce %d does not belong to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) } } @@ -294,18 +276,3 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h return &output, nil } - -func (li *BeefyListener) isAssigned(task *Task) bool { - proofs := *task.MessageProofs - if len(proofs) == 0 { - return false - } - if proofs[0].Message.Nonce%li.relayConfig.Num == li.relayConfig.ID { - return true - } - return false -} - -func (li *BeefyListener) isTimeout(task *Task, latestBlock uint64) bool { - return uint64(task.Header.Number)+li.relayConfig.Timeout < latestBlock -} diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index 79ecbf6cb..6edfaf12d 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -36,9 +36,8 @@ type SinkContractsConfig struct { } type RelayerConfig struct { - ID uint64 `mapstructure:"id"` - Num uint64 `mapstructure:"num"` - Timeout uint64 `mapstructure:"timeout"` + ID uint64 `mapstructure:"id"` + Num uint64 `mapstructure:"num"` } func (r RelayerConfig) Validate() error { diff --git a/relayer/relays/parachain/scanner.go b/relayer/relays/parachain/scanner.go index 6c534efd1..b884ed7d0 100644 --- a/relayer/relays/parachain/scanner.go +++ b/relayer/relays/parachain/scanner.go @@ -72,30 +72,13 @@ func (s *Scanner) findTasks( paraHash types.Hash, ) ([]*Task, error) { // Fetch latest nonce in ethereum gateway - gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) - gatewayContract, err := contracts.NewGateway( - gatewayAddress, - s.ethConn.Client(), - ) - if err != nil { - return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) - } - - options := bind.CallOpts{ - Pending: true, - Context: ctx, - } - ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) - if err != nil { - return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) - } + ethInboundNonce, err := s.findLatestNonce(ctx) log.WithFields(log.Fields{ "nonce": ethInboundNonce, "channelID": s.config.ChannelID, }).Info("Checked latest nonce delivered to ethereum gateway") // Fetch latest nonce in parachain outbound queue - paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil) if err != nil { return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err) @@ -459,14 +442,24 @@ func fetchMessageProof( return MessageProof{Message: message, Proof: proof}, nil } -func (s *Scanner) findLatestBlockNumber() (uint64, error) { - headerHash, err := s.paraConn.API().RPC.Chain.GetFinalizedHead() +func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) { + // Fetch latest nonce in ethereum gateway + gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway) + gatewayContract, err := contracts.NewGateway( + gatewayAddress, + s.ethConn.Client(), + ) if err != nil { - return 0, fmt.Errorf("fetch parachain hash: %w", err) + return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err) } - header, err := s.paraConn.API().RPC.Chain.GetHeader(headerHash) + + options := bind.CallOpts{ + Pending: true, + Context: ctx, + } + ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID) if err != nil { - return 0, fmt.Errorf("fetch parachain header: %w", err) + return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err) } - return uint64(header.Number), nil + return ethInboundNonce, err } diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index e3da0c953..98484e7be 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -27,7 +27,6 @@ }, "relay": { "id": null, - "num": 2, - "timeout": 20 + "num": 2 } } From 261fd890430129a47112cc88003b72c261242e5d Mon Sep 17 00:00:00 2001 From: ron Date: Fri, 9 Aug 2024 07:42:33 +0800 Subject: [PATCH 07/16] Cleanups --- relayer/relays/parachain/beefy-listener.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 3cb4ea92c..86097e893 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -161,8 +161,9 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er } if len(tasks) > 0 { task := tasks[0] + paraNonce := (*task.MessageProofs)[0].Message.Nonce if li.isAssigned(task) || li.isTimeout(task, latestBlockNumber) { - log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) + log.Info(fmt.Sprintf("Nonce %d round-robin to current relay:%d", paraNonce, li.relayConfig.ID)) task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) if err != nil { return err @@ -173,8 +174,6 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er case li.tasks <- task: log.Info("Beefy Listener emitted new task") } - } else { - log.Info(fmt.Sprintf("Nonce %d does not belong to current relay:%d", (*task.MessageProofs)[0].Message.Nonce, li.relayConfig.ID)) } } @@ -296,14 +295,7 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h } func (li *BeefyListener) isAssigned(task *Task) bool { - proofs := *task.MessageProofs - if len(proofs) == 0 { - return false - } - if proofs[0].Message.Nonce%li.relayConfig.Num == li.relayConfig.ID { - return true - } - return false + return (*task.MessageProofs)[0].Message.Nonce%li.relayConfig.Num == li.relayConfig.ID } func (li *BeefyListener) isTimeout(task *Task, latestBlock uint64) bool { From fce791dc0c2a96230a6e9c00b05db5cbab497ecd Mon Sep 17 00:00:00 2001 From: ron Date: Fri, 9 Aug 2024 09:33:37 +0800 Subject: [PATCH 08/16] Chore --- relayer/relays/parachain/beefy-listener.go | 70 +++++++++++----------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 70ca646d0..e26253012 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -144,41 +144,6 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er return nil } -func (li *BeefyListener) addTask(ctx context.Context, task *Task) (err error) { - task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) - if err != nil { - return err - } - select { - case <-ctx.Done(): - return ctx.Err() - case li.tasks <- task: - log.Info("Beefy Listener emitted new task") - } - return nil -} - -func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, error) { - paraNonce := (*task.MessageProofs)[0].Message.Nonce - cnt := 0 - log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce)) - for { - ethInboundNonce, err := li.scanner.findLatestNonce(ctx) - if err != nil { - return false, err - } - if ethInboundNonce >= paraNonce { - return true, nil - } - time.Sleep(10 * time.Second) - if cnt == 12 { - break - } - cnt++ - } - return false, nil -} - // Fetch the latest verified beefy block number and hash from Ethereum func (li *BeefyListener) fetchLatestBeefyBlock(ctx context.Context) (uint64, types.Hash, error) { number, err := li.beefyClientContract.LatestBeefyBlock(&bind.CallOpts{ @@ -263,3 +228,38 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h return &output, nil } + +func (li *BeefyListener) addTask(ctx context.Context, task *Task) (err error) { + task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case li.tasks <- task: + log.Info("Beefy Listener emitted new task") + } + return nil +} + +func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, error) { + paraNonce := (*task.MessageProofs)[0].Message.Nonce + cnt := 0 + log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce)) + for { + ethInboundNonce, err := li.scanner.findLatestNonce(ctx) + if err != nil { + return false, err + } + if ethInboundNonce >= paraNonce { + return true, nil + } + time.Sleep(10 * time.Second) + if cnt == 12 { + break + } + cnt++ + } + return false, nil +} From 2d4387cbff61c6df6e355bf4a317cb384de50654 Mon Sep 17 00:00:00 2001 From: ron Date: Fri, 9 Aug 2024 09:39:14 +0800 Subject: [PATCH 09/16] Polish --- relayer/relays/parachain/beefy-listener.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index e26253012..6e2dc8736 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -245,14 +245,15 @@ func (li *BeefyListener) addTask(ctx context.Context, task *Task) (err error) { func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, error) { paraNonce := (*task.MessageProofs)[0].Message.Nonce - cnt := 0 log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce)) + cnt := 0 for { ethInboundNonce, err := li.scanner.findLatestNonce(ctx) if err != nil { return false, err } if ethInboundNonce >= paraNonce { + log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce)) return true, nil } time.Sleep(10 * time.Second) From b933ae83cf3091f5729ef57747fe5c63cc92ad64 Mon Sep 17 00:00:00 2001 From: ron Date: Fri, 9 Aug 2024 13:25:36 +0800 Subject: [PATCH 10/16] Short timeout & 3 relayers for test --- relayer/relays/parachain/beefy-listener.go | 6 ++-- web/packages/test/config/parachain-relay.json | 2 +- web/packages/test/scripts/start-relayer.sh | 35 ++++++++++++++++++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 6e2dc8736..e2de08746 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -87,7 +87,7 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { // Add some randomness here in case one relayer is down and other relayers won't compete for // that failed message at same time. - ticker := time.NewTicker(time.Second*30 + time.Duration(rand.Intn(10))*time.Second) + ticker := time.NewTicker(time.Second*60 + time.Duration(rand.Intn(30))*time.Second) for { beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) if err != nil { @@ -256,8 +256,8 @@ func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, err log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce)) return true, nil } - time.Sleep(10 * time.Second) - if cnt == 12 { + time.Sleep(5 * time.Second) + if cnt == 6 { break } cnt++ diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index 98484e7be..85cde51ed 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -27,6 +27,6 @@ }, "relay": { "id": null, - "num": 2 + "num": 3 } } diff --git a/web/packages/test/scripts/start-relayer.sh b/web/packages/test/scripts/start-relayer.sh index eb32ac49f..86a7bba99 100755 --- a/web/packages/test/scripts/start-relayer.sh +++ b/web/packages/test/scripts/start-relayer.sh @@ -94,6 +94,26 @@ config_relayer() { ' \ config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json + # Configure parachain relay (asset hub)-2 + jq \ + --arg k1 "$(address_for GatewayProxy)" \ + --arg k2 "$(address_for BeefyClient)" \ + --arg eth_endpoint_ws $eth_endpoint_ws \ + --arg eth_writer_endpoint $eth_writer_endpoint \ + --arg channelID $ASSET_HUB_CHANNEL_ID \ + --arg eth_gas_limit $eth_gas_limit \ + ' + .source.contracts.Gateway = $k1 + | .source.contracts.BeefyClient = $k2 + | .sink.contracts.Gateway = $k1 + | .source.ethereum.endpoint = $eth_endpoint_ws + | .sink.ethereum.endpoint = $eth_writer_endpoint + | .sink.ethereum."gas-limit" = $eth_gas_limit + | .source."channel-id" = $channelID + | .relay.id = 2 + ' \ + config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json + # Configure parachain relay (penpal) jq \ --arg k1 "$(address_for GatewayProxy)" \ @@ -213,12 +233,25 @@ start_relayer() { echo "Starting parachain relay (asset-hub) at $(date)" "${relay_bin}" run parachain \ --config "$output_dir/parachain-relay-asset-hub-1.json" \ - --ethereum.private-key $parachain_relay_secondary_gov_eth_key \ + --ethereum.private-key $parachain_relay_primary_gov_eth_key \ >>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true sleep 20 done ) & + # Launch parachain relay 2 for assethub + ( + : >"$output_dir"/parachain-relay-asset-hub-2.log + while :; do + echo "Starting parachain relay (asset-hub) at $(date)" + "${relay_bin}" run parachain \ + --config "$output_dir/parachain-relay-asset-hub-2.json" \ + --ethereum.private-key $parachain_relay_secondary_gov_eth_key \ + >>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true + sleep 20 + done + ) & + # Launch parachain relay for parachain penpal ( : >"$output_dir"/parachain-relay-penpal.log From 09a9d11e6af94f8d1d08c852441dbe2fab7f2a35 Mon Sep 17 00:00:00 2001 From: ron Date: Sat, 10 Aug 2024 13:21:44 +0800 Subject: [PATCH 11/16] Increase scan period --- relayer/relays/parachain/beefy-listener.go | 28 ++++++++++------------ 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index e2de08746..70addd9cb 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -85,24 +85,22 @@ func (li *BeefyListener) Start(ctx context.Context, eg *errgroup.Group) error { eg.Go(func() error { defer close(li.tasks) - // Add some randomness here in case one relayer is down and other relayers won't compete for - // that failed message at same time. - ticker := time.NewTicker(time.Second*60 + time.Duration(rand.Intn(30))*time.Second) for { - beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) - if err != nil { - return fmt.Errorf("fetch latest beefy block: %w", err) - } - - err = li.doScan(ctx, beefyBlockNumber) - if err != nil { - return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) - } select { case <-ctx.Done(): return nil - case <-ticker.C: - continue + // Add some randomness here in case one relayer is down and other relayers won't compete for + // that failed message at same time. + case <-time.After(time.Second*90 + time.Duration(rand.Intn(30))*time.Second): + beefyBlockNumber, _, err := li.fetchLatestBeefyBlock(ctx) + if err != nil { + return fmt.Errorf("fetch latest beefy block: %w", err) + } + + err = li.doScan(ctx, beefyBlockNumber) + if err != nil { + return fmt.Errorf("scan for sync tasks bounded by BEEFY block %v: %w", beefyBlockNumber, err) + } } } }) @@ -257,7 +255,7 @@ func (li *BeefyListener) waitForTask(ctx context.Context, task *Task) (bool, err return true, nil } time.Sleep(5 * time.Second) - if cnt == 6 { + if cnt == 12 { break } cnt++ From ebb257bda8dcc0bc7d67ad807cf094c3e47f7714 Mon Sep 17 00:00:00 2001 From: ron Date: Tue, 13 Aug 2024 10:48:30 +0800 Subject: [PATCH 12/16] Rename to ScheduleConfig --- relayer/relays/parachain/beefy-listener.go | 4 ++-- relayer/relays/parachain/config.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 70addd9cb..baddd81ce 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -22,7 +22,7 @@ import ( type BeefyListener struct { config *SourceConfig - relayConfig *RelayerConfig + relayConfig *ScheduleConfig ethereumConn *ethereum.Connection beefyClientContract *contracts.BeefyClient relaychainConn *relaychain.Connection @@ -34,7 +34,7 @@ type BeefyListener struct { func NewBeefyListener( config *SourceConfig, - relayConfig *RelayerConfig, + relayConfig *ScheduleConfig, ethereumConn *ethereum.Connection, relaychainConn *relaychain.Connection, parachainConnection *parachain.Connection, diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index 6edfaf12d..3e4db44f8 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -8,9 +8,9 @@ import ( ) type Config struct { - Source SourceConfig `mapstructure:"source"` - Sink SinkConfig `mapstructure:"sink"` - Relay RelayerConfig `mapstructure:"relay"` + Source SourceConfig `mapstructure:"source"` + Sink SinkConfig `mapstructure:"sink"` + Relay ScheduleConfig `mapstructure:"relay"` } type SourceConfig struct { @@ -35,12 +35,12 @@ type SinkContractsConfig struct { Gateway string `mapstructure:"Gateway"` } -type RelayerConfig struct { +type ScheduleConfig struct { ID uint64 `mapstructure:"id"` Num uint64 `mapstructure:"num"` } -func (r RelayerConfig) Validate() error { +func (r ScheduleConfig) Validate() error { if r.Num < 1 { return errors.New("Number of relayer is not set") } From 76206021ff14b21545074b00f22f1760183c45f5 Mon Sep 17 00:00:00 2001 From: ron Date: Tue, 13 Aug 2024 10:55:15 +0800 Subject: [PATCH 13/16] Rename to schedule --- relayer/relays/parachain/config.go | 8 ++++---- relayer/relays/parachain/main.go | 4 ++-- web/packages/test/config/parachain-relay.json | 2 +- web/packages/test/scripts/start-relayer.sh | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index 3e4db44f8..14f09e25c 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -8,9 +8,9 @@ import ( ) type Config struct { - Source SourceConfig `mapstructure:"source"` - Sink SinkConfig `mapstructure:"sink"` - Relay ScheduleConfig `mapstructure:"relay"` + Source SourceConfig `mapstructure:"source"` + Sink SinkConfig `mapstructure:"sink"` + Schedule ScheduleConfig `mapstructure:"schedule"` } type SourceConfig struct { @@ -83,7 +83,7 @@ func (c Config) Validate() error { } // Relay - err = c.Relay.Validate() + err = c.Schedule.Validate() if err != nil { return fmt.Errorf("relay config: %w", err) } diff --git a/relayer/relays/parachain/main.go b/relayer/relays/parachain/main.go index c2173bbff..5ce1c1b9c 100644 --- a/relayer/relays/parachain/main.go +++ b/relayer/relays/parachain/main.go @@ -47,7 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) { beefyListener := NewBeefyListener( &config.Source, - &config.Relay, + &config.Schedule, ethereumConnBeefy, relaychainConn, parachainConn, @@ -98,7 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error { return err } - log.Info("Current relay's ID:", relay.config.Relay.ID) + log.Info("Current relay's ID:", relay.config.Schedule.ID) return nil } diff --git a/web/packages/test/config/parachain-relay.json b/web/packages/test/config/parachain-relay.json index 85cde51ed..8598265f3 100644 --- a/web/packages/test/config/parachain-relay.json +++ b/web/packages/test/config/parachain-relay.json @@ -25,7 +25,7 @@ "Gateway": null } }, - "relay": { + "schedule": { "id": null, "num": 3 } diff --git a/web/packages/test/scripts/start-relayer.sh b/web/packages/test/scripts/start-relayer.sh index 86a7bba99..c4a24876c 100755 --- a/web/packages/test/scripts/start-relayer.sh +++ b/web/packages/test/scripts/start-relayer.sh @@ -70,7 +70,7 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID - | .relay.id = 0 + | .schedule.id = 0 ' \ config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json @@ -90,7 +90,7 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID - | .relay.id = 1 + | .schedule.id = 1 ' \ config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json @@ -110,7 +110,7 @@ config_relayer() { | .sink.ethereum.endpoint = $eth_writer_endpoint | .sink.ethereum."gas-limit" = $eth_gas_limit | .source."channel-id" = $channelID - | .relay.id = 2 + | .schedule.id = 2 ' \ config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json From 0deea629c20833ef1ac6073ebbdd190541d851b7 Mon Sep 17 00:00:00 2001 From: ron Date: Tue, 13 Aug 2024 11:08:07 +0800 Subject: [PATCH 14/16] Add comment --- relayer/relays/parachain/config.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/relayer/relays/parachain/config.go b/relayer/relays/parachain/config.go index 14f09e25c..a0f881831 100644 --- a/relayer/relays/parachain/config.go +++ b/relayer/relays/parachain/config.go @@ -36,7 +36,9 @@ type SinkContractsConfig struct { } type ScheduleConfig struct { - ID uint64 `mapstructure:"id"` + // ID of current relayer, starting from 0 + ID uint64 `mapstructure:"id"` + // Number of total count of all relayers Num uint64 `mapstructure:"num"` } @@ -44,6 +46,9 @@ func (r ScheduleConfig) Validate() error { if r.Num < 1 { return errors.New("Number of relayer is not set") } + if r.ID >= r.Num { + return errors.New("ID of the Number of relayer is not set") + } return nil } From a3a7f02a3d508692ffc6b2951dbbacde7c4d47c8 Mon Sep 17 00:00:00 2001 From: ron Date: Tue, 13 Aug 2024 18:41:52 +0800 Subject: [PATCH 15/16] Rename as scheduleConfig --- relayer/relays/parachain/beefy-listener.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index baddd81ce..901c426ff 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -34,7 +34,7 @@ type BeefyListener struct { func NewBeefyListener( config *SourceConfig, - relayConfig *ScheduleConfig, + scheduleConfig *ScheduleConfig, ethereumConn *ethereum.Connection, relaychainConn *relaychain.Connection, parachainConnection *parachain.Connection, @@ -42,7 +42,7 @@ func NewBeefyListener( ) *BeefyListener { return &BeefyListener{ config: config, - relayConfig: relayConfig, + relayConfig: scheduleConfig, ethereumConn: ethereumConn, relaychainConn: relaychainConn, parachainConnection: parachainConnection, From 6dfa1346d105aa2ef823ed1a99625795d39df2c6 Mon Sep 17 00:00:00 2001 From: ron Date: Tue, 13 Aug 2024 18:44:07 +0800 Subject: [PATCH 16/16] Rename --- relayer/relays/parachain/beefy-listener.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/relayer/relays/parachain/beefy-listener.go b/relayer/relays/parachain/beefy-listener.go index 901c426ff..5ca86203b 100644 --- a/relayer/relays/parachain/beefy-listener.go +++ b/relayer/relays/parachain/beefy-listener.go @@ -22,7 +22,7 @@ import ( type BeefyListener struct { config *SourceConfig - relayConfig *ScheduleConfig + scheduleConfig *ScheduleConfig ethereumConn *ethereum.Connection beefyClientContract *contracts.BeefyClient relaychainConn *relaychain.Connection @@ -42,7 +42,7 @@ func NewBeefyListener( ) *BeefyListener { return &BeefyListener{ config: config, - relayConfig: scheduleConfig, + scheduleConfig: scheduleConfig, ethereumConn: ethereumConn, relaychainConn: relaychainConn, parachainConnection: parachainConnection, @@ -116,13 +116,13 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if len(tasks) > 0 { task := tasks[0] paraNonce := (*task.MessageProofs)[0].Message.Nonce - if paraNonce%li.relayConfig.Num == li.relayConfig.ID { + if paraNonce%li.scheduleConfig.Num == li.scheduleConfig.ID { // Task self assigned err = li.addTask(ctx, task) if err != nil { return fmt.Errorf("add task for nonce %d: %w", paraNonce, err) } - log.Info(fmt.Sprintf("nonce %d self assigned to relay(%d)", paraNonce, li.relayConfig.ID)) + log.Info(fmt.Sprintf("nonce %d self assigned to relay(%d)", paraNonce, li.scheduleConfig.ID)) } else { // Task wait for picked up by another relayer, submit anyway if timeout done, err := li.waitForTask(ctx, task) @@ -134,7 +134,7 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er if err != nil { return fmt.Errorf("add task for nonce %d: %w", paraNonce, err) } - log.Info(fmt.Sprintf("nonce %d timeout but picked up by relay(%d)", paraNonce, li.relayConfig.ID)) + log.Info(fmt.Sprintf("nonce %d timeout but picked up by relay(%d)", paraNonce, li.scheduleConfig.ID)) } } }