diff --git a/dandischema/tests/test_utils.py b/dandischema/tests/test_utils.py index 71d9e7ad..6e8838f3 100644 --- a/dandischema/tests/test_utils.py +++ b/dandischema/tests/test_utils.py @@ -1,9 +1,10 @@ -from typing import Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union import pytest from ..utils import ( _ensure_newline, + find_objs, name2title, sanitize_value, strip_top_level_optional, @@ -88,3 +89,151 @@ def test_sanitize_value() -> None: assert sanitize_value("A;B") == "A-B" assert sanitize_value("A\\/B") == "A--B" assert sanitize_value("A\"'B") == "A--B" + + +@pytest.mark.parametrize( + "instance, schema_key, expected", + [ + # Single matching object. + pytest.param( + {"schemaKey": "Test", "data": 123}, + "Test", + [{"schemaKey": "Test", "data": 123}], + id="single-match", + ), + # No match. + pytest.param( + {"schemaKey": "NotMatch", "data": 123}, + "Test", + [], + id="no-match", + ), + # Empty dictionary should return an empty list. + pytest.param( + {}, + "Test", + [], + id="empty-dict", + ), + # Empty list should return an empty list. + pytest.param( + [], + "Test", + [], + id="empty-list", + ), + # Nested dictionary: the matching object is nested within another dictionary. + pytest.param( + {"level1": {"schemaKey": "Test", "info": "nested"}}, + "Test", + [{"schemaKey": "Test", "info": "nested"}], + id="nested-dict", + ), + # List of dictionaries: only those with matching schema key are returned. + pytest.param( + [ + {"schemaKey": "Test", "data": 1}, + {"schemaKey": "Test", "data": 2}, + {"schemaKey": "NotTest", "data": 3}, + ], + "Test", + [ + {"schemaKey": "Test", "data": 1}, + {"schemaKey": "Test", "data": 2}, + ], + id="list-of-dicts", + ), + # Mixed structure: nested dictionaries and lists. 
+ pytest.param( + { + "a": {"schemaKey": "Test", "value": 1}, + "b": [ + {"schemaKey": "NotTest", "value": 2}, + {"schemaKey": "Test", "value": 3}, + ], + "c": "irrelevant", + "d": [{"e": {"schemaKey": "Test", "value": 4}}], + }, + "Test", + [ + {"schemaKey": "Test", "value": 1}, + {"schemaKey": "Test", "value": 3}, + {"schemaKey": "Test", "value": 4}, + ], + id="mixed-structure", + ), + # Non-collection type: integer. + pytest.param( + 42, + "Test", + [], + id="non-collection-int", + ), + # Non-collection type: string. + pytest.param( + "some string", + "Test", + [], + id="non-collection-string", + ), + # Non-collection type: float. + pytest.param( + 3.14, + "Test", + [], + id="non-collection-float", + ), + # Non-collection type: None. + pytest.param( + None, + "Test", + [], + id="non-collection-None", + ), + # Nested child: an object with the schema key contains a nested child that also + # has the schema key. + pytest.param( + {"schemaKey": "Test", "child": {"schemaKey": "Test", "data": "child"}}, + "Test", + [ + {"schemaKey": "Test", "child": {"schemaKey": "Test", "data": "child"}}, + {"schemaKey": "Test", "data": "child"}, + ], + id="nested-child", + ), + # List in field: + # The object with the given schema key has a field whose value is a list + # containing objects, some of which also have the given schema key. + pytest.param( + { + "schemaKey": "Test", + "items": [ + {"schemaKey": "Test", "data": "item1"}, + {"schemaKey": "Other", "data": "item2"}, + {"schemaKey": "Test", "data": "item3"}, + ], + }, + "Test", + [ + # The outer object is returned first... + { + "schemaKey": "Test", + "items": [ + {"schemaKey": "Test", "data": "item1"}, + {"schemaKey": "Other", "data": "item2"}, + {"schemaKey": "Test", "data": "item3"}, + ], + }, + # ...followed by the matching objects within the list. 
def find_objs(instance: Any, schema_key: str) -> list[dict]:
    """
    Find JSON objects, represented as dictionaries, that possess a specified schema
    key as the value of their `"schemaKey"` field, from a data instance.

    The instance is walked depth-first. A matching dictionary is reported before
    any matching objects nested inside it, and siblings are reported in their
    iteration order (dictionary value order, list order). Only `dict` and `list`
    containers are descended into; any other value is ignored.

    The traversal uses an explicit stack rather than recursion, so arbitrarily deep
    nesting does not raise `RecursionError`.

    :param instance: The data instance to fetch JSON objects from
    :param schema_key: The schema key
    :return: The list of JSON objects with the specified schema key in the data
        instance. The elements are the very same dictionary objects found in
        `instance`, not copies.
    """
    objs: list[dict] = []

    # Each stack entry is an iterator over the not-yet-visited children of one
    # container. Seeding it with a one-element tuple lets the root be handled by
    # the same code path as every other node.
    pending = [iter((instance,))]

    while pending:
        for node in pending[-1]:
            if isinstance(node, dict):
                if "schemaKey" in node and node["schemaKey"] == schema_key:
                    objs.append(node)
                # Descend now; the parent iterator stays on the stack and is
                # resumed once this child has been fully explored.
                pending.append(iter(node.values()))
                break
            if isinstance(node, list):
                pending.append(iter(node))
                break
            # Scalars (str, int, float, None, ...) cannot contain objects.
        else:
            # The iterator on top of the stack is exhausted: go back up a level.
            pending.pop()

    return objs