Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 153 additions & 38 deletions hathor/nanocontracts/execution/block_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import hashlib
import traceback
from collections import defaultdict
from dataclasses import dataclass
from typing import TYPE_CHECKING, Callable, Iterator, Mapping, TypeAlias, cast

from hathor.feature_activation.utils import Features
from hathor.nanocontracts.exception import NCFail
from hathor.nanocontracts.exception import NCFail, NCInsufficientFunds
from hathor.nanocontracts.nano_runtime_version import NanoRuntimeVersion
from hathor.transaction import Block, Transaction
from hathorlib.nanocontracts.runner.call_info import CallInfo
from hathorlib.nanocontracts.runner.runner import MAX_SEQNUM_JUMP_SIZE
from hathorlib.nanocontracts.types import Address, BlueprintId, ContractId, NCRawArgs, VertexId

Expand All @@ -48,25 +48,30 @@
class NCTxExecutionSuccess:
"""Result type for successful NC execution."""
tx: Transaction
runner: 'Runner'
block_storage: 'NCBlockStorage'
runner: 'Runner | None' = None


@dataclass(slots=True, frozen=True)
class NCTxExecutionFailure:
"""Result type for failed NC execution.

`call_info` is None for cyclic-dependency failures, which never run a Runner.
`runner` is None for cyclic-dependency failures and failed transfer-header executions.
"""
tx: Transaction
call_info: CallInfo | None
block_storage: 'NCBlockStorage | None'
runner: 'Runner | None'
exception: NCFail
traceback: str
persist_block_storage: bool = False


@dataclass(slots=True, frozen=True)
class NCTxExecutionSkipped:
"""Result type for skipped NC execution (voided transactions)."""
tx: Transaction
block_storage: 'NCBlockStorage | None' = None
persist_block_storage: bool = False


NCTxExecutionResult: TypeAlias = NCTxExecutionSuccess | NCTxExecutionFailure | NCTxExecutionSkipped
Expand Down Expand Up @@ -136,7 +141,7 @@ def get_token_description(self, token_id: 'TokenUid') -> 'TokenDescription':

