Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
42ab953
feat(storage): parse finalize_time and server crc32c in async object …
chandra-siri May 26, 2026
d972c2c
feat(storage): implement rolling checksum and verification in reads r…
chandra-siri May 26, 2026
b4a7a84
feat(storage): integrate full-object checksum in AsyncMultiRangeDownl…
chandra-siri May 26, 2026
7110744
style(storage): apply user feedback on finalize_time and fix unused i…
chandra-siri May 26, 2026
a437b66
merge: integrate updated finalize_time checks and lint fixes from PR 1
chandra-siri May 26, 2026
84b19d1
merge: integrate all updates and lint fixes from PR 1 and PR 2
chandra-siri May 26, 2026
9704da7
merge: integrate latest main to resolve pytest-asyncio event loop fai…
chandra-siri May 27, 2026
31ef2e3
style(storage): run ruff format to resolve CI lint failures
chandra-siri May 27, 2026
25c39e2
merge: integrate PR 1 updates including ruff formatting and pytest-as…
chandra-siri May 27, 2026
978b30d
merge: integrate PR 1 and PR 2 updates including ruff formatting and …
chandra-siri May 27, 2026
c371f59
test(storage): add conftest autouse event loop fixture to resolve pyt…
chandra-siri May 27, 2026
21144f9
merge: integrate PR 1 updates including conftest.py event loop fixture
chandra-siri May 27, 2026
ce827d5
merge: integrate PR 1 and PR 2 updates including conftest.py event lo…
chandra-siri May 27, 2026
c470641
fix(storage): use second attribute instead of seconds for DatetimeWit…
chandra-siri May 27, 2026
3820424
merge: integrate PR 1 second attribute fixes
chandra-siri May 27, 2026
0b15f6d
merge: integrate PR 1 and PR 2 updates including second attribute fixes
chandra-siri May 27, 2026
2f49dcb
perf(storage): bypass rolling checksum updates when checksum validati…
chandra-siri May 27, 2026
ff33752
merge: integrate PR 2 updates including Gemini feedback fixes
chandra-siri May 27, 2026
9036202
Merge remote-tracking branch 'origin/main' into feat/downloader-check…
chandra-siri Jun 4, 2026
36511bc
style(storage): fix lints and reformat code
chandra-siri Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
from google.cloud.exceptions import NotFound

from google.cloud.storage._opentelemetry_tracing import (
create_trace_span as _base_create_trace_span,
_is_bucket_metadata_disabled,
)
from google.cloud.storage._opentelemetry_tracing import (
create_trace_span as _base_create_trace_span,
)
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import (
DEFAULT_RETRY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
from google.cloud.storage import __version__, _helpers
from google.cloud.storage._opentelemetry_tracing import (
HAS_OPENTELEMETRY,
_is_bucket_metadata_disabled,
create_trace_span,
enable_otel_traces,
_is_bucket_metadata_disabled,
)

logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
_DownloadState,
_ReadResumptionStrategy,
)
from google.cloud.storage.exceptions import DataCorruption

from ._utils import raise_if_no_fast_crc32c

Expand Down Expand Up @@ -219,8 +220,6 @@ def __init__(
)
generation = kwargs.pop("generation_number")

raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
Expand All @@ -232,6 +231,8 @@ def __init__(
self._multiplexer: Optional[_StreamMultiplexer] = None
self.persisted_size: Optional[int] = None # updated after opening the stream
self._open_retries: int = 0
self.is_finalized: bool = False
self.full_obj_server_crc32c: Optional[int] = None

async def __aenter__(self):
"""Opens the underlying bidi-gRPC connection to read from the object."""
Expand Down Expand Up @@ -327,6 +328,8 @@ async def _do_open():
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
self.persisted_size = self.read_obj_str.persisted_size
self.is_finalized = self.read_obj_str.is_finalized
self.full_obj_server_crc32c = self.read_obj_str.full_obj_server_crc32c

self._is_stream_open = True

Expand Down Expand Up @@ -363,6 +366,8 @@ async def factory():
self.generation = stream.generation_number
if stream.read_handle:
self.read_handle = stream.read_handle
self.is_finalized = stream.is_finalized
self.full_obj_server_crc32c = stream.full_obj_server_crc32c

self.read_obj_str = stream
self._is_stream_open = True
Expand All @@ -377,6 +382,7 @@ async def download_ranges(
lock: asyncio.Lock = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
enable_checksum: bool = True,
) -> None:
"""Downloads multiple byte ranges from the object into the buffers
provided by user with automatic retries.
Expand Down Expand Up @@ -412,6 +418,9 @@ async def download_ranges(
"Invalid input - length of read_ranges cannot be more than 1000"
)

if enable_checksum:
raise_if_no_fast_crc32c()

if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")

Expand All @@ -422,16 +431,30 @@ async def download_ranges(
download_states = {}
for read_range in read_ranges:
read_id = generate_random_56_bit_integer()
# Unpack tuple into self-documenting variable names to improve readability.
offset, length, user_buffer = read_range

# Heuristic to detect full object reads:
# - Implicit full object read: start offset is 0 and length is 0 (read all).
# - Explicit full object read: start offset is 0 and length matches the exact persisted size.
is_full_object_read = (offset == 0 and length == 0) or (
self.persisted_size is not None
and offset == 0
and length == self.persisted_size
)
download_states[read_id] = _DownloadState(
initial_offset=read_range[0],
initial_length=read_range[1],
user_buffer=read_range[2],
initial_offset=offset,
initial_length=length,
user_buffer=user_buffer,
is_full_object_read=is_full_object_read,
)

initial_state = {
"download_states": download_states,
"read_handle": self.read_handle,
"routing_token": None,
"enable_checksum": enable_checksum,
"full_obj_server_crc32c": self.full_obj_server_crc32c,
}

read_ids = set(download_states.keys())
Expand Down Expand Up @@ -519,12 +542,18 @@ async def generator():
strategy, send_and_recv_via_multiplexer
)

await retry_manager.execute(initial_state, retry_policy)
try:
await retry_manager.execute(initial_state, retry_policy)
except DataCorruption:
if self.is_stream_open:
await self.close()
raise

if initial_state.get("read_handle"):
self.read_handle = initial_state["read_handle"]
finally:
self._multiplexer.unregister(read_ids)
if self._multiplexer is not None:
self._multiplexer.unregister(read_ids)

async def close(self):
"""
Expand Down
86 changes: 86 additions & 0 deletions packages/google-cloud-storage/tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ObjectCustomContextPayload,
)


pytestmark = pytest.mark.skipif(
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
Expand Down Expand Up @@ -961,3 +962,88 @@ async def _run():
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

event_loop.run_until_complete(_run())


@pytest.mark.parametrize(
"read_start, read_length, enable_checksum",
[
(0, 0, True),
(0, 1024 * 1024, True),
(0, 0, False),
],
)
def test_mrd_checksum_validation(
storage_client,
blobs_to_delete,
event_loop,
grpc_client_direct,
read_start,
read_length,
enable_checksum,
):
"""
Tests full downloads with specified offset, length, and enable_checksum toggle on finalized objects.
"""
object_size = 1024 * 1024 # 1MB
object_name = f"test_mrd_chksum-{uuid.uuid4()}"

async def _run():
object_data = os.urandom(object_size)

writer = AsyncAppendableObjectWriter(
grpc_client_direct, _ZONAL_BUCKET, object_name
)
await writer.open()
await writer.append(object_data)
await writer.close(finalize_on_close=True)

async with AsyncMultiRangeDownloader(
grpc_client_direct, _ZONAL_BUCKET, object_name
) as mrd:
buffer = BytesIO()
await mrd.download_ranges(
[(read_start, read_length, buffer)], enable_checksum=enable_checksum
)
assert buffer.getvalue() == object_data

# cleanup
del writer
gc.collect()
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

event_loop.run_until_complete(_run())


def test_mrd_checksum_unfinalized_appendable_skipped(
storage_client, blobs_to_delete, event_loop, grpc_client_direct
):
"""
Verifies that live, unfinalized appendable objects skip the full-object checksum check
naturally without raising any exceptions.
"""
object_name = f"test_mrd_chksum_unfin-{uuid.uuid4()}"

async def _run():
writer = AsyncAppendableObjectWriter(
grpc_client_direct, _ZONAL_BUCKET, object_name
)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.flush() # Flushed but not finalized!

# Download the unfinalized appendable object with enable_checksum=True
async with AsyncMultiRangeDownloader(
grpc_client_direct, _ZONAL_BUCKET, object_name
) as mrd:
buffer = BytesIO()
# Since it's unfinalized, it should skip the checksum check without raising
await mrd.download_ranges([(0, 0, buffer)], enable_checksum=True)
assert buffer.getvalue() == _BYTES_TO_UPLOAD

# cleanup
await writer.close()
del writer
gc.collect()
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

event_loop.run_until_complete(_run())
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,16 @@ async def test_downloading_without_opening_should_throw_error(self):
assert not mrd.is_stream_open

@mock.patch("google.cloud.storage.asyncio._utils.google_crc32c")
def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
@pytest.mark.asyncio
async def test_download_ranges_raises_if_crc32c_c_extension_is_missing(
self, mock_google_crc32c
):
mock_google_crc32c.implementation = "python"
mock_client = mock.MagicMock()
mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")

with pytest.raises(exceptions.FailedPrecondition) as exc_info:
AsyncMultiRangeDownloader(mock_client, "bucket", "object")
await mrd.download_ranges([(0, 10, BytesIO())])

assert "The google-crc32c package is not installed with C support" in str(
exc_info.value
Expand Down Expand Up @@ -579,3 +583,127 @@ async def staged_recv():

# Assert
mock_logger.info.assert_any_call("Resuming download (attempt 2) for 1 ranges.")

@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_open_populates_checksum_properties(
self, mock_cls_async_read_object_stream
):
# Arrange
mock_client = mock.MagicMock()
mock_client.grpc_client = mock.AsyncMock()
mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()
mock_stream.generation_number = 123
mock_stream.persisted_size = 100
mock_stream.read_handle = b"h"
mock_stream.is_finalized = True
mock_stream.full_obj_server_crc32c = 999

mrd = AsyncMultiRangeDownloader(mock_client, "bucket", "object")
assert mrd.is_finalized is False
assert mrd.full_obj_server_crc32c is None

# Act
await mrd.open()

# Assert
assert mrd.is_finalized is True
assert mrd.full_obj_server_crc32c == 999

@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
)
@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
)
@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_download_ranges_configures_full_object_read_state(
self,
mock_cls_async_read_object_stream,
mock_retry_manager_cls,
mock_strategy_cls,
):
# Arrange
mock_client = mock.MagicMock()
mock_client.grpc_client = mock.AsyncMock()
mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()
mock_stream.persisted_size = 100
mock_stream.is_finalized = True
mock_stream.full_obj_server_crc32c = 999

mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")

mock_retry_manager = mock_retry_manager_cls.return_value
mock_retry_manager.execute = AsyncMock()

# Act
# Implicit full read (0, 0) and explicit full read (0, persisted_size=100)
ranges = [(0, 0, BytesIO()), (0, 100, BytesIO()), (10, 20, BytesIO())]
await mrd.download_ranges(ranges, enable_checksum=True)

# Assert
mock_retry_manager.execute.assert_called_once()
initial_state = mock_retry_manager.execute.call_args[0][0]

download_states = initial_state["download_states"]
assert len(download_states) == 3

states_list = list(download_states.values())
# First state: (0, 0) -> is_full_object_read is True
assert states_list[0].is_full_object_read is True
assert states_list[0].rolling_checksum is not None

# Second state: (0, 100) -> is_full_object_read is True
assert states_list[1].is_full_object_read is True
assert states_list[1].rolling_checksum is not None

# Third state: (10, 20) -> is_full_object_read is False
assert states_list[2].is_full_object_read is False
assert states_list[2].rolling_checksum is None

# State values for enable_checksum and crc32c
assert initial_state["enable_checksum"] is True
assert initial_state["full_obj_server_crc32c"] == 999

@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._ReadResumptionStrategy"
)
@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._BidiStreamRetryManager"
)
@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_download_ranges_closes_on_datacorruption(
self,
mock_cls_async_read_object_stream,
mock_retry_manager_cls,
mock_strategy_cls,
):
# Arrange
mock_client = mock.MagicMock()
mock_client.grpc_client = mock.AsyncMock()
mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()

mrd = await AsyncMultiRangeDownloader.create_mrd(mock_client, "b", "o")
mrd.close = AsyncMock()

mock_retry_manager = mock_retry_manager_cls.return_value
mock_retry_manager.execute = AsyncMock(
side_effect=DataCorruption(None, "corrupted")
)

# Act & Assert
with pytest.raises(DataCorruption):
await mrd.download_ranges([(0, 0, BytesIO())])

mrd.close.assert_called_once()
1 change: 1 addition & 0 deletions packages/google-cloud-storage/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import asyncio

import pytest


Expand Down
Loading