Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
42ab953
feat(storage): parse finalize_time and server crc32c in async object …
chandra-siri May 26, 2026
d972c2c
feat(storage): implement rolling checksum and verification in reads r…
chandra-siri May 26, 2026
b4a7a84
feat(storage): integrate full-object checksum in AsyncMultiRangeDownl…
chandra-siri May 26, 2026
7110744
style(storage): apply user feedback on finalize_time and fix unused i…
chandra-siri May 26, 2026
a437b66
merge: integrate updated finalize_time checks and lint fixes from PR 1
chandra-siri May 26, 2026
84b19d1
merge: integrate all updates and lint fixes from PR 1 and PR 2
chandra-siri May 26, 2026
9704da7
merge: integrate latest main to resolve pytest-asyncio event loop fai…
chandra-siri May 27, 2026
31ef2e3
style(storage): run ruff format to resolve CI lint failures
chandra-siri May 27, 2026
25c39e2
merge: integrate PR 1 updates including ruff formatting and pytest-as…
chandra-siri May 27, 2026
978b30d
merge: integrate PR 1 and PR 2 updates including ruff formatting and …
chandra-siri May 27, 2026
c371f59
test(storage): add conftest autouse event loop fixture to resolve pyt…
chandra-siri May 27, 2026
21144f9
merge: integrate PR 1 updates including conftest.py event loop fixture
chandra-siri May 27, 2026
ce827d5
merge: integrate PR 1 and PR 2 updates including conftest.py event lo…
chandra-siri May 27, 2026
c470641
fix(storage): use second attribute instead of seconds for DatetimeWit…
chandra-siri May 27, 2026
3820424
merge: integrate PR 1 second attribute fixes
chandra-siri May 27, 2026
0b15f6d
merge: integrate PR 1 and PR 2 updates including second attribute fixes
chandra-siri May 27, 2026
2f49dcb
perf(storage): bypass rolling checksum updates when checksum validati…
chandra-siri May 27, 2026
ff33752
merge: integrate PR 2 updates including Gemini feedback fixes
chandra-siri May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
_ReadResumptionStrategy,
)

from google.cloud.storage.exceptions import DataCorruption
from ._utils import raise_if_no_fast_crc32c

_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
Expand Down Expand Up @@ -219,8 +220,6 @@ def __init__(
)
generation = kwargs.pop("generation_number")

raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
Expand All @@ -232,6 +231,8 @@ def __init__(
self._multiplexer: Optional[_StreamMultiplexer] = None
self.persisted_size: Optional[int] = None # updated after opening the stream
self._open_retries: int = 0
self.is_finalized: bool = False
self.full_obj_server_crc32c: Optional[int] = None

async def __aenter__(self):
"""Opens the underlying bidi-gRPC connection to read from the object."""
Expand Down Expand Up @@ -327,6 +328,8 @@ async def _do_open():
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
self.persisted_size = self.read_obj_str.persisted_size
self.is_finalized = self.read_obj_str.is_finalized
self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c

self._is_stream_open = True

Expand Down Expand Up @@ -363,6 +366,8 @@ async def factory():
self.generation = stream.generation_number
if stream.read_handle:
self.read_handle = stream.read_handle
self.is_finalized = stream.is_finalized
self.full_obj_server_crc32c = stream.full_obj_server_crc32c

self.read_obj_str = stream
self._is_stream_open = True
Expand All @@ -377,6 +382,7 @@ async def download_ranges(
lock: asyncio.Lock = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
enable_checksum: bool = True,
) -> None:
"""Downloads multiple byte ranges from the object into the buffers
provided by user with automatic retries.
Expand Down Expand Up @@ -412,6 +418,9 @@ async def download_ranges(
"Invalid input - length of read_ranges cannot be more than 1000"
)

if enable_checksum:
raise_if_no_fast_crc32c()

if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")

Expand All @@ -422,16 +431,29 @@ async def download_ranges(
download_states = {}
for read_range in read_ranges:
read_id = generate_random_56_bit_integer()
# Unpack tuple into self-documenting variable names to improve readability.
offset, length, user_buffer = read_range

# Heuristic to detect full object reads:
# - Implicit full object read: start offset is 0 and length is 0 (read all).
# - Explicit full object read: start offset is 0 and length matches the exact persisted size.
is_full_object_read = (
(offset == 0 and length == 0) or
(self.persisted_size is not None and offset == 0 and length == self.persisted_size)
)
Comment thread
chandra-siri marked this conversation as resolved.
download_states[read_id] = _DownloadState(
initial_offset=read_range[0],
initial_length=read_range[1],
user_buffer=read_range[2],
initial_offset=offset,
initial_length=length,
user_buffer=user_buffer,
is_full_object_read=is_full_object_read,
)

initial_state = {
"download_states": download_states,
"read_handle": self.read_handle,
"routing_token": None,
"enable_checksum": enable_checksum,
"full_obj_server_crc32c": self.full_obj_server_crc32c,
}

read_ids = set(download_states.keys())
Expand Down Expand Up @@ -519,12 +541,18 @@ async def generator():
strategy, send_and_recv_via_multiplexer
)

await retry_manager.execute(initial_state, retry_policy)
try:
await retry_manager.execute(initial_state, retry_policy)
except DataCorruption:
if self.is_stream_open:
await self.close()
raise

if initial_state.get("read_handle"):
self.read_handle = initial_state["read_handle"]
finally:
self._multiplexer.unregister(read_ids)
if self._multiplexer is not None:
self._multiplexer.unregister(read_ids)

async def close(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from typing import List, Optional, Tuple

from google.api_core.bidi_async import AsyncBidiRpc
Expand Down Expand Up @@ -79,6 +80,9 @@ def __init__(
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
self._is_stream_open: bool = False
self.persisted_size: Optional[int] = None
self.is_finalized: bool = False
self.full_obj_server_crc32c: Optional[int] = None
self.object_metadata: Optional[_storage_v2.Object] = None

async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
"""Opens the bidi-gRPC connection to read from the object.
Expand Down Expand Up @@ -132,6 +136,21 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
self.generation_number = response.metadata.generation
# update persisted size
self.persisted_size = response.metadata.size
self.object_metadata = response.metadata
# Since full object checksum validation is only required for finalized objects,
# check finalize_time (which is DatetimeWithNanoseconds/datetime in production, or mocked in tests).
finalize_time = getattr(response.metadata, "finalize_time", None)
if finalize_time:
is_finalized_val = False
if isinstance(finalize_time, datetime.datetime):
is_finalized_val = True
elif hasattr(finalize_time, "seconds") and finalize_time.seconds > 0:
is_finalized_val = True

if is_finalized_val:
self.is_finalized = True
if hasattr(response.metadata, "checksums") and response.metadata.checksums:
self.full_obj_server_crc32c = response.metadata.checksums.crc32c

if response and response.read_handle:
self.read_handle = response.read_handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ class _DownloadState:
"""A helper class to track the state of a single range download."""

def __init__(
self, initial_offset: int, initial_length: int, user_buffer: IO[bytes]
self, initial_offset: int, initial_length: int, user_buffer: IO[bytes], is_full_object_read: bool = False
):
self.initial_offset = initial_offset
self.initial_length = initial_length
self.user_buffer = user_buffer
self.bytes_written = 0
self.next_expected_offset = initial_offset
self.is_complete = False
self.is_full_object_read = is_full_object_read
self.rolling_checksum = google_crc32c.Checksum() if is_full_object_read else None


class _ReadResumptionStrategy(_BaseResumptionStrategy):
Expand Down Expand Up @@ -90,6 +92,7 @@ def update_state_from_response(
)

download_states = state["download_states"]
checksum_enabled = state.get("enable_checksum", True)

for object_data_range in proto.object_data_ranges:
# Ignore empty ranges or ranges for IDs not in our state
Expand Down Expand Up @@ -125,7 +128,7 @@ def update_state_from_response(
checksummed_data = object_data_range.checksummed_data
data = checksummed_data.content

if checksummed_data.HasField("crc32c"):
if checksum_enabled and checksummed_data.HasField("crc32c"):
server_checksum = checksummed_data.crc32c
client_checksum = google_crc32c.value(data)
if server_checksum != client_checksum:
Expand All @@ -138,10 +141,14 @@ def update_state_from_response(
# Update State & Write Data
chunk_size = len(data)
read_state.user_buffer.write(data)

# Commit updates only after the write succeeds
if read_state.rolling_checksum is not None:
read_state.rolling_checksum.update(data)
read_state.bytes_written += chunk_size
read_state.next_expected_offset += chunk_size

# Final Byte Count Verification
# Final Byte Count & Full Object Checksum Verification
if object_data_range.range_end:
read_state.is_complete = True
if (
Expand All @@ -154,6 +161,22 @@ def update_state_from_response(
f"Expected {read_state.initial_length}, got {read_state.bytes_written}",
)

# Perform full-object checksum verification once the stream finishes.
if read_state.is_full_object_read and checksum_enabled:
full_obj_server_crc32c = state.get("full_obj_server_crc32c")
if full_obj_server_crc32c is not None:
# Use standard big-endian byte conversion to retrieve the rolling checksum value.
client_checksum = int.from_bytes(
read_state.rolling_checksum.digest(),
byteorder="big",
)
if client_checksum != full_obj_server_crc32c:
raise DataCorruption(
response,
f"Full object checksum mismatch for read_id {read_id}. "
f"Server authoritative crc32c: {full_obj_server_crc32c}, client calculated rolling: {client_checksum}.",
)

async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
"""Handles BidiReadObjectRedirectedError for reads."""
routing_token, read_handle = _handle_redirect(error)
Expand Down
78 changes: 78 additions & 0 deletions packages/google-cloud-storage/tests/system/test_zonal.py
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a system test that fails if the validation is skipped or applied incorrectly?
If not, consider adding a test that simulates a validation failure to verify that the corruption error is bubbled up the stack.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ObjectContexts,
ObjectCustomContextPayload,
)
from google.cloud.storage.exceptions import DataCorruption

pytestmark = pytest.mark.skipif(
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
Expand Down Expand Up @@ -961,3 +962,80 @@ async def _run():
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

event_loop.run_until_complete(_run())


@pytest.mark.parametrize(
    "read_start, read_length, enable_checksum",
    [
        (0, 0, True),
        (0, 1024 * 1024, True),
        (0, 0, False),
    ],
)
def test_mrd_checksum_validation(
    storage_client,
    blobs_to_delete,
    event_loop,
    grpc_client_direct,
    read_start,
    read_length,
    enable_checksum,
):
    """Download a finalized object in full with the checksum toggle on and off.

    A 1MB object of random bytes is written and finalized, then read back
    through ``AsyncMultiRangeDownloader.download_ranges`` using the
    parametrized ``(read_start, read_length)`` range and ``enable_checksum``
    flag. The downloaded bytes must equal the uploaded bytes in every case.

    Cleanup runs in a ``finally`` block so that the object is still registered
    for deletion when the download raises or the content assertion fails.
    """
    object_size = 1024 * 1024  # 1MB
    object_name = f"test_mrd_chksum-{uuid.uuid4()}"

    async def _run():
        object_data = os.urandom(object_size)

        writer = AsyncAppendableObjectWriter(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        )
        await writer.open()
        await writer.append(object_data)
        await writer.close(finalize_on_close=True)

        # The object exists from this point on, so everything below must be
        # followed by cleanup regardless of the outcome.
        try:
            async with AsyncMultiRangeDownloader(
                grpc_client_direct, _ZONAL_BUCKET, object_name
            ) as mrd:
                buffer = BytesIO()
                await mrd.download_ranges(
                    [(read_start, read_length, buffer)],
                    enable_checksum=enable_checksum,
                )
                assert buffer.getvalue() == object_data
        finally:
            # cleanup
            del writer
            gc.collect()
            blobs_to_delete.append(
                storage_client.bucket(_ZONAL_BUCKET).blob(object_name)
            )

    event_loop.run_until_complete(_run())


def test_mrd_checksum_unfinalized_appendable_skipped(
    storage_client, blobs_to_delete, event_loop, grpc_client_direct
):
    """Read a flushed but unfinalized appendable object with checksums enabled.

    The object is appended to and flushed, but not finalized, before it is
    downloaded with ``enable_checksum=True``. The download is expected to
    complete without raising and to return exactly the uploaded bytes.

    The writer is closed and the object registered for deletion in a
    ``finally`` block, so a failing download or assertion does not leave an
    open writer or an orphaned object behind.
    """
    object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}"

    async def _run():
        writer = AsyncAppendableObjectWriter(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        )
        await writer.open()
        await writer.append(_BYTES_TO_UPLOAD)
        await writer.flush()  # Flushed but not finalized!

        try:
            # Download the unfinalized appendable object with enable_checksum=True
            async with AsyncMultiRangeDownloader(
                grpc_client_direct, _ZONAL_BUCKET, object_name
            ) as mrd:
                buffer = BytesIO()
                # Since it's unfinalized, it should skip the checksum check
                # without raising
                await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True)
                assert buffer.getvalue() == _BYTES_TO_UPLOAD
        finally:
            # cleanup
            await writer.close()
            del writer
            gc.collect()
            blobs_to_delete.append(
                storage_client.bucket(_ZONAL_BUCKET).blob(object_name)
            )

    event_loop.run_until_complete(_run())
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ def test_initialization(self):
self.assertEqual(state.bytes_written, 0)
self.assertEqual(state.next_expected_offset, initial_offset)
self.assertFalse(state.is_complete)
self.assertFalse(state.is_full_object_read)
self.assertIsNone(state.rolling_checksum)

def test_initialization_with_full_object_read(self):
    """A state built with ``is_full_object_read=True`` gets a rolling checksum.

    The remaining bookkeeping fields must start from the same values as for
    any other download state.
    """
    offset, length = 10, 100
    buffer = io.BytesIO()

    full_read_state = _DownloadState(
        offset, length, buffer, is_full_object_read=True
    )

    # Constructor arguments are stored as given.
    self.assertEqual(full_read_state.initial_offset, offset)
    self.assertEqual(full_read_state.initial_length, length)
    self.assertEqual(full_read_state.user_buffer, buffer)

    # Progress tracking starts from scratch.
    self.assertEqual(full_read_state.bytes_written, 0)
    self.assertEqual(full_read_state.next_expected_offset, offset)
    self.assertFalse(full_read_state.is_complete)

    # The flag is recorded and a checksum accumulator is present.
    self.assertTrue(full_read_state.is_full_object_read)
    self.assertIsNotNone(full_read_state.rolling_checksum)


class TestReadResumptionStrategy(unittest.TestCase):
Expand All @@ -53,12 +71,12 @@ def setUp(self):

self.state = {"download_states": {}, "read_handle": None, "routing_token": None}

def _add_download(self, read_id, offset=0, length=100, buffer=None):
def _add_download(self, read_id, offset=0, length=100, buffer=None, is_full_object_read=False):
"""Helper to inject a download state into the correct nested location."""
if buffer is None:
buffer = io.BytesIO()
state = _DownloadState(
initial_offset=offset, initial_length=length, user_buffer=buffer
initial_offset=offset, initial_length=length, user_buffer=buffer, is_full_object_read=is_full_object_read
)
self.state["download_states"][read_id] = state
return state
Expand Down Expand Up @@ -358,3 +376,40 @@ async def run():

# Token should remain unchanged
self.assertEqual(self.state["routing_token"], "existing-token")

def test_update_state_full_object_checksum_success(self):
    """A matching full-object checksum lets the read complete at range_end."""
    self.state["enable_checksum"] = True
    # Server-side checksum of the whole payload, delivered below in two chunks.
    self.state["full_obj_server_crc32c"] = google_crc32c.value(b"testdata1")
    tracked = self._add_download(
        _READ_ID, offset=0, length=9, is_full_object_read=True
    )

    first_chunk = self._create_response(b"test", _READ_ID, offset=0)
    last_chunk = self._create_response(
        b"data1", _READ_ID, offset=4, range_end=True
    )
    for chunk in (first_chunk, last_chunk):
        self.strategy.update_state_from_response(chunk, self.state)

    self.assertTrue(tracked.is_complete)
    self.assertEqual(tracked.bytes_written, 9)

def test_update_state_full_object_checksum_failure(self):
    """A full-object checksum mismatch raises DataCorruption at range_end."""
    self.state["enable_checksum"] = True
    self.state["full_obj_server_crc32c"] = 111111  # Wrong server checksum!
    self._add_download(_READ_ID, offset=0, length=9, is_full_object_read=True)

    # The first chunk carries no range_end, so it is accepted without error.
    first_chunk = self._create_response(b"test", _READ_ID, offset=0)
    self.strategy.update_state_from_response(first_chunk, self.state)

    # The mismatch is only reported once the range is finished.
    last_chunk = self._create_response(
        b"data1", _READ_ID, offset=4, range_end=True
    )
    with self.assertRaisesRegex(DataCorruption, "Full object checksum mismatch"):
        self.strategy.update_state_from_response(last_chunk, self.state)

def test_update_state_checksum_mismatch_ignored_when_disabled(self):
    """A CRC32C mismatch is ignored when enable_checksum is False.

    Not raising is only half of the contract: the chunk must also reach the
    user buffer and be counted. Without these checks the test would pass
    even if the strategy silently dropped the data.
    """
    read_state = self._add_download(_READ_ID)
    self.state["enable_checksum"] = False
    # 999999 is intended as a CRC32C value that does not match the payload.
    response = self._create_response(b"data", _READ_ID, offset=0, crc=999999)

    # Should NOT raise DataCorruption!
    self.strategy.update_state_from_response(response, self.state)

    # The chunk was written and the progress counters advanced by its size.
    self.assertEqual(read_state.user_buffer.getvalue(), b"data")
    self.assertEqual(read_state.bytes_written, 4)
    self.assertEqual(read_state.next_expected_offset, 4)
Loading