Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions relayer/chain/parachain/schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package parachain_test

import (
"fmt"
"testing"
)

// 100 messages distrubuted to 10 relayers, check waitingPeriod for each relayer
func TestModuloSchedule(t *testing.T) {
message_count := 100
total_count := 10
var waitingPeriod uint64
for nonce := 1; nonce < message_count; nonce++ {
for id := 0; id < total_count; id++ {
waitingPeriod = uint64((nonce + total_count - id) % total_count)
fmt.Printf("algorithm 1: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod)
}
}
for nonce := 1; nonce < message_count; nonce++ {
for id := 0; id < total_count; id++ {
modNonce := nonce % total_count
if modNonce > id {
waitingPeriod = uint64(modNonce - id)
} else {
waitingPeriod = uint64(id - modNonce)
}
fmt.Printf("algorithm 2: relay %d waiting for nonce %d for %d\n", id, nonce, waitingPeriod)
}
}

}
52 changes: 42 additions & 10 deletions relayer/relays/parachain/beefy-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,6 +23,7 @@ import (

type BeefyListener struct {
config *SourceConfig
scheduleConfig *ScheduleConfig
ethereumConn *ethereum.Connection
beefyClientContract *contracts.BeefyClient
relaychainConn *relaychain.Connection
Expand All @@ -33,13 +35,15 @@ type BeefyListener struct {

func NewBeefyListener(
config *SourceConfig,
scheduleConfig *ScheduleConfig,
ethereumConn *ethereum.Connection,
relaychainConn *relaychain.Connection,
parachainConnection *parachain.Connection,
tasks chan<- *Task,
) *BeefyListener {
return &BeefyListener{
config: config,
scheduleConfig: scheduleConfig,
ethereumConn: ethereumConn,
relaychainConn: relaychainConn,
parachainConnection: parachainConnection,
Expand Down Expand Up @@ -154,18 +158,12 @@ func (li *BeefyListener) doScan(ctx context.Context, beefyBlockNumber uint64) er
if err != nil {
return err
}

for _, task := range tasks {
// do final proof generation right before sending. The proof needs to be fresh.
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
paraNonce := (*task.MessageProofs)[0].Message.Nonce
waitingPeriod := (paraNonce + li.scheduleConfig.TotalRelayerCount - li.scheduleConfig.ID) % li.scheduleConfig.TotalRelayerCount
err = li.waitAndSend(ctx, task, waitingPeriod)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
return fmt.Errorf("wait task for nonce %d: %w", paraNonce, err)
}
}

Expand Down Expand Up @@ -285,3 +283,37 @@ func (li *BeefyListener) generateProof(ctx context.Context, input *ProofInput, h

return &output, nil
}

func (li *BeefyListener) waitAndSend(ctx context.Context, task *Task, waitingPeriod uint64) error {
paraNonce := (*task.MessageProofs)[0].Message.Nonce
log.Info(fmt.Sprintf("waiting for nonce %d to be picked up by another relayer", paraNonce))
var cnt uint64
var err error
for {
ethInboundNonce, err := li.scanner.findLatestNonce(ctx)
if err != nil {
return err
}
if ethInboundNonce >= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
if cnt == waitingPeriod {
break
}
time.Sleep(time.Duration(li.scheduleConfig.SleepInterval) * time.Second)
cnt++
}
log.Info(fmt.Sprintf("nonce %d is not picked up by any one, submit anyway", paraNonce))
task.ProofOutput, err = li.generateProof(ctx, task.ProofInput, task.Header)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case li.tasks <- task:
log.Info("Beefy Listener emitted new task")
}
return nil
}
33 changes: 31 additions & 2 deletions relayer/relays/parachain/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package parachain

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type SourceConfig struct {
Expand All @@ -32,6 +35,25 @@ type SinkContractsConfig struct {
Gateway string `mapstructure:"Gateway"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"`
// Sleep interval(in seconds) to check if message(nonce) has already been relayed
SleepInterval uint64 `mapstructure:"sleepInterval"`
}

func (r ScheduleConfig) Validate() error {
if r.TotalRelayerCount < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.TotalRelayerCount {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type ChannelID [32]byte

func (c Config) Validate() error {
Expand Down Expand Up @@ -66,5 +88,12 @@ func (c Config) Validate() error {
if c.Sink.Contracts.Gateway == "" {
return fmt.Errorf("sink contracts setting [Gateway] is not set")
}

// Relay
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("relay config: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions relayer/relays/parachain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRelay(config *Config, keypair *secp256k1.Keypair) (*Relay, error) {

beefyListener := NewBeefyListener(
&config.Source,
&config.Schedule,
ethereumConnBeefy,
relaychainConn,
parachainConn,
Expand Down Expand Up @@ -97,5 +98,7 @@ func (relay *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return err
}

log.Info("Current relay's ID:", relay.config.Schedule.ID)

return nil
}
42 changes: 24 additions & 18 deletions relayer/relays/parachain/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"

"github.com/snowfork/go-substrate-rpc-client/v4/scale"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -71,30 +72,13 @@ func (s *Scanner) findTasks(
paraHash types.Hash,
) ([]*Task, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return nil, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return nil, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
ethInboundNonce, err := s.findLatestNonce(ctx)
log.WithFields(log.Fields{
"nonce": ethInboundNonce,
"channelID": s.config.ChannelID,
}).Info("Checked latest nonce delivered to ethereum gateway")

// Fetch latest nonce in parachain outbound queue

paraNonceKey, err := types.CreateStorageKey(s.paraConn.Metadata(), "EthereumOutboundQueue", "Nonce", s.config.ChannelID[:], nil)
if err != nil {
return nil, fmt.Errorf("create storage key for parachain outbound queue nonce with channelID '%v': %w", s.config.ChannelID, err)
Expand Down Expand Up @@ -457,3 +441,25 @@ func fetchMessageProof(

return MessageProof{Message: message, Proof: proof}, nil
}

func (s *Scanner) findLatestNonce(ctx context.Context) (uint64, error) {
// Fetch latest nonce in ethereum gateway
gatewayAddress := common.HexToAddress(s.config.Contracts.Gateway)
gatewayContract, err := contracts.NewGateway(
gatewayAddress,
s.ethConn.Client(),
)
if err != nil {
return 0, fmt.Errorf("create gateway contract for address '%v': %w", gatewayAddress, err)
}

options := bind.CallOpts{
Pending: true,
Context: ctx,
}
ethInboundNonce, _, err := gatewayContract.ChannelNoncesOf(&options, s.config.ChannelID)
if err != nil {
return 0, fmt.Errorf("fetch nonce from gateway contract for channelID '%v': %w", s.config.ChannelID, err)
}
return ethInboundNonce, err
}
5 changes: 5 additions & 0 deletions web/packages/test/config/parachain-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@
"contracts": {
"Gateway": null
}
},
"schedule": {
"id": null,
"totalRelayerCount": 3,
"sleepInterval": 45
}
}
79 changes: 73 additions & 6 deletions web/packages/test/scripts/start-relayer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ config_relayer() {
' \
config/parachain-relay.json >$output_dir/parachain-relay-bridge-hub-02.json

# Configure parachain relay (asset hub)
# Configure parachain relay (asset hub)-0
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
Expand All @@ -70,8 +70,49 @@ config_relayer() {
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 0
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub.json
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-0.json

# Configure parachain relay (asset hub)-1
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 1
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-1.json

# Configure parachain relay (asset hub)-2
jq \
--arg k1 "$(address_for GatewayProxy)" \
--arg k2 "$(address_for BeefyClient)" \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg eth_writer_endpoint $eth_writer_endpoint \
--arg channelID $ASSET_HUB_CHANNEL_ID \
--arg eth_gas_limit $eth_gas_limit \
'
.source.contracts.Gateway = $k1
| .source.contracts.BeefyClient = $k2
| .sink.contracts.Gateway = $k1
| .source.ethereum.endpoint = $eth_endpoint_ws
| .sink.ethereum.endpoint = $eth_writer_endpoint
| .sink.ethereum."gas-limit" = $eth_gas_limit
| .source."channel-id" = $channelID
| .schedule.id = 2
' \
config/parachain-relay.json >$output_dir/parachain-relay-asset-hub-2.json

# Configure parachain relay (penpal)
jq \
Expand Down Expand Up @@ -172,15 +213,41 @@ start_relayer() {
done
) &

# Launch parachain relay for assethub
# Launch parachain relay 0 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub.log
: >"$output_dir"/parachain-relay-asset-hub-0.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub.json" \
--config "$output_dir/parachain-relay-asset-hub-0.json" \
--ethereum.private-key $parachain_relay_assethub_eth_key \
>>"$output_dir"/parachain-relay-asset-hub.log 2>&1 || true
>>"$output_dir"/parachain-relay-asset-hub-0.log 2>&1 || true
sleep 20
done
) &

# Launch parachain relay 1 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub-1.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub-1.json" \
--ethereum.private-key $parachain_relay_primary_gov_eth_key \
>>"$output_dir"/parachain-relay-asset-hub-1.log 2>&1 || true
sleep 20
done
) &

# Launch parachain relay 2 for assethub
(
: >"$output_dir"/parachain-relay-asset-hub-2.log
while :; do
echo "Starting parachain relay (asset-hub) at $(date)"
"${relay_bin}" run parachain \
--config "$output_dir/parachain-relay-asset-hub-2.json" \
--ethereum.private-key $parachain_relay_secondary_gov_eth_key \
>>"$output_dir"/parachain-relay-asset-hub-2.log 2>&1 || true
sleep 20
done
) &
Expand Down