class NCBlockExecutor:
"""
Pure execution of nano contract transactions in a block.
Pure execution of nano contract and transfer-header transactions in a block.

This class contains the core NC execution logic without any side effects.
It yields execution events that can be processed by a caller to apply
Expand Down Expand Up @@ -197,15 +202,17 @@ def execute_block(
parent_root_id = parent_meta.nc_block_root_id
assert parent_root_id is not None

nc_calls: list[Transaction] = []
stateful_txs: list[Transaction] = []
for tx in block.iter_transactions_in_this_block():
if not tx.is_nano_contract():
if not tx.is_nano_contract() and not tx.has_transfer_header():
continue
nc_calls.append(tx)
stateful_txs.append(tx)

sorted_txs = self._nc_calls_sorter(block, nc_calls) if nc_calls else None
nc_sorted_calls = sorted_txs.sorted if sorted_txs else tuple()
nc_cyclic_txs = sorted_txs.cyclic if sorted_txs else tuple()
sorted_txs = self._nc_calls_sorter(block, stateful_txs) if stateful_txs else None
stateful_sorted_txs = sorted_txs.sorted if sorted_txs else tuple()
cyclic_txs = sorted_txs.cyclic if sorted_txs else tuple()
nc_sorted_calls = tuple(tx for tx in stateful_sorted_txs if tx.is_nano_contract())
nc_cyclic_txs = tuple(tx for tx in cyclic_txs if tx.is_nano_contract())
block_storage = self._nc_storage_factory.get_block_storage(parent_root_id)
assert block_storage.get_root_id() == parent_root_id
features = Features.from_vertex(settings=self._settings, feature_service=self._feature_service, vertex=block)
Expand All @@ -218,7 +225,7 @@ def execute_block(
nc_cyclic_fails=nc_cyclic_txs,
)

for tx in nc_cyclic_txs:
for tx in cyclic_txs:
yield NCBeginTransaction(tx=tx, rng_seed=b'')
try:
# Dummy raise just to create an exception context and convert
Expand All @@ -227,18 +234,22 @@ def execute_block(
except NCFail as e:
yield NCTxExecutionFailure(
tx=tx,
call_info=None,
block_storage=None,
runner=None,
exception=e,
traceback=traceback.format_exc(),
)
yield NCEndTransaction(tx=tx)

seed_hasher = hashlib.sha256(block.hash)

for tx in nc_sorted_calls:
current_root_id = parent_root_id

for tx in stateful_sorted_txs:
tx_block_storage = self._nc_storage_factory.get_block_storage(current_root_id)
# Compute RNG seed for this transaction
seed_hasher.update(tx.hash)
seed_hasher.update(block_storage.get_root_id())
seed_hasher.update(current_root_id)
rng_seed = seed_hasher.digest()

yield NCBeginTransaction(tx=tx, rng_seed=rng_seed)
Expand All @@ -247,17 +258,30 @@ def execute_block(
result = self.execute_transaction(
runtime_version=features.nano_runtime_version,
tx=tx,
block_storage=block_storage,
block_storage=tx_block_storage,
rng_seed=rng_seed,
should_skip=should_skip,
)
yield result

match result:
case NCTxExecutionSuccess(block_storage=result_block_storage):
current_root_id = result_block_storage.get_root_id()
case NCTxExecutionFailure(block_storage=result_block_storage, persist_block_storage=True):
assert result_block_storage is not None
current_root_id = result_block_storage.get_root_id()
case NCTxExecutionSkipped(block_storage=result_block_storage, persist_block_storage=True):
assert result_block_storage is not None
current_root_id = result_block_storage.get_root_id()
case _:
pass

yield NCEndTransaction(tx=tx)

# Compute final root ID without committing
final_root_id = block_storage.get_root_id()
if not nc_sorted_calls:
final_root_id = current_root_id
block_storage = self._nc_storage_factory.get_block_storage(final_root_id)
if not stateful_sorted_txs:
assert final_root_id == parent_root_id
yield NCEndBlock(
block=block,
Expand Down Expand Up @@ -288,24 +312,38 @@ def execute_transaction(
"""
if should_skip(tx):
# Skip transactions based on the caller-provided predicate.
# Check if seqnum needs to be updated.
nc_header = tx.get_nano_header()
nc_address = Address(nc_header.nc_address)
seqnum = block_storage.get_address_seqnum(nc_address)
if nc_header.nc_seqnum > seqnum:
block_storage.set_address_seqnum(nc_address, nc_header.nc_seqnum)
if tx.is_nano_contract():
nc_header = tx.get_nano_header()
nc_address = Address(nc_header.nc_address)
seqnum = block_storage.get_address_seqnum(nc_address)
if nc_header.nc_seqnum > seqnum:
block_storage.set_address_seqnum(nc_address, nc_header.nc_seqnum)
return NCTxExecutionSkipped(tx=tx, block_storage=block_storage, persist_block_storage=True)
return NCTxExecutionSkipped(tx=tx)

if not tx.is_nano_contract():
try:
self._verify_transfer_header_balances(block_storage, tx)
self._verify_transfer_header_seqnums(block_storage, tx)
self._apply_transfer_header_diffs(block_storage, self._get_transfer_header_diffs(tx))
self._apply_transfer_header_seqnums(block_storage, tx)
except NCFail as e:
return NCTxExecutionFailure(
tx=tx,
block_storage=None,
runner=None,
exception=e,
traceback=traceback.format_exc(),
)
return NCTxExecutionSuccess(tx=tx, block_storage=block_storage)

runner = self._runner_factory.create(
runtime_version=runtime_version,
block_storage=block_storage,
seed=rng_seed,
)

try:
assert isinstance(tx, Transaction)

