Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,6 +23,7 @@ import (

type BeefyListener struct {
config *SourceConfig
scheduleConfig *ScheduleConfig
ethereumConn *ethereum.Connection
beefyClientContract *contracts.BeefyClient
relaychainConn *relaychain.Connection
Expand All @@ -33,13 +35,15 @@ type BeefyListener struct {

func NewBeefyListener(
config *SourceConfig,
scheduleConfig *ScheduleConfig,
ethereumConn *ethereum.Connection,
relaychainConn *relaychain.Connection,
parachainConnection *parachain.Connection,
tasks chan<- *Task,
) *BeefyListener {
return &BeefyListener{
config: config,
scheduleConfig: scheduleConfig,
ethereumConn: ethereumConn,
relaychainConn: relaychainConn,
parachainConnection: parachainConnection,
Expand Down Expand Up @@ -154,18 +158,18 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er
if err != nil {
return err
}

for _, task := range tasks {
// do final proof generation right before sending. The proof needs to be fresh.
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
paraNonce := (*task.MessageProofs)[0].Message.Nonce
modNonce := paraNonce % li.scheduleConfig.Num
var waitingPeriod uint64
if modNonce > li.scheduleConfig.ID {
waitingPeriod = modNonce - li.scheduleConfig.ID
} else {
waitingPeriod = li.scheduleConfig.ID - modNonce
}
Copy link
Copy Markdown
Contributor

@alistair-singh alistair-singh Aug 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please double check but the math below will produce the waiting period correctly for all input.

Suggested change
paraNonce := (*task.MessageProofs)[0].Message.Nonce
modNonce := paraNonce % li.scheduleConfig.Num
var waitingPeriod uint64
if modNonce > li.scheduleConfig.ID {
waitingPeriod = modNonce - li.scheduleConfig.ID
} else {
waitingPeriod = li.scheduleConfig.ID - modNonce
}
paraNonce := (*task.MessageProofs)[0].Message.Nonce
waitingPeriod := (paraNonce + li.scheduleConfig.Num - li.scheduleConfig.ID) % li.scheduleConfig.Num

The if/else version does not produce the correct waiting period for the scenario:
3 Relayers, relayer id 1, message nonce 5 and 6 both will produce a waiting period of 1.

Maybe we should add a unit test for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Fixed in e787143

select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
err = li.waitAndSend(ctx, task, waitingPeriod)
if err != nil {
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
}
}

Expand Down Expand Up @@ -285,3 +289,37 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h

return &output, nil
}

func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPeriod uint64) error {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce))
var cnt uint64
var err error
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
if err != nil {
return err
}
if ethInboundNonce >= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
if cnt == waitingPeriod {
break
}
time.Sleep(45 * time.Second)
Comment thread
alistair-singh marked this conversation as resolved.
Outdated
cnt++
}
log.Info(fmt.Sprintf("nonce %d is not picked up by any one, submit anyway", paraNonce))
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
}
return nil
}
31 changes: 29 additions & 2 deletions relayer/relays/parachain/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package parachain

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type SourceConfig struct {
Expand All @@ -32,6 +35,23 @@ type SinkContractsConfig struct {
Gateway string `mapstructure:"Gateway"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
Num uint64 `mapstructure:"num"`
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Num uint64 `mapstructure:"num"`
TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"`

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

func (r ScheduleConfig) Validate() error {
if r.Num < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.Num {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type ChannelID [32]byte

func (c Config) Validate() error {
Expand Down Expand Up @@ -66,5 +86,12 @@ func (c Config) Validate() error {
if c.Sink.Contracts.Gateway == "" {
return fmt.Errorf("sink contracts setting [Gateway] is not set")
}

// Relay
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("relay config: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions relayer/relays/parachain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {

beefyListener := NewBeefyListener(
&config.Source,
&config.Schedule,
ethereumConnBeefy,
relaychainConn,
parachainConn,
Expand Down Expand Up @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return err
}

log.Info("Current relay's ID:", relay.config.Schedule.ID)

return nil
}
42 changes: 24 additions & 18 deletions relayer/relays/parachain/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"

"github.com/snowfork/go-substrate-rpc-client/v4/scale"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -71,30 +72,13 @@ func (s *Scanner) findTasks(
paraHash types.Hash,
) ([]*Task, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
ethInboundNonce, err := s.findLatestNonce(ctx)
log.WithFields(log.Fields{
"nonce": ethInboundNonce,
"channelID": s.config.ChannelID,
}).Info("Checked latest nonce delivered to ethereum gateway")

// Fetch latest nonce in parachain outbound queue

paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil)
if err != nil {
return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err)
Expand Down Expand Up @@ -457,3 +441,25 @@ func fetchMessageProof(

return MessageProof{Message: message, Proof: proof}, nil
}

func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
return ethInboundNonce, err
}
4 changes: 4 additions & 0 deletions web/packages/test/config/parachain-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@
"contracts": {
"Gateway": null
}
},
"schedule": {
"id": null,
"num": 3
}
}
95 changes: 81 additions & 14 deletions web/packages/test/scripts/start-relayer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ config_relayer() {
' \
config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json

# Configure parachain relay (asset hub)
# Configure parachain relay (asset hub)-0
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
Expand All @@ -70,8 +70,49 @@ config_relayer() {
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 0
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json

# Configure parachain relay (asset hub)-1
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 1
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json

# Configure parachain relay (asset hub)-2
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 2
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json

# Configure parachain relay (penpal)
jq \
Expand Down Expand Up @@ -172,18 +213,44 @@ start_relayer() {
done
) &

# Launch parachain relay for assethub
(
: >"$output_dir"/parachain-relay-asset-hub.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub.json" \
--ethereum.private-key $parachain_relay_assethub_eth_key \
>>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true
sleep 20
done
) &
# # Launch parachain relay 0 for assethub
# (
# : >"$output_dir"/parachain-relay-asset-hub-0.log
# while :; do
# echo "Starting parachain relay (asset-hub) at $(date)"
# "${relay_bin}" run parachain \
# --config "$output_dir/parachain-relay-asset-hub-0.json" \
# --ethereum.private-key $parachain_relay_assethub_eth_key \
# >>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true
# sleep 20
# done
# ) &

# # Launch parachain relay 1 for assethub
# (
# : >"$output_dir"/parachain-relay-asset-hub-1.log
# while :; do
# echo "Starting parachain relay (asset-hub) at $(date)"
# "${relay_bin}" run parachain \
# --config "$output_dir/parachain-relay-asset-hub-1.json" \
# --ethereum.private-key $parachain_relay_primary_gov_eth_key \
# >>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true
# sleep 20
# done
# ) &

# # Launch parachain relay 2 for assethub
# (
# : >"$output_dir"/parachain-relay-asset-hub-2.log
# while :; do
# echo "Starting parachain relay (asset-hub) at $(date)"
# "${relay_bin}" run parachain \
# --config "$output_dir/parachain-relay-asset-hub-2.json" \
# --ethereum.private-key $parachain_relay_secondary_gov_eth_key \
# >>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true
# sleep 20
# done
# ) &

# Launch parachain relay for parachain penpal
(
Expand Down