diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3cbc27f9..3188caa4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -176,7 +176,7 @@ jobs: h5py lat_lon_parser netCDF4 - open-radar-data>=0.7.0 + open-radar-data>=0.8.0 packaging pandas pip diff --git a/ci/notebooktests.yml b/ci/notebooktests.yml index 0ee020be..77df211e 100644 --- a/ci/notebooktests.yml +++ b/ci/notebooktests.yml @@ -18,7 +18,7 @@ dependencies: - netCDF4 - notebook - numpy - - open-radar-data>=0.7.0 + - open-radar-data>=0.8.0 - pip - pyproj - pytest diff --git a/ci/unittests.yml b/ci/unittests.yml index 4f6807d8..f47c989e 100644 --- a/ci/unittests.yml +++ b/ci/unittests.yml @@ -14,7 +14,7 @@ dependencies: - lat_lon_parser - netCDF4 - numpy - - open-radar-data>=0.7.0 + - open-radar-data>=0.8.0 - pip - pyproj - pytest diff --git a/docs/history.md b/docs/history.md index c5567005..4972e8de 100644 --- a/docs/history.md +++ b/docs/history.md @@ -2,6 +2,7 @@ ## Development +* FIX: NEXRAD Level 2 reader drops MSG_31 records when an LDM Compressed Record carries MSG_2 (RDA Status) alongside the 120 radials — ``NEXRADRecordFile.init_record`` hard-coded a 120-message stride; per ICD ``2620010J`` §7.3.4 the LDM holds "120 radial messages (type 31) plus 0 or more RDA Status messages (type 2)". Replace the synthetic mod-120 stride with a byte-walk that detects end-of-LDM by buffer length, with a recnum-position cache for ``get_data``'s per-moment replay ({issue}`376`, {pull}`377`) by [@aladinor](https://github.com/aladinor) * FIX: ensure `to_cfradial2` correctly selects the default storage engine when none is provided, ({pull}`378`) by [@chfer](https://github.com/chfer) * MNT: Add ``cfradial1_sgp_file`` session fixture and refactor 8 tests in ``test_util.py``/``test_accessors.py`` to share it instead of inlining ``DATASETS.fetch("sample_sgp_data.nc")``. Fixture returns the filename so each test opens its own DataTree, avoiding cross-test mutation ({issue}`346`, {pull}`347`) by [@aladinor](https://github.com/aladinor) * FIX: IRIS reader rotates the first-loaded moment in each sweep by 1 ray — ``IrisRawFile._get_ray_record_offsets_and_data`` initialised ``j = -1`` so the first matching ray of the first-loaded moment was written to ``raw_data[-1]``; affects files without ``DB_XHDR`` (data-type bit 0) where ``DB_DBT`` becomes the rotated moment ({issue}`357`, {pull}`375`) by [@aladinor](https://github.com/aladinor) diff --git a/environment.yml b/environment.yml index b6fcda93..e328cae1 100644 --- a/environment.yml +++ b/environment.yml @@ -16,7 +16,7 @@ dependencies: - h5py - lat_lon_parser - netCDF4 - - open-radar-data>=0.7.0 + - open-radar-data>=0.8.0 - pyproj - pip: - xarray >= 2026.4.0 diff --git a/requirements_dev.txt b/requirements_dev.txt index 6ba6a7c5..34b99134 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -11,7 +11,7 @@ fsspec ruff nbconvert notebook -open_radar_data>=0.7.0 +open_radar_data>=0.8.0 boto3 cartopy s3fs diff --git a/tests/conftest.py b/tests/conftest.py index d76b510b..eb35b8c9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -83,6 +83,17 @@ def nexradlevel2_file(): return DATASETS.fetch("KATX20130717_195021_V06") +@pytest.fixture(scope="session") +def nexradlevel2_ldm_stride_file(): + """KILX volume with an over-120-message LDM (#376 regression fixture). + + LDM 49 of this file contains 120 MSG_31 + 2 MSG_2 = 122 messages. + Pre-fix, xradar's mod-120 stride dropped the trailing 2 MSG_31s, + yielding sweep_10 = 358 (on-wire is 360). + """ + return DATASETS.fetch("KILX20230629_154426_V06") + + @pytest.fixture(scope="session") def nexrad_chunks_klot(tmp_path_factory): import tarfile diff --git a/tests/io/test_nexrad_level2.py b/tests/io/test_nexrad_level2.py index 563d4b25..936d9f47 100644 --- a/tests/io/test_nexrad_level2.py +++ b/tests/io/test_nexrad_level2.py @@ -8,7 +8,7 @@ import os import warnings from collections import OrderedDict -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, PropertyMock, patch import numpy as np import pytest @@ -981,6 +981,100 @@ def test_bz2_compressed_buffer_path_real(nexradlevel2_bzfile): assert isinstance(fh._ldm[1], np.ndarray) assert fh._ldm[1].dtype == np.uint8 + # Cold entry at recnum=134 must transition cleanly into a sequential + # walk — pins the byte-walker against future regressions in the + # cross-LDM-stride logic (#376). + assert fh.init_next_record() + assert fh.record_number == 135 + + +def test_init_record_cold_compressed_non_134_raises(nexradlevel2_bzfile): + """Cold init_record into compressed data must start at recnum=134 (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + assert fh.is_compressed + with pytest.raises(ValueError, match="recnum=134"): + fh.init_record(200) + + +def test_init_record_non_sequential_uncached_raises(nexradlevel2_bzfile): + """init_record(N) past a sequential walk, with N uncached, must raise (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + fh.init_record(134) + fh.init_next_record() # populates cache for 135 + with pytest.raises(ValueError, match="non-sequential"): + fh.init_record(9999) # never visited, not in cache + + +def test_init_record_past_last_ldm_returns_false(nexradlevel2_bzfile): + """init_record past the last LDM returns False (EOF semantics) (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + # Walk to the very end so subsequent advances run out of LDMs. + rec = 134 + while fh.init_record(rec): + rec += 1 + # Next sequential advance must report EOF, not raise. + assert fh.init_record(rec) is False + + +def test_load_ldm_past_end_returns_false(nexradlevel2_bzfile): + """_load_ldm reports EOF when the requested LDM index is past the last (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + assert fh._load_ldm(len(fh.bz2_record_indices)) is False + + +def test_init_record_metadata_propagates_ldm_load_failure(nexradlevel2_bzfile): + """Branch A propagates _load_ldm failure when no LDM is available (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + # Empty the LDM index so even LDM 0 fails to load. + with patch.object( + type(fh), + "bz2_record_indices", + new_callable=PropertyMock, + return_value=np.array([], dtype=int), + ): + assert fh.init_record(0) is False + + +def test_first_compressed_call_with_metadata_only_returns_false(nexradlevel2_bzfile): + """First compressed init_record returns False when no data LDM exists (#376).""" + with open(nexradlevel2_bzfile, "rb") as f: + file_bytes = f.read() + with NEXRADLevel2File(file_bytes) as fh: + # Truncate the LDM list to just the metadata LDM (index 0). + only_meta = fh.bz2_record_indices[:1] + with patch.object( + type(fh), + "bz2_record_indices", + new_callable=PropertyMock, + return_value=only_meta, + ): + assert fh.init_record(134) is False + + +def test_ldm_stride_decodes_all_msg31(nexradlevel2_ldm_stride_file): + """KILX file regression for #376. + + LDM 49 has 122 messages (120 MSG_31 + 2 MSG_2). Pre-fix, xradar's + mod-120 stride dropped the trailing 2 MSG_31s; sweep_10 reported 358 + instead of the on-wire-correct 360. This pins the fix end-to-end. + """ + with NEXRADLevel2File(nexradlevel2_ldm_stride_file, loaddata=False) as nex: + per_sweep = [len(s) for s in nex.msg_31_header] + + # 13 sweeps total: 6 super-res (720 rays) + 7 standard (360 rays). + # Pre-fix, sweep_10 was 358 — the list-equality diff pinpoints the regression. + assert per_sweep == [720] * 6 + [360] * 7 + def test_nexradlevel2_missing_msg2_metadata(): """ diff --git a/xradar/io/backends/nexrad_level2.py b/xradar/io/backends/nexrad_level2.py index 7d27c7b6..bf2df679 100644 --- a/xradar/io/backends/nexrad_level2.py +++ b/xradar/io/backends/nexrad_level2.py @@ -340,6 +340,13 @@ def __init__(self, filename, **kwargs): self._rc = None self._ldm = dict() self._record_number = None + # Compressed-data byte-walker state (ICD 2620010J §7.3.4). + # An LDM block holds 120 MSG_31 + 0..N MSG_2 messages — variable count, + # not a fixed 120 stride. Track the active LDM and a recnum->position + # cache so sequential walks advance by message size and non-sequential + # jumps (e.g. per-moment replay in get_data) restore from cache. + self._current_ldm = None + self._recnum_pos_cache = {} @property def rh(self): @@ -387,60 +394,66 @@ def get_end(self, buf): size = size if size >= RECORD_BYTES else RECORD_BYTES return size - def init_record(self, recnum): - """Initialize record using given number.""" + def _load_ldm(self, ldm_idx): + """Decompress LDM `ldm_idx` into ``self._ldm`` if not already loaded. - # map record numbers to ldm compressed records - def get_ldm(recnum): - if recnum < 134: - return 0 - mod = ((recnum - 134) // 120) + 1 - return mod + Returns ``True`` on success, ``False`` if ``ldm_idx`` is past the last LDM. + """ + if ldm_idx >= len(self.bz2_record_indices): + return False + if self._ldm.get(ldm_idx) is not None: + return True + start = self.bz2_record_indices[ldm_idx] + size = int(self._fh[start : start + 4].view(dtype=">u4")[0]) + if self._fp is not None: + self._fp.seek(start + 4) + compressed = self._fp.read(size) + else: + compressed = self._fh[start + 4 : start + 4 + size].tobytes() + dec = bz2.BZ2Decompressor() + self._ldm[ldm_idx] = np.frombuffer(dec.decompress(compressed), dtype=np.uint8) + return True - if self.is_compressed: - ldm = get_ldm(recnum) - # get uncompressed ldm record - if self._ldm.get(ldm, None) is None: - # otherwise extract wanted ldm compressed record - if ldm >= len(self.bz2_record_indices): - return False - start = self.bz2_record_indices[ldm] - size = self._fh[start : start + 4].view(dtype=">u4")[0] - if self._fp is not None: - self._fp.seek(start + 4) - compressed = self._fp.read(size) - else: - compressed = self._fh[start + 4 : start + 4 + size].tobytes() - dec = bz2.BZ2Decompressor() - self._ldm[ldm] = np.frombuffer( - dec.decompress(compressed), dtype=np.uint8 - ) + def init_record(self, recnum): + """Initialize record using given number. - # retrieve wanted record and put into self.rh + Per ICD 2620010J §7.3.4, an LDM Compressed Record contains a variable + number of messages (120 MSG_31 + 0..N MSG_2 RDA Status). Cross-LDM + boundaries are detected by reaching the end of the decompressed buffer, + not by a fixed message-count stride. + """ + # Branch A: metadata records (always in LDM 0, fixed RECORD_BYTES stride) if recnum < 134: + if self.is_compressed and not self._load_ldm(0): + return False start = recnum * RECORD_BYTES - if not self.is_compressed: - # Only add volume header offset if header exists - if self.volume_header is not None: - start += 24 + if not self.is_compressed and self.volume_header is not None: + start += 24 stop = start + RECORD_BYTES + ldm = 0 + + # Branch B: uncompressed data records (sequential advance) + elif not self.is_compressed: + start = self.record_size + self.filepos + buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER] + size = self.get_end(buf) + if not size: + return False + stop = start + size + + # Branch C: compressed data records (byte-walk with recnum-position cache) else: - if self.is_compressed: - # get index into current compressed ldm record - rnum = (recnum - 134) % 120 - start = self.record_size + self.filepos if rnum else 0 - buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER] - size = self.get_end(buf) - if not size: - return False - stop = start + size - else: - start = self.record_size + self.filepos - buf = self.fh[start + 12 : start + 12 + LEN_MSG_HEADER] - size = self.get_end(buf) - if not size: - return False - stop = start + size + ldm, start = self._resolve_compressed_data_position(recnum) + if ldm is None: + return False + buf = self._ldm[ldm][start + 12 : start + 12 + LEN_MSG_HEADER] + size = self.get_end(buf) + if not size: + return False + stop = start + size + self._current_ldm = ldm + self._recnum_pos_cache[recnum] = (ldm, start) + self.record_number = recnum self.record_size = stop - start if self.is_compressed: @@ -450,6 +463,48 @@ def get_ldm(recnum): self.filepos = start return self._check_record() + def _resolve_compressed_data_position(self, recnum): + """Return ``(ldm_idx, byte_offset)`` for compressed-data ``recnum >= 134``. + + Returns ``(None, None)`` past the last LDM. Raises if ``recnum`` is + non-sequential and not in the cache (current callers don't trigger this). + """ + # Cache hit: non-sequential jump (e.g. get_data per-moment replay). + # The cache is only populated after a successful _load_ldm, and the + # backend never evicts from self._ldm, so the LDM is already resident. + if recnum in self._recnum_pos_cache: + return self._recnum_pos_cache[recnum] + + # First compressed call: either cold entry at recnum=134 or sequential + # transition from metadata. Either way, start at LDM 1 byte 0. + if self._current_ldm is None: + if recnum != 134: + raise ValueError( + f"first compressed init_record must be recnum=134, got {recnum}" + ) + if not self._load_ldm(1): + return (None, None) + return (1, 0) + + # Otherwise must be a sequential advance from the prior record. + if recnum != self._record_number + 1: + raise ValueError( + f"non-sequential init_record({recnum}) into uncached recnum" + ) + + # Continue from the previous in-LDM byte position; advance to next + # LDM if the current buffer can no longer hold a message header. + ldm = self._current_ldm + start = self.filepos + self.record_size + while ldm < len(self.bz2_record_indices): + if not self._load_ldm(ldm): + return (None, None) + if start + 12 + LEN_MSG_HEADER <= len(self._ldm[ldm]): + return (ldm, start) + ldm += 1 + start = 0 + return (None, None) + def init_record_by_filepos(self, recnum, filepos): """Initialize record using given record number and position.""" start = filepos