From fe6853ffa07602c865217e803bf3c612c90269df Mon Sep 17 00:00:00 2001 From: Vincenzo Eduardo Padulano Date: Fri, 3 Jul 2026 11:22:18 +0200 Subject: [PATCH 1/3] [df] Add standalone test of distributed FromSpec An issue occurs during deserialization of distributed tasks for an RDataFrame created via FromSpec. The issue is only visible if the application runs standalone. --- .../python/distrdf/backends/CMakeLists.txt | 5 ++ .../backends/test_standalone_fromspec.py | 57 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 roottest/python/distrdf/backends/test_standalone_fromspec.py diff --git a/roottest/python/distrdf/backends/CMakeLists.txt b/roottest/python/distrdf/backends/CMakeLists.txt index 48c0f112b4dc5..e9d56e8145651 100644 --- a/roottest/python/distrdf/backends/CMakeLists.txt +++ b/roottest/python/distrdf/backends/CMakeLists.txt @@ -54,3 +54,8 @@ ROOT_ADD_PYUNITTEST(test_all test_all.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONM set_tests_properties( pyunittests-roottest-python-distrdf-backends-all PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4 TIMEOUT 1200) + +ROOT_ADD_PYUNITTEST(test_standalone_fromspec test_standalone_fromspec.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}") +set_tests_properties( + pyunittests-roottest-python-distrdf-backends-standalone-fromspec + PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4) diff --git a/roottest/python/distrdf/backends/test_standalone_fromspec.py b/roottest/python/distrdf/backends/test_standalone_fromspec.py new file mode 100644 index 0000000000000..0374f0a3f6c75 --- /dev/null +++ b/roottest/python/distrdf/backends/test_standalone_fromspec.py @@ -0,0 +1,57 @@ +import shlex +import sys + +import pytest +import ROOT + + +class TestFromSpec: + # RDataFrame is reading files containing CMSSW classes in this test, which will trigger a warning from TClass about + # missing class dictionaries. The warning is only triggered once for the entire duration of the process. 
Within the + # same process, we may be running the same test multiple times (once per parametrized backend in use). So we can't + # use programmatic warning catching e.g. with pytest.warns because it would always fail on consecutive runs of this + # test. We filter out that warning specifically instead + @pytest.mark.filterwarnings("ignore:no dictionary for class") + def test_fromspec_different_trees(self, payload): + """ + Test usage of FromSpec function when each sample has different trees + """ + + connection, _ = payload + + jsonfile = "../data/ttree/spec_differenttrees.json" + + df = ROOT.RDF.Experimental.FromSpec(jsonfile, executor=connection) + df_checkfilt = df.FilterAvailable("nElectron").Filter("nElectron > 2") + df_new = df.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")') + df_filtered = df_new.Filter("lum == 100.") + df_filtered_two = df_new.Filter("lum == 200.") + + df_local = ROOT.RDF.Experimental.FromSpec(jsonfile) + df_new_local = df_local.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")') + df_filtered_local = df_new_local.Filter("lum == 100.") + df_filtered_two_local = df_new_local.Filter("lum == 200.") + + df_checkfilt_count = df_checkfilt.Count() + df_filtered_count = df_filtered.Count() + df_filtered_local_count = df_filtered_local.Count() + df_filtered_two_count = df_filtered_two.Count() + df_filtered_two_local_count = df_filtered_two_local.Count() + + assert df_checkfilt_count.GetValue() == 1683 + assert df_filtered_count.GetValue() == 11000 + assert df_filtered_count.GetValue() == df_filtered_local_count.GetValue() + + assert df_filtered_two_count.GetValue() == 13020 + assert df_filtered_two_local_count.GetValue() == df_filtered_two_local_count.GetValue() + + +if __name__ == "__main__": + # The call to sys.exit is needed otherwise CTest would just ignore the + # results returned by pytest, even in case of errors. 
+ # We ignore ResourceWarning about unclosed socket because of https://issues.apache.org/jira/browse/SPARK-38659 which + # has been fixed by https://github.com/apache/spark/pull/53200 and https://github.com/apache/spark/pull/53203 which + # may not be available in all test runner configurations + sys.exit( + pytest.main(args=shlex.split(f'{__file__} -x -vvv -Werror -Wignore:"unclosed Date: Fri, 3 Jul 2026 11:25:32 +0200 Subject: [PATCH 2/3] [df] Improve RSample I/O for distributed RDataFrame usage Since https://github.com/root-project/root/commit/74fbf73ba59d1e454990c38ffb4a38c4708d2eda, initialization of cppyy is deferred to a later point than before, when it used to be initialized at `import ROOT` time. In a specific scenario of distributed RDataFrame usage, this caused a particular mix of behaviours that ended in a crash. In case of running a distributed RDataFrame application using `FromSpec`, the distributed tasks need to serialize the information relative to the various samples. This is done in the Python class SerializableRSample in Ranges.py. At serialization time, i.e. in __getstate__, the class was storing a payload with a few objects, among which collections of strings (for names of files and datasets) with type std::vector<std::string>. During the deserialization of a distributed task, the following would happen: 1. The ROOT module is deserialized, i.e. the __reduce__ method of the ROOT facade is called. 2. The __setstate__ method of the SerializableRSample class is called. 3. Inside of it, the members of the payload are accessed, thus their types must be deserialized, including std::vector<std::string>. 4. cppyy functions are invoked in order to gather information about those types. 5. cppyy has not been initialized yet at this point, hence the crash. This commit simplifies the implementation of SerializableRSample so that it does not require cppyy anymore for I/O.
--- bindings/distrdf/python/DistRDF/HeadNode.py | 35 +++++++++++---------- bindings/distrdf/python/DistRDF/Ranges.py | 25 ++++++++------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/HeadNode.py b/bindings/distrdf/python/DistRDF/HeadNode.py index d0b3b677cb99a..6ed68cbc46b4a 100644 --- a/bindings/distrdf/python/DistRDF/HeadNode.py +++ b/bindings/distrdf/python/DistRDF/HeadNode.py @@ -766,33 +766,36 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: fileids_cluster.append(fileid) treesInFileMap.update({fileid : treename}) - # Making sure we don't double count samples - unique_samples = [] + # Making sure we don't double count samples + unique_samples = [] samplenames = [] - + for sample in clustered_range.samples: - mysample = sample._sample - samplename = mysample.GetSampleName() + samplename = sample.name if samplename not in samplenames: - unique_samples.append(mysample) + unique_samples.append(sample) samplenames.append(samplename) - + # Core: matching files with samples and adding samples to the spec that will be processed - for mysample in unique_samples: + for sample in unique_samples: sample_fileids = [ filenameglob + "/" + treename - for filenameglob, treename in zip(mysample.GetFileNameGlobs(), mysample.GetTreeNames()) + for filenameglob, treename in zip(sample.filenames, sample.treenames) ] - - good_files = list((Counter(filenames_cluster) & Counter(mysample.GetFileNameGlobs())).elements()) - good_fileids = list((Counter(fileids_cluster) & Counter(sample_fileids)).elements()) - good_trees =[ + good_files = list((Counter(filenames_cluster) + & Counter(sample.filenames)).elements()) + good_fileids = list( + (Counter(fileids_cluster) & Counter(sample_fileids)).elements()) + + good_trees = [ treesInFileMap.get(fileid) for fileid in good_fileids ] - - ds.AddSample((mysample.GetSampleName(), good_trees, good_files, mysample.GetMetaData())) - + + rmetadata = 
ROOT.RDF.Experimental.RMetaData() + ROOT.Internal.RDF.ImportJSON(rmetadata, sample.metadata) + ds.AddSample((sample.name, good_trees, good_files, rmetadata)) + ds.WithGlobalRange( (clustered_range.globalstart, clustered_range.globalend)) diff --git a/bindings/distrdf/python/DistRDF/Ranges.py b/bindings/distrdf/python/DistRDF/Ranges.py index 5fac94a702f25..98dd936c57dd0 100644 --- a/bindings/distrdf/python/DistRDF/Ranges.py +++ b/bindings/distrdf/python/DistRDF/Ranges.py @@ -14,20 +14,21 @@ logger = logging.getLogger(__name__) -class SerializableRSample(): - + +class SerializableRSample: + """ + Stores the information relative to an RSample, removing knowledge of C++ types such that cppyy is not involved + during serialization/deserialization. + """ def __init__(self, sample: ROOT.RDF.Experimental.RSample): - self._sample = sample - - def __getstate__(self): - return {"samplenames" : self._sample.GetSampleName(), "treenames" : self._sample.GetTreeNames() , "filenames" : self._sample.GetFileNameGlobs(), "metadata" : ROOT.Internal.RDF.ExportJSON(self._sample.GetMetaData())} + self.name: str = sample.GetSampleName() + self.treenames: list[str] = [str(treename) + for treename in sample.GetTreeNames()] + self.filenames: list[str] = [str(filename) + for filename in sample.GetFileNameGlobs()] + self.metadata: str = ROOT.Internal.RDF.ExportJSON(sample.GetMetaData()) + - - def __setstate__(self, state): - _metadata = ROOT.RDF.Experimental.RMetaData() - ROOT.Internal.RDF.ImportJSON(_metadata, state["metadata"]) - self._sample = ROOT.RDF.Experimental.RSample(state["samplenames"], state["treenames"], state["filenames"], _metadata) - @dataclass class DataRange: """ From 6f003db205b46b7cac09e26fe78ccb70e7222d89 Mon Sep 17 00:00:00 2001 From: Vincenzo Eduardo Padulano Date: Fri, 3 Jul 2026 11:39:54 +0200 Subject: [PATCH 3/3] [df] Ruff fixes on changed files --- bindings/distrdf/python/DistRDF/HeadNode.py | 265 +++++++++++--------- bindings/distrdf/python/DistRDF/Ranges.py | 144 
+++++++---- 2 files changed, 247 insertions(+), 162 deletions(-) diff --git a/bindings/distrdf/python/DistRDF/HeadNode.py b/bindings/distrdf/python/DistRDF/HeadNode.py index 6ed68cbc46b4a..9e408f0a49c3b 100644 --- a/bindings/distrdf/python/DistRDF/HeadNode.py +++ b/bindings/distrdf/python/DistRDF/HeadNode.py @@ -52,6 +52,7 @@ class TaskObjects: for a tree opened in the task and the value is the number of entries in that tree. This attribute is not None only in a TTree-based run. """ + rdf: Optional[ROOT.RDF.RNode] entries_in_trees: Optional[Ranges.TaskTreeEntries] @@ -106,7 +107,7 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO self.rdf_node = localdf # A dictionary where the keys are the IDs of the objects to live visualize - # and the values are the corresponding callback functions + # and the values are the corresponding callback functions # This attribute only gets set in case the LiveVisualize() function is called self.drawables_dict: Optional[Dict[int, List[Optional[Callable]]]] = None @@ -192,7 +193,7 @@ def _execute_and_retrieve_results(self, mapper, local_nodes) -> TaskResult: # 2. Index of the node in the local_nodes list i, # 3. 
Name of the operation associated with the node - node.operation.name + node.operation.name, ) for i, node in enumerate(local_nodes) # Filter: Only include nodes requested by the user @@ -232,17 +233,20 @@ def execute_graph(self) -> None: self.exec_id = _graph_cache.ExecutionIdentifier(self.rdf_uuid, uuid.uuid4()) - - computation_graph_callable = partial(ComputationGraphGenerator.trigger_computation_graph, self._generate_graph_dict()) + computation_graph_callable = partial( + ComputationGraphGenerator.trigger_computation_graph, self._generate_graph_dict() + ) # Accumulate all code that needs to be declared in one string code_to_declare = "\n".join(self.backend.strings_to_declare.values()) - mapper = partial(distrdf_mapper, - build_rdf_from_range=self._generate_rdf_creator(), - computation_graph_callable=computation_graph_callable, - initialization_fn=self.backend.initialization, - code_to_declare=code_to_declare) + mapper = partial( + distrdf_mapper, + build_rdf_from_range=self._generate_rdf_creator(), + computation_graph_callable=computation_graph_callable, + initialization_fn=self.backend.initialization, + code_to_declare=code_to_declare, + ) # List of action nodes in the same order as values local_nodes = self._get_action_nodes() @@ -258,7 +262,7 @@ def execute_graph(self) -> None: # Perform any extra checks that may be needed according to the # type of the head node final_values = self._handle_returned_values(returned_values) - + # Set the value of every action node for node, value in zip(local_nodes, final_values): Utils.set_value_on_node(value, node, self.backend) @@ -275,8 +279,12 @@ def get_headnode(backend: BaseBackend, npartitions: int, *args) -> HeadNode: try: localdf = ROOT.RDataFrame(*args) except TypeError: - raise TypeError(("The arguments provided are not accepted by any RDataFrame constructor. 
" - "See the RDataFrame documentation for the accepted constructor argument types.")) + raise TypeError( + ( + "The arguments provided are not accepted by any RDataFrame constructor. " + "See the RDataFrame documentation for the accepted constructor argument types." + ) + ) if isinstance(args[0], ROOT.RDF.Experimental.RDatasetSpec): return RDatasetSpecHeadNode(backend, npartitions, localdf, *args) @@ -290,10 +298,13 @@ def get_headnode(backend: BaseBackend, npartitions: int, *args) -> HeadNode: return EmptySourceHeadNode(backend, npartitions, localdf, args[0]) else: raise RuntimeError( - (f"First argument {args[0]} of type {type(args[0])} is not " - "recognised as a supported argument for distributed RDataFrame. " - "Currently supported data sources are: TTree, RNTuple or an empty " - "data source.")) + ( + f"First argument {args[0]} of type {type(args[0])} is not " + "recognised as a supported argument for distributed RDataFrame. " + "Currently supported data sources are: TTree, RNTuple or an empty " + "data source." + ) + ) class EmptySourceHeadNode(HeadNode): @@ -330,15 +341,17 @@ def _build_ranges(self) -> List[Ranges.DataRange]: # Empty datasets cannot be processed distributedly if not self.nentries: raise RuntimeError( - ("Cannot build a distributed RDataFrame with zero entries. " - "Distributed computation will fail. ")) + ("Cannot build a distributed RDataFrame with zero entries. Distributed computation will fail. ") + ) # TODO: This shouldn't be triggered if entries == 1. The current minimum # amount of partitions is 2. We need a robust reducer that smartly # becomes no-op if npartitions == 1 to avoid this. if self.npartitions > self.nentries: # Restrict 'npartitions' if it's greater than 'nentries' - msg = ("Number of partitions {0} is greater than number of entries {1} " - "in the dataframe. Using {1} partition(s)".format(self.npartitions, self.nentries)) + msg = ( + "Number of partitions {0} is greater than number of entries {1} " + "in the dataframe. 
Using {1} partition(s)".format(self.npartitions, self.nentries) + ) warnings.warn(msg, UserWarning, stacklevel=2) self.npartitions = self.nentries return Ranges.get_balanced_ranges(self.nentries, self.npartitions, self.exec_id) @@ -361,7 +374,8 @@ def build_rdf_from_range(current_range): ROOT.Internal.RDF.ChangeEmptyEntryRange( ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), - (current_range.start, current_range.end)) + (current_range.start, current_range.end), + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], None) @@ -430,12 +444,10 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO # RDataFrame(tree, defaultBranches = {}) tree = args[0] # TTreeIndex is not supported in distributed RDataFrame - idx_found, friendname = ROOT.Internal.TreeUtils.TreeUsesIndexedFriends( - tree) + idx_found, friendname = ROOT.Internal.TreeUtils.TreeUsesIndexedFriends(tree) if idx_found: raise ValueError( - f"Friend tree '{friendname}' has a TTreeIndex. " - "This is not supported in distributed mode." + f"Friend tree '{friendname}' has a TTreeIndex. This is not supported in distributed mode." ) # Retrieve information about friend trees when user passes a TTree # or TChain object. @@ -473,10 +485,12 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO def _build_ranges(self) -> List[Ranges.DataRange]: """Build the ranges for this dataset.""" - logger.debug("Building ranges from dataset info:\n" - "main treename: %s\n" - "names of subtrees: %s\n" - "input files: %s\n", self.maintreename, self.subtreenames, self.inputfiles) + logger.debug( + "Building ranges from dataset info:\nmain treename: %s\nnames of subtrees: %s\ninput files: %s\n", + self.maintreename, + self.subtreenames, + self.inputfiles, + ) if logger.isEnabledFor(logging.DEBUG): # Compute clusters and entries of the first tree in the dataset. 
@@ -493,9 +507,12 @@ def _build_ranges(self) -> List[Ranges.DataRange]: logger.debug( "The number of requested partitions could be higher than the maximum amount of " "chunks the dataset can be split in. Some tasks could be doing no work. Consider " - "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value.") + "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value." + ) - return Ranges.get_percentage_ranges(self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, None) + return Ranges.get_percentage_ranges( + self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, None + ) def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: """ @@ -504,8 +521,9 @@ def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: the TTree data source. """ - def attach_friend_info_if_present(current_range: Ranges.TreeRange, - ds: ROOT.RDF.Experimental.RDatasetSpec) -> None: + def attach_friend_info_if_present( + current_range: Ranges.TreeRange, ds: ROOT.RDF.Experimental.RDatasetSpec + ) -> None: """ Adds info about friend trees to the input chain. Also aligns the starting and ending entry of the friend chain cache to those of the @@ -513,7 +531,7 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, """ # Gather information about friend trees. Check that we got an # RFriendInfo struct and that it's not empty - if (current_range.friendinfo is not None): + if current_range.friendinfo is not None: # If the friend is a TChain, the zipped information looks like: # (name, alias), (file1.root, file2.root, ...), (subname1, subname2, ...) 
# If the friend is a TTree, the file list is made of @@ -523,10 +541,12 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, zipped_friendinfo = zip( current_range.friendinfo.fFriendNames, current_range.friendinfo.fFriendFileNames, - current_range.friendinfo.fFriendChainSubNames + current_range.friendinfo.fFriendChainSubNames, ) for (friend_name, friend_alias), friend_filenames, friend_chainsubnames in zipped_friendinfo: - friend_chainsubnames = friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name]*len(friend_filenames) + friend_chainsubnames = ( + friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name] * len(friend_filenames) + ) ds.WithGlobalFriends(friend_chainsubnames, friend_filenames, friend_alias) def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: @@ -555,8 +575,8 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: else: # Update it to the range of entries for this task ROOT.Internal.RDF.ChangeSpec( - ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), - ROOT.std.move(ds)) + ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), ROOT.std.move(ds) + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], entries_in_trees) @@ -569,8 +589,10 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: the dataset were processed during distributed execution. """ if values.mergeables is None: - raise RuntimeError("The distributed execution returned no values. " - "This can happen if all files in your dataset contain empty trees.") + raise RuntimeError( + "The distributed execution returned no values. " + "This can happen if all files in your dataset contain empty trees." 
+ ) # User could have requested to read the same file multiple times indeed input_files_and_trees = [ @@ -581,10 +603,12 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: entries_in_trees = values.entries_in_trees # Keys should be exactly the same if files_counts.keys() != entries_in_trees.trees_with_entries.keys(): - raise RuntimeError("The specified input files and the files that were " - "actually processed are not the same:\n" - f"Input files: {list(files_counts.keys())}\n" - f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}") + raise RuntimeError( + "The specified input files and the files that were " + "actually processed are not the same:\n" + f"Input files: {list(files_counts.keys())}\n" + f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}" + ) # Multiply the entries of each tree by the number of times it was # requested by the user @@ -593,11 +617,14 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: total_dataset_entries = sum(entries_in_trees.trees_with_entries.values()) if entries_in_trees.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_trees.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_trees.processed_entries} were processed." + ) return values.mergeables - + + class RDatasetSpecHeadNode(HeadNode): """ The head node of a computation graph where the RDataFrame data source is @@ -613,11 +640,11 @@ class RDatasetSpecHeadNode(HeadNode): used to construct the RDataFrame subtreenames (list[str]): List of tree names in the dataset. - + inputfiles (list[str]): List of file names where the dataset is stored. friendinfo (ROOT.Internal.TreeUtils.RFriendInfo, None): Optional - information about friend trees of the dataset. Retrieved from + information about friend trees of the dataset. 
Retrieved from RDatasetSpec.GetFriendInfo(). Defaults to None. """ @@ -634,11 +661,10 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO super().__init__(backend, npartitions, localdf) # Information about friend trees, if they are present. - self.sampleMap: Dict[str, Ranges.SerializableRSample] = None - + self.sampleMap: Dict[str, Ranges.SerializableRSample] = None + self.friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] = None - - + # Retrieve the RDatasetSpec that will be processed if isinstance(args[0], ROOT.RDF.Experimental.RDatasetSpec): # RDataFrame(rdatasetspec) @@ -647,16 +673,18 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO else: raise RuntimeError( - f"First argument {args[0]} of type {type(args[0])} is not supported " - "in RDatasetSpecHeaNode") - - if (args[0].GetEntryRangeBegin() != 0): - warnings.warn("Setting global range is not supported in distributed parsing of RDatasetSpec, the set range will be ignored and all events will be processed.") - + f"First argument {args[0]} of type {type(args[0])} is not supported in RDatasetSpecHeaNode" + ) + + if args[0].GetEntryRangeBegin() != 0: + warnings.warn( + "Setting global range is not supported in distributed parsing of RDatasetSpec, the set range will be ignored and all events will be processed." 
+ ) + # subtreenames: names of all subtrees in the chain or full path to the tree in the file it belongs to self.subtreenames = [str(treename) for treename in args[0].GetTreeNames()] self.inputfiles = [str(filename) for filename in args[0].GetFileNameGlobs()] - + # A map to map file id (filename/tree) to SerializableRSamples self.sampleMap = {} samples = ROOT.Internal.RDF.MoveOutSamples(args[0]) @@ -664,15 +692,17 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO for sample in samples: trees = sample.GetTreeNames() files = sample.GetFileNameGlobs() - for file, tree in zip(files, trees): + for file, tree in zip(files, trees): sampleId = file + "/" + tree - self.sampleMap.update({sampleId : Ranges.SerializableRSample(sample)}) + self.sampleMap.update({sampleId: Ranges.SerializableRSample(sample)}) def _build_ranges(self) -> List[Ranges.DataRange]: """Build the ranges for this dataset.""" - logger.debug("Building ranges from dataset info:\n" - "names of subtrees: %s\n" - "input files: %s\n", self.subtreenames, self.inputfiles) + logger.debug( + "Building ranges from dataset info:\nnames of subtrees: %s\ninput files: %s\n", + self.subtreenames, + self.inputfiles, + ) if logger.isEnabledFor(logging.DEBUG): # Compute clusters and entries of the first tree in the dataset. @@ -681,8 +711,7 @@ def _build_ranges(self) -> List[Ranges.DataRange]: # Depending on the cluster setup, this may still be quite costly, so # we decide to pay the price only if the user explicitly requested # warning logging. - clusters, entries = ROOT.Internal.TreeUtils.GetClustersAndEntries( - self.subtreenames[0], self.inputfiles[0]) + clusters, entries = ROOT.Internal.TreeUtils.GetClustersAndEntries(self.subtreenames[0], self.inputfiles[0]) # The file could contain an empty tree. In that case, the estimate will not be computed. 
if entries > 0: partitionsperfile = self.npartitions / len(self.inputfiles) @@ -690,9 +719,12 @@ def _build_ranges(self) -> List[Ranges.DataRange]: logger.debug( "The number of requested partitions could be higher than the maximum amount of " "chunks the dataset can be split in. Some tasks could be doing no work. Consider " - "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value.") - - return Ranges.get_percentage_ranges(self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, self.sampleMap) + "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value." + ) + + return Ranges.get_percentage_ranges( + self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, self.sampleMap + ) def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: """ @@ -700,8 +732,9 @@ def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: RDataFrame on a distributed mapper for a given entry range. """ - def attach_friend_info_if_present(current_range: Ranges.TreeRange, - ds: ROOT.RDF.Experimental.RDatasetSpec) -> None: + def attach_friend_info_if_present( + current_range: Ranges.TreeRange, ds: ROOT.RDF.Experimental.RDatasetSpec + ) -> None: """ Adds info about friend trees to the input chain. Also aligns the starting and ending entry of the friend chain cache to those of the @@ -709,7 +742,7 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, """ # Gather information about friend trees. Check that we got an # RFriendInfo struct and that it's not empty - if (current_range.friendinfo is not None): + if current_range.friendinfo is not None: # If the friend is a TChain, the zipped information looks like: # (name, alias), (file1.root, file2.root, ...), (subname1, subname2, ...) 
# If the friend is a TTree, the file list is made of @@ -719,16 +752,14 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, zipped_friendinfo = zip( current_range.friendinfo.fFriendNames, current_range.friendinfo.fFriendFileNames, - current_range.friendinfo.fFriendChainSubNames + current_range.friendinfo.fFriendChainSubNames, ) for (friend_name, friend_alias), friend_filenames, friend_chainsubnames in zipped_friendinfo: friend_chainsubnames = ( - friend_chainsubnames if len(friend_chainsubnames) > 0 - else [friend_name]*len(friend_filenames) + friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name] * len(friend_filenames) ) - ds.WithGlobalFriends( - friend_chainsubnames, friend_filenames, friend_alias) - + ds.WithGlobalFriends(friend_chainsubnames, friend_filenames, friend_alias) + def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: """ Builds an RDataFrame instance for a distributed mapper. @@ -737,14 +768,13 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: input range object. If the chain cannot be built, returns None. """ - clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs( - current_range) + clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range) if clustered_range is None: return TaskObjects(None, entries_in_trees) - + ds = ROOT.RDF.Experimental.RDatasetSpec() - + filenames_cluster = clustered_range.filenames treenames_cluster = clustered_range.treenames @@ -757,14 +787,14 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: # files processed in a given task range (filenames_cluster) and the list of files belonging to the given sample # (mysample.GetFilenameGlobs). Additionally, we need a dictionary to correctly match files with their trees. 
- # Matching files with trees + # Matching files with trees treesInFileMap = {} fileids_cluster = [] - + for filename, treename in zip(filenames_cluster, treenames_cluster): fileid = filename + "/" + treename fileids_cluster.append(fileid) - treesInFileMap.update({fileid : treename}) + treesInFileMap.update({fileid: treename}) # Making sure we don't double count samples unique_samples = [] @@ -779,37 +809,31 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: # Core: matching files with samples and adding samples to the spec that will be processed for sample in unique_samples: sample_fileids = [ - filenameglob + "/" + treename - for filenameglob, treename in zip(sample.filenames, sample.treenames) + filenameglob + "/" + treename for filenameglob, treename in zip(sample.filenames, sample.treenames) ] - good_files = list((Counter(filenames_cluster) - & Counter(sample.filenames)).elements()) - good_fileids = list( - (Counter(fileids_cluster) & Counter(sample_fileids)).elements()) + good_files = list((Counter(filenames_cluster) & Counter(sample.filenames)).elements()) + good_fileids = list((Counter(fileids_cluster) & Counter(sample_fileids)).elements()) - good_trees = [ - treesInFileMap.get(fileid) for fileid in good_fileids - ] + good_trees = [treesInFileMap.get(fileid) for fileid in good_fileids] rmetadata = ROOT.RDF.Experimental.RMetaData() ROOT.Internal.RDF.ImportJSON(rmetadata, sample.metadata) ds.AddSample((sample.name, good_trees, good_files, rmetadata)) - ds.WithGlobalRange( - (clustered_range.globalstart, clustered_range.globalend)) - + ds.WithGlobalRange((clustered_range.globalstart, clustered_range.globalend)) + attach_friend_info_if_present(clustered_range, ds) if current_range.exec_id not in _graph_cache._RDF_REGISTER: # Fill the cache with the new RDataFrame - _graph_cache._RDF_REGISTER[current_range.exec_id] = ROOT.RDataFrame(ds) - + _graph_cache._RDF_REGISTER[current_range.exec_id] = ROOT.RDataFrame(ds) + else: # Update it 
to the range of entries for this task ROOT.Internal.RDF.ChangeSpec( - ROOT.RDF.AsRNode( _graph_cache._RDF_REGISTER[current_range.exec_id]), - ROOT.std.move(ds)) + ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), ROOT.std.move(ds) + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], entries_in_trees) @@ -822,8 +846,10 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: the dataset were processed during distributed execution. """ if values.mergeables is None: - raise RuntimeError("The distributed execution returned no values. " - "This can happen if all files in your dataset contain empty trees.") + raise RuntimeError( + "The distributed execution returned no values. " + "This can happen if all files in your dataset contain empty trees." + ) # User could have requested to read the same file multiple times indeed input_files_and_trees = [ @@ -834,24 +860,28 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: entries_in_trees = values.entries_in_trees # Keys should be exactly the same if files_counts.keys() != entries_in_trees.trees_with_entries.keys(): - raise RuntimeError("The specified input files and the files that were " - "actually processed are not the same:\n" - f"Input files: {list(files_counts.keys())}\n" - f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}") + raise RuntimeError( + "The specified input files and the files that were " + "actually processed are not the same:\n" + f"Input files: {list(files_counts.keys())}\n" + f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}" + ) # Multiply the entries of each tree by the number of times it was # requested by the user for fullpath in files_counts: entries_in_trees.trees_with_entries[fullpath] *= files_counts[fullpath] - total_dataset_entries = sum( - entries_in_trees.trees_with_entries.values()) + total_dataset_entries = sum(entries_in_trees.trees_with_entries.values()) if 
entries_in_trees.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_trees.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_trees.processed_entries} were processed." + ) return values.mergeables - + + class RNTupleHeadNode(HeadNode): """ The head node of a computation graph where the RDataFrame data source is @@ -911,7 +941,6 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: clustered_range.filenames, (clustered_range.globalstart, clustered_range.globalend), ), - entries_in_rntuples, ) @@ -952,7 +981,9 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: total_dataset_entries = sum(entries_in_rntuples.trees_with_entries.values()) if entries_in_rntuples.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_rntuples.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_rntuples.processed_entries} were processed." + ) - return values.mergeables + return values.mergeables diff --git a/bindings/distrdf/python/DistRDF/Ranges.py b/bindings/distrdf/python/DistRDF/Ranges.py index 98dd936c57dd0..f84594788bc5b 100644 --- a/bindings/distrdf/python/DistRDF/Ranges.py +++ b/bindings/distrdf/python/DistRDF/Ranges.py @@ -20,12 +20,11 @@ class SerializableRSample: Stores the information relative to an RSample, removing knowledge of C++ types such that cppyy is not involved during serialization/deserialization. 
""" + def __init__(self, sample: ROOT.RDF.Experimental.RSample): self.name: str = sample.GetSampleName() - self.treenames: list[str] = [str(treename) - for treename in sample.GetTreeNames()] - self.filenames: list[str] = [str(filename) - for filename in sample.GetFileNameGlobs()] + self.treenames: list[str] = [str(treename) for treename in sample.GetTreeNames()] + self.filenames: list[str] = [str(filename) for filename in sample.GetFileNameGlobs()] self.metadata: str = ROOT.Internal.RDF.ExportJSON(sample.GetMetaData()) @@ -41,6 +40,7 @@ class DataRange: id: A sequential counter to identify this range. """ + exec_id: ExecutionIdentifier id: int @@ -56,6 +56,7 @@ class EmptySourceRange(DataRange): end (int): Ending entry of this range. """ + start: int end: int @@ -86,6 +87,7 @@ class TreeRangePerc(DataRange): range. Not None if the user provided a TTree or TChain in the distributed RDataFrame constructor. """ + treenames: List[str] filenames: List[str] first_file_idx: int @@ -94,6 +96,8 @@ class TreeRangePerc(DataRange): last_tree_end_perc: float friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] samples: Optional[List[SerializableRSample]] + + @dataclass class TreeRange(DataRange): """ @@ -119,6 +123,7 @@ class TreeRange(DataRange): range. Not None if the user provided a TTree or TChain in the distributed RDataFrame constructor. """ + treenames: List[str] filenames: List[str] globalstart: int @@ -126,6 +131,7 @@ class TreeRange(DataRange): friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] samples: Optional[List[SerializableRSample]] + @dataclass class TaskTreeEntries: """ @@ -135,9 +141,11 @@ class TaskTreeEntries: execution. It serves as a sanity check that exactly the total amount of entries in the dataset is processed in the application. 
""" + processed_entries: int = 0 trees_with_entries: Dict[str, int] = field(default_factory=dict) + def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier): """ Builds range pairs from the given values of the number of entries in @@ -181,6 +189,7 @@ def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier): return ranges + def get_rntuple_clusters_and_entries(ntuplename: str, filename: str) -> Tuple[List[int], int]: """ Retrieve cluster boundaries and number of entries of an RNTuple. @@ -197,9 +206,14 @@ def get_clusters_and_entries(rdf_uuid: str, datasetname: str, filename: str) -> return ROOT.Internal.TreeUtils.GetClustersAndEntries(datasetname, filename) -def get_percentage_ranges(treenames: List[str], filenames: List[str], npartitions: int, - friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo], - exec_id: ExecutionIdentifier, sampleMap: Optional[Dict[str, SerializableRSample]],) -> List[TreeRangePerc]: +def get_percentage_ranges( + treenames: List[str], + filenames: List[str], + npartitions: int, + friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo], + exec_id: ExecutionIdentifier, + sampleMap: Optional[Dict[str, SerializableRSample]], +) -> List[TreeRangePerc]: """ Create a list of tasks that will process the given trees partitioning them by percentages. @@ -216,7 +230,7 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition # percentages = [0., 1.428, 2.857, 4.285, 5.714, 7.142, 8.571, 10.] # files_of_percentages = [0, 1, 2, 4, 5, 7, 8, 10] # percentages_wrt_files = [0., 0.428, 0.857, 0.285, 0.714, 0.142, 0.571, 0.] 
- percentages = [files_per_partition * i for i in range(npartitions+1)] + percentages = [files_per_partition * i for i in range(npartitions + 1)] files_of_percentages = [floor(percentage) for percentage in percentages] percentages_wrt_files = [perc - file for perc, file in zip(percentages, files_of_percentages)] @@ -248,7 +262,6 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition # We need to transmit the full list of treenames and filenames to each # task, in order to properly align the full dataset considering friends. if sampleMap is not None: - samples = [] for filename, treename in zip(filenames, treenames): @@ -256,17 +269,35 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition samples.append(sample) return [ - TreeRangePerc( - exec_id, rangeid, treenames, filenames, start_sample_idxs[rangeid], end_sample_idxs[rangeid], - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, samples) - for rangeid in range(npartitions) + TreeRangePerc( + exec_id, + rangeid, + treenames, + filenames, + start_sample_idxs[rangeid], + end_sample_idxs[rangeid], + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + samples, + ) + for rangeid in range(npartitions) ] - else: + else: return [ TreeRangePerc( - exec_id, rangeid, treenames, filenames, start_sample_idxs[rangeid], end_sample_idxs[rangeid], - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, None) + exec_id, + rangeid, + treenames, + filenames, + start_sample_idxs[rangeid], + end_sample_idxs[rangeid], + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + None, + ) for rangeid in range(npartitions) ] else: @@ -276,44 +307,58 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition # from the full list of filenames. 
tasktreenames = [treenames[s:e] for s, e in zip(start_sample_idxs, end_sample_idxs)] taskfilenames = [filenames[s:e] for s, e in zip(start_sample_idxs, end_sample_idxs)] - + if sampleMap is not None: - tasksamples = [] - + for taskfilename, tasktreename in zip(taskfilenames, tasktreenames): tasksample = [] - for i in range (len(taskfilename)): + for i in range(len(taskfilename)): sample = sampleMap.get(taskfilename[i] + "/" + tasktreename[i]) - tasksample.append(sample) + tasksample.append(sample) tasksamples.append(tasksample) - + return [ - - TreeRangePerc( - exec_id, rangeid, tasktreenames[rangeid], taskfilenames[rangeid], 0, len(taskfilenames[rangeid]), - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, tasksamples[rangeid] - ) - for rangeid in range(npartitions) - ] - + TreeRangePerc( + exec_id, + rangeid, + tasktreenames[rangeid], + taskfilenames[rangeid], + 0, + len(taskfilenames[rangeid]), + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + tasksamples[rangeid], + ) + for rangeid in range(npartitions) + ] + # On the other hand, when creating the TreeRangePerc tasks below, the # starting and ending indexes have to be task-local. In practice, the # task always starts from file index 0 and it always ends at file index # equal to the number of files assigned to that task. 
else: - return [ TreeRangePerc( - exec_id, rangeid, tasktreenames[rangeid], taskfilenames[rangeid], 0, len(taskfilenames[rangeid]), - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, None + exec_id, + rangeid, + tasktreenames[rangeid], + taskfilenames[rangeid], + 0, + len(taskfilenames[rangeid]), + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + None, ) for rangeid in range(npartitions) ] -def get_entryrange_at_cluster_boundaries(percstart: float, percend: float, - entries: int, clusters: List[int]) -> Tuple[int, int, int]: +def get_entryrange_at_cluster_boundaries( + percstart: float, percend: float, entries: int, clusters: List[int] +) -> Tuple[int, int, int]: """ Computes the pair (start, end) entries of this tree, aligned at cluster boundaries. @@ -398,16 +443,15 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T # entries = [10, 10, 10, 10] # offsets = [0, 10, 20, 30] initial = (0,) - all_offsets = tuple(accumulate(initial+all_entries[:-1])) + all_offsets = tuple(accumulate(initial + all_entries[:-1])) # Connect each tree in each file with its number of entries trees_with_entries: Dict[str, int] = { filename + "/" + treename: entries - for filename, treename, entries - in zip( + for filename, treename, entries in zip( percrange.filenames[first_file_idx:last_file_idx], percrange.treenames[first_file_idx:last_file_idx], - all_entries[first_file_idx:last_file_idx] + all_entries[first_file_idx:last_file_idx], ) } @@ -419,8 +463,10 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T if (last_file_idx - first_file_idx) == 1: # Compute only once if there is only one file first_tree_startentry, last_tree_endentry, first_tree_entries_to_discard = get_entryrange_at_cluster_boundaries( - percrange.first_tree_start_perc, percrange.last_tree_end_perc, - all_entries[first_file_idx], all_clusters[first_file_idx] + 
percrange.first_tree_start_perc, + percrange.last_tree_end_perc, + all_entries[first_file_idx], + all_clusters[first_file_idx], ) entries_to_discard = first_tree_entries_to_discard else: @@ -428,7 +474,7 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T percrange.first_tree_start_perc, 1, all_entries[first_file_idx], all_clusters[first_file_idx] ) _, last_tree_endentry, last_tree_entries_to_discard = get_entryrange_at_cluster_boundaries( - 0, percrange.last_tree_end_perc, all_entries[last_file_idx-1], all_clusters[last_file_idx-1] + 0, percrange.last_tree_end_perc, all_entries[last_file_idx - 1], all_clusters[last_file_idx - 1] ) entries_to_discard = first_tree_entries_to_discard + last_tree_entries_to_discard @@ -447,11 +493,19 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T return None, TaskTreeEntries(0, trees_with_entries) globalstart = all_offsets[first_file_idx] + first_tree_startentry - globalend = all_offsets[last_file_idx-1] + last_tree_endentry + globalend = all_offsets[last_file_idx - 1] + last_tree_endentry entries_in_trees = TaskTreeEntries(globalend - globalstart, trees_with_entries) - treerange = TreeRange(percrange.exec_id, percrange.id, percrange.treenames, percrange.filenames, - globalstart, globalend, percrange.friendinfo, percrange.samples) + treerange = TreeRange( + percrange.exec_id, + percrange.id, + percrange.treenames, + percrange.filenames, + globalstart, + globalend, + percrange.friendinfo, + percrange.samples, + ) return treerange, entries_in_trees