Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions relayer/relays/execution/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
package execution

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
beaconconf "github.com/snowfork/snowbridge/relayer/relays/beacon/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
InstantVerification bool `mapstructure:"instantVerification"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
InstantVerification bool `mapstructure:"instantVerification"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"`
// Sleep interval(in seconds) to check if message(nonce) has already been relayed
SleepInterval uint64 `mapstructure:"sleepInterval"`
}

func (r ScheduleConfig) Validate() error {
if r.TotalRelayerCount < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.TotalRelayerCount {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type SourceConfig struct {
Expand Down Expand Up @@ -44,5 +66,9 @@ func (c Config) Validate() error {
if c.Source.Contracts.Gateway == "" {
return fmt.Errorf("source setting [gateway] is not set")
}
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("schedule config: %w", err)
}
return nil
}
133 changes: 87 additions & 46 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Relay struct {
gatewayContract *contracts.Gateway
beaconHeader *header.Header
writer *parachain.ParachainWriter
headerCache *ethereum.HeaderCache
}

func NewRelay(
Expand Down Expand Up @@ -77,6 +78,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
if err != nil {
return err
}
r.headerCache = headerCache

address := common.HexToAddress(r.config.Source.Contracts.Gateway)
contract, err := contracts.NewGateway(address, ethconn.Client())
Expand All @@ -101,11 +103,13 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
)
r.beaconHeader = &beaconHeader

log.Info("Current relay's ID:", r.config.Schedule.ID)

for {
select {
case <-ctx.Done():
return nil
case <-time.After(6 * time.Second):
case <-time.After(60 * time.Second):
log.WithFields(log.Fields{
"channelId": r.config.Source.ChannelID,
}).Info("Polling Nonces")
Expand Down Expand Up @@ -142,53 +146,10 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
}

for _, ev := range events {
inboundMsg, err := r.makeInboundMessage(ctx, headerCache, ev)
if err != nil {
return fmt.Errorf("make outgoing message: %w", err)
}
logger := log.WithFields(log.Fields{
"paraNonce": paraNonce,
"ethNonce": ethNonce,
"msgNonce": ev.Nonce,
"address": ev.Raw.Address.Hex(),
"blockHash": ev.Raw.BlockHash.Hex(),
"blockNumber": ev.Raw.BlockNumber,
"txHash": ev.Raw.TxHash.Hex(),
"txIndex": ev.Raw.TxIndex,
"channelID": types.H256(ev.ChannelID).Hex(),
})

if ev.Nonce <= paraNonce {
logger.Warn("inbound message outdated, just skipped")
continue
}
nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1)

blockHeader, err := ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
continue
}
err = r.waitAndSend(ctx, ev)
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, _ = r.fetchLatestParachainNonce()
if paraNonce != ev.Nonce {
return fmt.Errorf("inbound message fail to execute")
return fmt.Errorf("submit message: %w", err)
}
logger.Info("inbound message executed successfully")
}
}
}
Expand Down Expand Up @@ -388,3 +349,83 @@ func (r *Relay) makeInboundMessage(

return msg, nil
}

func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) {
var paraNonce uint64
ethNonce := ev.Nonce
waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount

var cnt uint64
for {
paraNonce, err = r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
if ethNonce <= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
if cnt == waitingPeriod {
break
}
time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second)
cnt++
}
err = r.doSubmit(ctx, ev)
if err != nil {
return fmt.Errorf("submit inbound message: %w", err)
}

return nil
}

func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) error {
inboundMsg, err := r.makeInboundMessage(ctx, r.headerCache, ev)
if err != nil {
return fmt.Errorf("make outgoing message: %w", err)
}

logger := log.WithFields(log.Fields{
"ethNonce": ev.Nonce,
"msgNonce": ev.Nonce,
"address": ev.Raw.Address.Hex(),
"blockHash": ev.Raw.BlockHash.Hex(),
"blockNumber": ev.Raw.BlockNumber,
"txHash": ev.Raw.TxHash.Hex(),
"txIndex": ev.Raw.TxIndex,
"channelID": types.H256(ev.ChannelID).Hex(),
})

nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1)

blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
return nil
}
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
if paraNonce != ev.Nonce {
return fmt.Errorf("inbound message fail to execute")
}
logger.Info("inbound message executed successfully")

return nil
}
2 changes: 1 addition & 1 deletion web/packages/test/config/beacon-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
"endpoint": "ws://127.0.0.1:11144",
"maxWatchedExtrinsics": 8
},
"updateSlotInterval": 316
"updateSlotInterval": 30
}
}
7 changes: 6 additions & 1 deletion web/packages/test/config/execution-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@
"maxWatchedExtrinsics": 8
}
},
"instantVerification": true
"instantVerification": false,
"schedule": {
"id": null,
"totalRelayerCount": 3,
"sleepInterval": 20
}
}
2 changes: 1 addition & 1 deletion web/packages/test/config/launch-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ cumulus_based = true
## Penpal
[[parachains]]
id = 2000
chain = "penpal-rococo-2000"
chain = "penpal-westend-2000"
cumulus_based = true

[[parachains.collators]]
Expand Down
67 changes: 60 additions & 7 deletions web/packages/test/scripts/start-relayer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ config_relayer() {
' \
config/beacon-relay.json >$output_dir/beacon-relay.json

# Configure execution relay for assethub
# Configure execution relay for assethub-0
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
Expand All @@ -115,8 +115,35 @@ config_relayer() {
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 0
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub.json
config/execution-relay.json >$output_dir/execution-relay-asset-hub-0.json

# Configure execution relay for assethub-1
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
--arg channelID $ASSET_HUB_CHANNEL_ID \
'
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 1
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub-1.json

# Configure execution relay for assethub-2
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
--arg channelID $ASSET_HUB_CHANNEL_ID \
'
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 2
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub-2.json

# Configure execution relay for penpal
jq \
Expand Down Expand Up @@ -211,15 +238,41 @@ start_relayer() {
done
) &

# Launch execution relay for assethub
# Launch execution relay for assethub-0
(
: >$output_dir/execution-relay-asset-hub.log
: >$output_dir/execution-relay-asset-hub-0.log
while :; do
echo "Starting execution relay (asset-hub) at $(date)"
echo "Starting execution relay (asset-hub-0) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub.json \
--config $output_dir/execution-relay-asset-hub-0.json \
--substrate.private-key "//ExecutionRelayAssetHub" \
>>"$output_dir"/execution-relay-asset-hub.log 2>&1 || true
>>"$output_dir"/execution-relay-asset-hub-0.log 2>&1 || true
sleep 20
done
) &

# Launch execution relay for assethub-1
(
: >$output_dir/execution-relay-asset-hub-1.log
while :; do
echo "Starting execution relay (asset-hub-1) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub-1.json \
--substrate.private-key "//Alice" \
>>"$output_dir"/execution-relay-asset-hub-1.log 2>&1 || true
sleep 20
done
) &

# Launch execution relay for assethub-2
(
: >$output_dir/execution-relay-asset-hub-2.log
while :; do
echo "Starting execution relay (asset-hub-2) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub-2.json \
--substrate.private-key "//Bob" \
>>"$output_dir"/execution-relay-asset-hub-2.log 2>&1 || true
sleep 20
done
) &
Expand Down