# Check seqnum.
nano_header = tx.get_nano_header()

if nano_header.is_creating_a_new_contract():
Expand All @@ -314,18 +352,12 @@ def execute_transaction(
contract_id = ContractId(VertexId(nano_header.nc_id))

assert nano_header.nc_seqnum >= 0
current_seqnum = runner.block_storage.get_address_seqnum(
Address(nano_header.nc_address)
)
current_seqnum = runner.block_storage.get_address_seqnum(Address(nano_header.nc_address))
diff = nano_header.nc_seqnum - current_seqnum
if diff <= 0 or diff > MAX_SEQNUM_JUMP_SIZE:
# Fail execution if seqnum is invalid.
runner._last_call_info = runner._build_call_info(contract_id)
# TODO: Set the seqnum in this case?
raise NCFail(f'invalid seqnum (diff={diff})')
runner.block_storage.set_address_seqnum(
Address(nano_header.nc_address), nano_header.nc_seqnum
)
runner.block_storage.set_address_seqnum(Address(nano_header.nc_address), nano_header.nc_seqnum)

vertex_metadata = tx.get_metadata()
assert vertex_metadata.first_block is not None, (
Expand All @@ -348,19 +380,102 @@ def execute_transaction(
# and at this point no tokens pending creation, so we can validate the balances
self._verify_transparent_balance_after_execution(tx, block_storage, runner)
except NCFail as e:
self._ensure_runner_has_last_call_info(tx, runner)
runner.discard_pending_changes()
return NCTxExecutionFailure(
tx=tx,
call_info=runner.get_last_call_info(),
block_storage=block_storage,
runner=runner,
exception=e,
traceback=traceback.format_exc(),
persist_block_storage=True,
)

# Commit is intentionally outside the NCFail handling path.
# A failure here indicates critical state corruption and must propagate.
runner.commit_pending_changes()

return NCTxExecutionSuccess(tx=tx, runner=runner)
return NCTxExecutionSuccess(tx=tx, block_storage=block_storage, runner=runner)

def _get_transfer_header_diffs(self, tx: Transaction) -> dict[tuple['Address', 'TokenUid'], int]:
from hathor.nanocontracts.types import Address, TokenUid

diffs: defaultdict[tuple[Address, TokenUid], int] = defaultdict(int)
if not tx.has_transfer_header():
return dict(diffs)

transfer_header = tx.get_transfer_header()
for txin in transfer_header.inputs:
token_uid = TokenUid(tx.get_token_uid(txin.token_index))
input_address = transfer_header.addresses[txin.address_index]
diffs[(Address(input_address.address), token_uid)] -= txin.amount

for txout in transfer_header.outputs:
token_uid = TokenUid(tx.get_token_uid(txout.token_index))
diffs[(Address(txout.address), token_uid)] += txout.amount

return dict(diffs)

def _verify_transfer_header_balances(
self,
block_storage: 'NCBlockStorage',
tx: Transaction,
) -> None:
transfer_header_diffs = self._get_transfer_header_diffs(tx)
for (address, token_uid), diff in transfer_header_diffs.items():
if diff >= 0:
continue

balance = block_storage.get_address_balance(address, token_uid)
if balance + diff < 0:
raise NCInsufficientFunds(
f'insufficient transfer-header balance for address={address.hex()} '
f'token={token_uid.hex()}: available={balance} required={-diff}'
)

def _verify_transfer_header_seqnums(self, block_storage: 'NCBlockStorage', tx: Transaction) -> None:
if not tx.has_transfer_header():
return

transfer_header = tx.get_transfer_header()
for input_address in transfer_header.addresses:
current_seqnum = block_storage.get_address_seqnum(Address(input_address.address))
diff = input_address.seqnum - current_seqnum
if diff <= 0 or diff > MAX_SEQNUM_JUMP_SIZE:
raise NCFail(f'invalid transfer-header seqnum (diff={diff})')

def _apply_transfer_header_diffs(
self,
block_storage: 'NCBlockStorage',
transfer_header_diffs: dict[tuple['Address', 'TokenUid'], int],
) -> None:
from hathor.nanocontracts.types import Amount

for (address, token_uid), diff in transfer_header_diffs.items():
if diff == 0:
continue
block_storage.add_address_balance(address, Amount(diff), token_uid)

def _apply_transfer_header_seqnums(self, block_storage: 'NCBlockStorage', tx: Transaction) -> None:
if not tx.has_transfer_header():
return

transfer_header = tx.get_transfer_header()
for input_address in transfer_header.addresses:
block_storage.set_address_seqnum(Address(input_address.address), input_address.seqnum)

def _ensure_runner_has_last_call_info(self, tx: Transaction, runner: 'Runner') -> None:
from hathor.nanocontracts.types import ContractId, VertexId

if runner._last_call_info is not None:
return

nano_header = tx.get_nano_header()
if nano_header.is_creating_a_new_contract():
contract_id = ContractId(VertexId(tx.hash))
else:
contract_id = ContractId(VertexId(nano_header.nc_id))
runner._last_call_info = runner._build_call_info(contract_id)

def _verify_transparent_balance_after_execution(
self,
Expand Down
40 changes: 36 additions & 4 deletions hathor/nanocontracts/execution/consensus_block_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,13 @@ def _apply_effect(
# as that is added by the executor itself after a failure
assert NC_EXECUTION_FAIL_ID not in tx_meta.voided_by

case NCTxExecutionSuccess(tx=tx, runner=runner):
case NCTxExecutionSuccess(tx=tx, block_storage=block_storage, runner=runner):
from hathor.nanocontracts.runner.call_info import CallType

if runner is None:
block_storage.commit()
return

tx_meta = tx.get_metadata()

tx_meta.nc_execution = NCExecutionState.SUCCESS
Expand All @@ -262,6 +266,7 @@ def _apply_effect(
# for the block_storage. This ensures we will have a clean database with
# no orphan nodes.
runner.commit()
block_storage.commit()

# Derive call_info, nc_calls_records, and events from runner
call_info = runner.get_last_call_info()
Expand Down Expand Up @@ -297,7 +302,28 @@ def _apply_effect(
# Save logs
self._nc_log_storage.save_logs(tx, call_info.nc_logger.__entries__, None)

case NCTxExecutionFailure(tx=tx, call_info=call_info, exception=exception, traceback=tb):
case NCTxExecutionFailure(
tx=tx,
block_storage=block_storage,
runner=runner,
exception=exception,
traceback=tb,
persist_block_storage=persist_block_storage,
):
if persist_block_storage:
assert block_storage is not None
block_storage.commit()

if runner is None and not tx.is_nano_contract():
self.log.info(
'transfer-header execution failed',
tx=tx.hash.hex(),
error=repr(exception),
cause=repr(exception.__cause__),
)
on_failure(tx)
return

# Log the failure
kwargs: dict[str, Any] = {}
if tx.name:
Expand All @@ -315,10 +341,16 @@ def _apply_effect(
on_failure(tx)

# Save logs with exception info
log_entries = call_info.nc_logger.__entries__ if call_info is not None else []
failure_call_info = runner.get_last_call_info() if runner is not None else None
log_entries = failure_call_info.nc_logger.__entries__ if failure_call_info is not None else []
self._nc_log_storage.save_logs(tx, log_entries, (exception, tb))

case NCTxExecutionSkipped(tx=tx):
case NCTxExecutionSkipped(tx=tx, block_storage=block_storage, persist_block_storage=persist_block_storage):
if persist_block_storage:
assert block_storage is not None
block_storage.commit()
if not tx.is_nano_contract():
return
tx_meta = tx.get_metadata()
tx_meta.nc_execution = NCExecutionState.SKIPPED
context.save(tx)
Expand Down
Loading
Loading