Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ jobs:
h5py
lat_lon_parser
netCDF4
open-radar-data>=0.7.0
open-radar-data>=0.8.0
packaging
pandas
pip
Expand Down
2 changes: 1 addition & 1 deletion ci/notebooktests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies:
- netCDF4
- notebook
- numpy
- open-radar-data>=0.7.0
- open-radar-data>=0.8.0
- pip
- pyproj
- pytest
Expand Down
2 changes: 1 addition & 1 deletion ci/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies:
- lat_lon_parser
- netCDF4
- numpy
- open-radar-data>=0.7.0
- open-radar-data>=0.8.0
- pip
- pyproj
- pytest
Expand Down
1 change: 1 addition & 0 deletions docs/history.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Development

* FIX: NEXRAD Level 2 reader drops MSG_31 records when an LDM Compressed Record carries MSG_2 (RDA Status) alongside the 120 radials — ``NEXRADRecordFile.init_record`` hard-coded a 120-message stride; per ICD ``2620010J`` §7.3.4 the LDM holds "120 radial messages (type 31) plus 0 or more RDA Status messages (type 2)". Replace the synthetic mod-120 stride with a byte-walk that detects end-of-LDM by buffer length, with a recnum-position cache for ``get_data``'s per-moment replay ({issue}`376`, {pull}`377`) by [@aladinor](https://github.com/aladinor)
* FIX: ensure `to_cfradial2` correctly selects the default storage engine when none is provided ({pull}`378`) by [@chfer](https://github.com/chfer)
* MNT: Add ``cfradial1_sgp_file`` session fixture and refactor 8 tests in ``test_util.py``/``test_accessors.py`` to share it instead of inlining ``DATASETS.fetch("sample_sgp_data.nc")``. Fixture returns the filename so each test opens its own DataTree, avoiding cross-test mutation ({issue}`346`, {pull}`347`) by [@aladinor](https://github.com/aladinor)
* FIX: IRIS reader rotates the first-loaded moment in each sweep by 1 ray — ``IrisRawFile._get_ray_record_offsets_and_data`` initialised ``j = -1`` so the first matching ray of the first-loaded moment was written to ``raw_data[-1]``; affects files without ``DB_XHDR`` (data-type bit 0) where ``DB_DBT`` becomes the rotated moment ({issue}`357`, {pull}`375`) by [@aladinor](https://github.com/aladinor)
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies:
- h5py
- lat_lon_parser
- netCDF4
- open-radar-data>=0.7.0
- open-radar-data>=0.8.0
- pyproj
- pip:
- xarray >= 2026.4.0
Expand Down
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fsspec
ruff
nbconvert
notebook
open_radar_data>=0.7.0
open_radar_data>=0.8.0
boto3
cartopy
s3fs
Expand Down
11 changes: 11 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ def nexradlevel2_file():
return DATASETS.fetch("KATX20130717_195021_V06")


@pytest.fixture(scope="session")
def nexradlevel2_ldm_stride_file():
    """Fetch the KILX volume used as the #376 regression fixture.

    LDM 49 of this file holds more than 120 messages: 120 MSG_31 plus
    2 MSG_2, 122 in total. Before the fix, xradar's mod-120 stride dropped
    the trailing 2 MSG_31s, so sweep_10 came out as 358 rays while the
    on-wire count is 360.
    """
    dataset_name = "KILX20230629_154426_V06"
    return DATASETS.fetch(dataset_name)


@pytest.fixture(scope="session")
def nexrad_chunks_klot(tmp_path_factory):
import tarfile
Expand Down
96 changes: 95 additions & 1 deletion tests/io/test_nexrad_level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import warnings
from collections import OrderedDict
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch

import numpy as np
import pytest
Expand Down Expand Up @@ -981,6 +981,100 @@ def test_bz2_compressed_buffer_path_real(nexradlevel2_bzfile):
assert isinstance(fh._ldm[1], np.ndarray)
assert fh._ldm[1].dtype == np.uint8

# Cold entry at recnum=134 must transition cleanly into a sequential
# walk — pins the byte-walker against future regressions in the
# cross-LDM-stride logic (#376).
assert fh.init_next_record()
assert fh.record_number == 135


def test_init_record_cold_compressed_non_134_raises(nexradlevel2_bzfile):
    """A cold data-record lookup in compressed data must be recnum=134 (#376)."""
    # Hand the reader raw bytes rather than a path.
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        assert nex.is_compressed
        # No data record has been initialised yet, so 200 is not a valid
        # first entry point.
        with pytest.raises(ValueError, match="recnum=134"):
            nex.init_record(200)


def test_init_record_non_sequential_uncached_raises(nexradlevel2_bzfile):
    """Jumping to an uncached recnum after a sequential walk must raise (#376)."""
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        # Start the walk at the first data record, then step once so that
        # record 135 ends up in the position cache.
        nex.init_record(134)
        nex.init_next_record()
        # 9999 was never visited and is not the successor of 135.
        with pytest.raises(ValueError, match="non-sequential"):
            nex.init_record(9999)


def test_init_record_past_last_ldm_returns_false(nexradlevel2_bzfile):
    """Advancing beyond the last LDM yields False instead of raising (#376)."""
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        # Consume records one by one until init_record stops succeeding,
        # i.e. until the walk has run out of LDMs.
        recnum = 134
        while nex.init_record(recnum):
            recnum += 1
        # Asking again for the same record must signal EOF, not raise.
        assert nex.init_record(recnum) is False


def test_load_ldm_past_end_returns_false(nexradlevel2_bzfile):
    """_load_ldm signals EOF for an LDM index one past the last entry (#376)."""
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        # The first index that is out of range equals the number of LDMs.
        first_invalid_idx = len(nex.bz2_record_indices)
        assert nex._load_ldm(first_invalid_idx) is False


def test_init_record_metadata_propagates_ldm_load_failure(nexradlevel2_bzfile):
    """Metadata branch returns False when not even LDM 0 can be loaded (#376)."""
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        # Replace the LDM index with an empty array so that loading LDM 0
        # fails as well.
        no_ldms = patch.object(
            type(nex),
            "bz2_record_indices",
            new_callable=PropertyMock,
            return_value=np.array([], dtype=int),
        )
        with no_ldms:
            assert nex.init_record(0) is False


def test_first_compressed_call_with_metadata_only_returns_false(nexradlevel2_bzfile):
    """First data-record lookup returns False when only LDM 0 exists (#376)."""
    with open(nexradlevel2_bzfile, "rb") as stream:
        payload = stream.read()

    with NEXRADLevel2File(payload) as nex:
        # Keep only the first index entry (the metadata LDM, index 0), so
        # there is no data LDM to enter.
        metadata_only = nex.bz2_record_indices[:1]
        truncated_index = patch.object(
            type(nex),
            "bz2_record_indices",
            new_callable=PropertyMock,
            return_value=metadata_only,
        )
        with truncated_index:
            assert nex.init_record(134) is False


def test_ldm_stride_decodes_all_msg31(nexradlevel2_ldm_stride_file):
    """End-to-end regression on the KILX file for #376.

    LDM 49 carries 122 messages (120 MSG_31 + 2 MSG_2). With the former
    mod-120 stride the trailing 2 MSG_31s were dropped and sweep_10 came
    out with 358 rays rather than the on-wire-correct 360.
    """
    with NEXRADLevel2File(nexradlevel2_ldm_stride_file, loaddata=False) as nex:
        rays_per_sweep = list(map(len, nex.msg_31_header))

    # 13 sweeps in total: 6 super-res sweeps of 720 rays followed by
    # 7 standard sweeps of 360 rays. Comparing whole lists makes the
    # failing sweep obvious in the assertion diff (it was sweep_10 = 358).
    expected_rays = [720] * 6 + [360] * 7
    assert rays_per_sweep == expected_rays


def test_nexradlevel2_missing_msg2_metadata():
"""
Expand Down
149 changes: 102 additions & 47 deletions xradar/io/backends/nexrad_level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,13 @@ def __init__(self, filename, **kwargs):
self._rc = None
self._ldm = dict()
self._record_number = None
# Compressed-data byte-walker state (ICD 2620010J §7.3.4).
# An LDM block holds 120 MSG_31 + 0..N MSG_2 messages — variable count,
# not a fixed 120 stride. Track the active LDM and a recnum->position
# cache so sequential walks advance by message size and non-sequential
# jumps (e.g. per-moment replay in get_data) restore from cache.
self._current_ldm = None
self._recnum_pos_cache = {}

@property
def rh(self):
Expand Down Expand Up @@ -387,60 +394,66 @@ def get_end(self, buf):
size = size if size >= RECORD_BYTES else RECORD_BYTES
return size

def init_record(self, recnum):
"""Initialize record using given number."""
def _load_ldm(self, ldm_idx):
"""Decompress LDM `ldm_idx` into ``self._ldm`` if not already loaded.

# map record numbers to ldm compressed records
def get_ldm(recnum):
if recnum < 134:
return 0
mod = ((recnum - 134) // 120) + 1
return mod
Returns ``True`` on success, ``False`` if ``ldm_idx`` is past the last LDM.
"""
if ldm_idx >= len(self.bz2_record_indices):
return False
if self._ldm.get(ldm_idx) is not None:
return True
start = self.bz2_record_indices[ldm_idx]
size = int(self._fh[start : start + 4].view(dtype=">u4")[0])
if self._fp is not None:
self._fp.seek(start + 4)
compressed = self._fp.read(size)
else:
compressed = self._fh[start + 4 : start + 4 + size].tobytes()
dec = bz2.BZ2Decompressor()
self._ldm[ldm_idx] = np.frombuffer(dec.decompress(compressed), dtype=np.uint8)
return True

if self.is_compressed:
ldm = get_ldm(recnum)
# get uncompressed ldm record
if self._ldm.get(ldm, None) is None:
# otherwise extract wanted ldm compressed record
if ldm >= len(self.bz2_record_indices):
return False
start = self.bz2_record_indices[ldm]
size = self._fh[start : start + 4].view(dtype=">u4")[0]
if self._fp is not None:
self._fp.seek(start + 4)
compressed = self._fp.read(size)
else:
compressed = self._fh[start + 4 : start + 4 + size].tobytes()
dec = bz2.BZ2Decompressor()
self._ldm[ldm] = np.frombuffer(
dec.decompress(compressed), dtype=np.uint8
)
def init_record(self, recnum):
"""Initialize record using given number.

# retrieve wanted record and put into self.rh
Per ICD 2620010J §7.3.4, an LDM Compressed Record contains a variable
number of messages (120 MSG_31 + 0..N MSG_2 RDA Status). Cross-LDM
boundaries are detected by reaching the end of the decompressed buffer,
not by a fixed message-count stride.
"""
# Branch A: metadata records (always in LDM 0, fixed RECORD_BYTES stride)
if recnum < 134:
if self.is_compressed and not self._load_ldm(0):
return False
start = recnum * RECORD_BYTES
if not self.is_compressed:
# Only add volume header offset if header exists
if self.volume_header is not None:
start += 24
if not self.is_compressed and self.volume_header is not None:
start += 24
stop = start + RECORD_BYTES
ldm = 0

# Branch B: uncompressed data records (sequential advance)
elif not self.is_compressed:
start = self.record_size + self.filepos
buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size

# Branch C: compressed data records (byte-walk with recnum-position cache)
else:
if self.is_compressed:
# get index into current compressed ldm record
rnum = (recnum - 134) % 120
start = self.record_size + self.filepos if rnum else 0
buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
else:
start = self.record_size + self.filepos
buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
ldm, start = self._resolve_compressed_data_position(recnum)
if ldm is None:
return False
buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER]
size = self.get_end(buf)
if not size:
return False
stop = start + size
self._current_ldm = ldm
self._recnum_pos_cache[recnum] = (ldm, start)

self.record_number = recnum
self.record_size = stop - start
if self.is_compressed:
Expand All @@ -450,6 +463,48 @@ def get_ldm(recnum):
self.filepos = start
return self._check_record()

def _resolve_compressed_data_position(self, recnum):
    """Locate compressed data record ``recnum`` (``recnum >= 134``).

    Returns ``(ldm_idx, byte_offset)`` into the decompressed LDM buffers, or
    ``(None, None)`` once the walk has run past the last LDM. Raises
    ``ValueError`` if the very first lookup is not ``recnum=134``, or if
    ``recnum`` is neither cached nor the direct successor of the current
    record.
    """
    # Records resolved earlier are answered from the cache; this is what
    # permits non-sequential jumps back to already-visited records.
    try:
        return self._recnum_pos_cache[recnum]
    except KeyError:
        pass

    # No data record resolved so far: the only valid entry point is
    # recnum=134, which sits at byte 0 of LDM 1.
    if self._current_ldm is None:
        if recnum != 134:
            raise ValueError(
                f"first compressed init_record must be recnum=134, got {recnum}"
            )
        return (1, 0) if self._load_ldm(1) else (None, None)

    # Anything uncached must directly follow the current record.
    expected = self._record_number + 1
    if recnum != expected:
        raise ValueError(
            f"non-sequential init_record({recnum}) into uncached recnum"
        )

    # Step past the previous record. If the remaining bytes of the current
    # buffer cannot hold another message header, move on to byte 0 of the
    # next LDM and try again.
    ldm_idx = self._current_ldm
    offset = self.filepos + self.record_size
    header_span = 12 + LEN_MSG_HEADER
    while ldm_idx < len(self.bz2_record_indices):
        if not self._load_ldm(ldm_idx):
            break
        if offset + header_span <= len(self._ldm[ldm_idx]):
            return (ldm_idx, offset)
        ldm_idx, offset = ldm_idx + 1, 0
    return (None, None)

def init_record_by_filepos(self, recnum, filepos):
"""Initialize record using given record number and position."""
start = filepos
Expand Down
Loading