diff --git a/bindings/distrdf/python/DistRDF/HeadNode.py b/bindings/distrdf/python/DistRDF/HeadNode.py index d0b3b677cb99a..9e408f0a49c3b 100644 --- a/bindings/distrdf/python/DistRDF/HeadNode.py +++ b/bindings/distrdf/python/DistRDF/HeadNode.py @@ -52,6 +52,7 @@ class TaskObjects: for a tree opened in the task and the value is the number of entries in that tree. This attribute is not None only in a TTree-based run. """ + rdf: Optional[ROOT.RDF.RNode] entries_in_trees: Optional[Ranges.TaskTreeEntries] @@ -106,7 +107,7 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO self.rdf_node = localdf # A dictionary where the keys are the IDs of the objects to live visualize - # and the values are the corresponding callback functions + # and the values are the corresponding callback functions # This attribute only gets set in case the LiveVisualize() function is called self.drawables_dict: Optional[Dict[int, List[Optional[Callable]]]] = None @@ -192,7 +193,7 @@ def _execute_and_retrieve_results(self, mapper, local_nodes) -> TaskResult: # 2. Index of the node in the local_nodes list i, # 3. 
Name of the operation associated with the node - node.operation.name + node.operation.name, ) for i, node in enumerate(local_nodes) # Filter: Only include nodes requested by the user @@ -232,17 +233,20 @@ def execute_graph(self) -> None: self.exec_id = _graph_cache.ExecutionIdentifier(self.rdf_uuid, uuid.uuid4()) - - computation_graph_callable = partial(ComputationGraphGenerator.trigger_computation_graph, self._generate_graph_dict()) + computation_graph_callable = partial( + ComputationGraphGenerator.trigger_computation_graph, self._generate_graph_dict() + ) # Accumulate all code that needs to be declared in one string code_to_declare = "\n".join(self.backend.strings_to_declare.values()) - mapper = partial(distrdf_mapper, - build_rdf_from_range=self._generate_rdf_creator(), - computation_graph_callable=computation_graph_callable, - initialization_fn=self.backend.initialization, - code_to_declare=code_to_declare) + mapper = partial( + distrdf_mapper, + build_rdf_from_range=self._generate_rdf_creator(), + computation_graph_callable=computation_graph_callable, + initialization_fn=self.backend.initialization, + code_to_declare=code_to_declare, + ) # List of action nodes in the same order as values local_nodes = self._get_action_nodes() @@ -258,7 +262,7 @@ def execute_graph(self) -> None: # Perform any extra checks that may be needed according to the # type of the head node final_values = self._handle_returned_values(returned_values) - + # Set the value of every action node for node, value in zip(local_nodes, final_values): Utils.set_value_on_node(value, node, self.backend) @@ -275,8 +279,12 @@ def get_headnode(backend: BaseBackend, npartitions: int, *args) -> HeadNode: try: localdf = ROOT.RDataFrame(*args) except TypeError: - raise TypeError(("The arguments provided are not accepted by any RDataFrame constructor. 
" - "See the RDataFrame documentation for the accepted constructor argument types.")) + raise TypeError( + ( + "The arguments provided are not accepted by any RDataFrame constructor. " + "See the RDataFrame documentation for the accepted constructor argument types." + ) + ) if isinstance(args[0], ROOT.RDF.Experimental.RDatasetSpec): return RDatasetSpecHeadNode(backend, npartitions, localdf, *args) @@ -290,10 +298,13 @@ def get_headnode(backend: BaseBackend, npartitions: int, *args) -> HeadNode: return EmptySourceHeadNode(backend, npartitions, localdf, args[0]) else: raise RuntimeError( - (f"First argument {args[0]} of type {type(args[0])} is not " - "recognised as a supported argument for distributed RDataFrame. " - "Currently supported data sources are: TTree, RNTuple or an empty " - "data source.")) + ( + f"First argument {args[0]} of type {type(args[0])} is not " + "recognised as a supported argument for distributed RDataFrame. " + "Currently supported data sources are: TTree, RNTuple or an empty " + "data source." + ) + ) class EmptySourceHeadNode(HeadNode): @@ -330,15 +341,17 @@ def _build_ranges(self) -> List[Ranges.DataRange]: # Empty datasets cannot be processed distributedly if not self.nentries: raise RuntimeError( - ("Cannot build a distributed RDataFrame with zero entries. " - "Distributed computation will fail. ")) + ("Cannot build a distributed RDataFrame with zero entries. Distributed computation will fail. ") + ) # TODO: This shouldn't be triggered if entries == 1. The current minimum # amount of partitions is 2. We need a robust reducer that smartly # becomes no-op if npartitions == 1 to avoid this. if self.npartitions > self.nentries: # Restrict 'npartitions' if it's greater than 'nentries' - msg = ("Number of partitions {0} is greater than number of entries {1} " - "in the dataframe. Using {1} partition(s)".format(self.npartitions, self.nentries)) + msg = ( + "Number of partitions {0} is greater than number of entries {1} " + "in the dataframe. 
Using {1} partition(s)".format(self.npartitions, self.nentries) + ) warnings.warn(msg, UserWarning, stacklevel=2) self.npartitions = self.nentries return Ranges.get_balanced_ranges(self.nentries, self.npartitions, self.exec_id) @@ -361,7 +374,8 @@ def build_rdf_from_range(current_range): ROOT.Internal.RDF.ChangeEmptyEntryRange( ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), - (current_range.start, current_range.end)) + (current_range.start, current_range.end), + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], None) @@ -430,12 +444,10 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO # RDataFrame(tree, defaultBranches = {}) tree = args[0] # TTreeIndex is not supported in distributed RDataFrame - idx_found, friendname = ROOT.Internal.TreeUtils.TreeUsesIndexedFriends( - tree) + idx_found, friendname = ROOT.Internal.TreeUtils.TreeUsesIndexedFriends(tree) if idx_found: raise ValueError( - f"Friend tree '{friendname}' has a TTreeIndex. " - "This is not supported in distributed mode." + f"Friend tree '{friendname}' has a TTreeIndex. This is not supported in distributed mode." ) # Retrieve information about friend trees when user passes a TTree # or TChain object. @@ -473,10 +485,12 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO def _build_ranges(self) -> List[Ranges.DataRange]: """Build the ranges for this dataset.""" - logger.debug("Building ranges from dataset info:\n" - "main treename: %s\n" - "names of subtrees: %s\n" - "input files: %s\n", self.maintreename, self.subtreenames, self.inputfiles) + logger.debug( + "Building ranges from dataset info:\nmain treename: %s\nnames of subtrees: %s\ninput files: %s\n", + self.maintreename, + self.subtreenames, + self.inputfiles, + ) if logger.isEnabledFor(logging.DEBUG): # Compute clusters and entries of the first tree in the dataset. 
@@ -493,9 +507,12 @@ def _build_ranges(self) -> List[Ranges.DataRange]: logger.debug( "The number of requested partitions could be higher than the maximum amount of " "chunks the dataset can be split in. Some tasks could be doing no work. Consider " - "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value.") + "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value." + ) - return Ranges.get_percentage_ranges(self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, None) + return Ranges.get_percentage_ranges( + self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, None + ) def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: """ @@ -504,8 +521,9 @@ def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: the TTree data source. """ - def attach_friend_info_if_present(current_range: Ranges.TreeRange, - ds: ROOT.RDF.Experimental.RDatasetSpec) -> None: + def attach_friend_info_if_present( + current_range: Ranges.TreeRange, ds: ROOT.RDF.Experimental.RDatasetSpec + ) -> None: """ Adds info about friend trees to the input chain. Also aligns the starting and ending entry of the friend chain cache to those of the @@ -513,7 +531,7 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, """ # Gather information about friend trees. Check that we got an # RFriendInfo struct and that it's not empty - if (current_range.friendinfo is not None): + if current_range.friendinfo is not None: # If the friend is a TChain, the zipped information looks like: # (name, alias), (file1.root, file2.root, ...), (subname1, subname2, ...) 
# If the friend is a TTree, the file list is made of @@ -523,10 +541,12 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, zipped_friendinfo = zip( current_range.friendinfo.fFriendNames, current_range.friendinfo.fFriendFileNames, - current_range.friendinfo.fFriendChainSubNames + current_range.friendinfo.fFriendChainSubNames, ) for (friend_name, friend_alias), friend_filenames, friend_chainsubnames in zipped_friendinfo: - friend_chainsubnames = friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name]*len(friend_filenames) + friend_chainsubnames = ( + friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name] * len(friend_filenames) + ) ds.WithGlobalFriends(friend_chainsubnames, friend_filenames, friend_alias) def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: @@ -555,8 +575,8 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: else: # Update it to the range of entries for this task ROOT.Internal.RDF.ChangeSpec( - ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), - ROOT.std.move(ds)) + ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), ROOT.std.move(ds) + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], entries_in_trees) @@ -569,8 +589,10 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: the dataset were processed during distributed execution. """ if values.mergeables is None: - raise RuntimeError("The distributed execution returned no values. " - "This can happen if all files in your dataset contain empty trees.") + raise RuntimeError( + "The distributed execution returned no values. " + "This can happen if all files in your dataset contain empty trees." 
+ ) # User could have requested to read the same file multiple times indeed input_files_and_trees = [ @@ -581,10 +603,12 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: entries_in_trees = values.entries_in_trees # Keys should be exactly the same if files_counts.keys() != entries_in_trees.trees_with_entries.keys(): - raise RuntimeError("The specified input files and the files that were " - "actually processed are not the same:\n" - f"Input files: {list(files_counts.keys())}\n" - f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}") + raise RuntimeError( + "The specified input files and the files that were " + "actually processed are not the same:\n" + f"Input files: {list(files_counts.keys())}\n" + f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}" + ) # Multiply the entries of each tree by the number of times it was # requested by the user @@ -593,11 +617,14 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: total_dataset_entries = sum(entries_in_trees.trees_with_entries.values()) if entries_in_trees.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_trees.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_trees.processed_entries} were processed." + ) return values.mergeables - + + class RDatasetSpecHeadNode(HeadNode): """ The head node of a computation graph where the RDataFrame data source is @@ -613,11 +640,11 @@ class RDatasetSpecHeadNode(HeadNode): used to construct the RDataFrame subtreenames (list[str]): List of tree names in the dataset. - + inputfiles (list[str]): List of file names where the dataset is stored. friendinfo (ROOT.Internal.TreeUtils.RFriendInfo, None): Optional - information about friend trees of the dataset. Retrieved from + information about friend trees of the dataset. 
Retrieved from RDatasetSpec.GetFriendInfo(). Defaults to None. """ @@ -634,11 +661,10 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO super().__init__(backend, npartitions, localdf) # Information about friend trees, if they are present. - self.sampleMap: Dict[str, Ranges.SerializableRSample] = None - + self.sampleMap: Dict[str, Ranges.SerializableRSample] = None + self.friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] = None - - + # Retrieve the RDatasetSpec that will be processed if isinstance(args[0], ROOT.RDF.Experimental.RDatasetSpec): # RDataFrame(rdatasetspec) @@ -647,16 +673,18 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO else: raise RuntimeError( - f"First argument {args[0]} of type {type(args[0])} is not supported " - "in RDatasetSpecHeaNode") - - if (args[0].GetEntryRangeBegin() != 0): - warnings.warn("Setting global range is not supported in distributed parsing of RDatasetSpec, the set range will be ignored and all events will be processed.") - + f"First argument {args[0]} of type {type(args[0])} is not supported in RDatasetSpecHeaNode" + ) + + if args[0].GetEntryRangeBegin() != 0: + warnings.warn( + "Setting global range is not supported in distributed parsing of RDatasetSpec, the set range will be ignored and all events will be processed." 
+ ) + # subtreenames: names of all subtrees in the chain or full path to the tree in the file it belongs to self.subtreenames = [str(treename) for treename in args[0].GetTreeNames()] self.inputfiles = [str(filename) for filename in args[0].GetFileNameGlobs()] - + # A map to map file id (filename/tree) to SerializableRSamples self.sampleMap = {} samples = ROOT.Internal.RDF.MoveOutSamples(args[0]) @@ -664,15 +692,17 @@ def __init__(self, backend: BaseBackend, npartitions: Optional[int], localdf: RO for sample in samples: trees = sample.GetTreeNames() files = sample.GetFileNameGlobs() - for file, tree in zip(files, trees): + for file, tree in zip(files, trees): sampleId = file + "/" + tree - self.sampleMap.update({sampleId : Ranges.SerializableRSample(sample)}) + self.sampleMap.update({sampleId: Ranges.SerializableRSample(sample)}) def _build_ranges(self) -> List[Ranges.DataRange]: """Build the ranges for this dataset.""" - logger.debug("Building ranges from dataset info:\n" - "names of subtrees: %s\n" - "input files: %s\n", self.subtreenames, self.inputfiles) + logger.debug( + "Building ranges from dataset info:\nnames of subtrees: %s\ninput files: %s\n", + self.subtreenames, + self.inputfiles, + ) if logger.isEnabledFor(logging.DEBUG): # Compute clusters and entries of the first tree in the dataset. @@ -681,8 +711,7 @@ def _build_ranges(self) -> List[Ranges.DataRange]: # Depending on the cluster setup, this may still be quite costly, so # we decide to pay the price only if the user explicitly requested # warning logging. - clusters, entries = ROOT.Internal.TreeUtils.GetClustersAndEntries( - self.subtreenames[0], self.inputfiles[0]) + clusters, entries = ROOT.Internal.TreeUtils.GetClustersAndEntries(self.subtreenames[0], self.inputfiles[0]) # The file could contain an empty tree. In that case, the estimate will not be computed. 
if entries > 0: partitionsperfile = self.npartitions / len(self.inputfiles) @@ -690,9 +719,12 @@ def _build_ranges(self) -> List[Ranges.DataRange]: logger.debug( "The number of requested partitions could be higher than the maximum amount of " "chunks the dataset can be split in. Some tasks could be doing no work. Consider " - "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value.") - - return Ranges.get_percentage_ranges(self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, self.sampleMap) + "setting the 'npartitions' parameter of the RDataFrame constructor to a lower value." + ) + + return Ranges.get_percentage_ranges( + self.subtreenames, self.inputfiles, self.npartitions, self.friendinfo, self.exec_id, self.sampleMap + ) def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: """ @@ -700,8 +732,9 @@ def _generate_rdf_creator(self) -> Callable[[Ranges.DataRange], TaskObjects]: RDataFrame on a distributed mapper for a given entry range. """ - def attach_friend_info_if_present(current_range: Ranges.TreeRange, - ds: ROOT.RDF.Experimental.RDatasetSpec) -> None: + def attach_friend_info_if_present( + current_range: Ranges.TreeRange, ds: ROOT.RDF.Experimental.RDatasetSpec + ) -> None: """ Adds info about friend trees to the input chain. Also aligns the starting and ending entry of the friend chain cache to those of the @@ -709,7 +742,7 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, """ # Gather information about friend trees. Check that we got an # RFriendInfo struct and that it's not empty - if (current_range.friendinfo is not None): + if current_range.friendinfo is not None: # If the friend is a TChain, the zipped information looks like: # (name, alias), (file1.root, file2.root, ...), (subname1, subname2, ...) 
# If the friend is a TTree, the file list is made of @@ -719,16 +752,14 @@ def attach_friend_info_if_present(current_range: Ranges.TreeRange, zipped_friendinfo = zip( current_range.friendinfo.fFriendNames, current_range.friendinfo.fFriendFileNames, - current_range.friendinfo.fFriendChainSubNames + current_range.friendinfo.fFriendChainSubNames, ) for (friend_name, friend_alias), friend_filenames, friend_chainsubnames in zipped_friendinfo: friend_chainsubnames = ( - friend_chainsubnames if len(friend_chainsubnames) > 0 - else [friend_name]*len(friend_filenames) + friend_chainsubnames if len(friend_chainsubnames) > 0 else [friend_name] * len(friend_filenames) ) - ds.WithGlobalFriends( - friend_chainsubnames, friend_filenames, friend_alias) - + ds.WithGlobalFriends(friend_chainsubnames, friend_filenames, friend_alias) + def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: """ Builds an RDataFrame instance for a distributed mapper. @@ -737,14 +768,13 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: input range object. If the chain cannot be built, returns None. """ - clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs( - current_range) + clustered_range, entries_in_trees = Ranges.get_clustered_range_from_percs(current_range) if clustered_range is None: return TaskObjects(None, entries_in_trees) - + ds = ROOT.RDF.Experimental.RDatasetSpec() - + filenames_cluster = clustered_range.filenames treenames_cluster = clustered_range.treenames @@ -757,56 +787,53 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: # files processed in a given task range (filenames_cluster) and the list of files belonging to the given sample # (mysample.GetFilenameGlobs). Additionally, we need a dictionary to correctly match files with their trees. 
- # Matching files with trees + # Matching files with trees treesInFileMap = {} fileids_cluster = [] - + for filename, treename in zip(filenames_cluster, treenames_cluster): fileid = filename + "/" + treename fileids_cluster.append(fileid) - treesInFileMap.update({fileid : treename}) + treesInFileMap.update({fileid: treename}) - # Making sure we don't double count samples - unique_samples = [] + # Making sure we don't double count samples + unique_samples = [] samplenames = [] - + for sample in clustered_range.samples: - mysample = sample._sample - samplename = mysample.GetSampleName() + samplename = sample.name if samplename not in samplenames: - unique_samples.append(mysample) + unique_samples.append(sample) samplenames.append(samplename) - + # Core: matching files with samples and adding samples to the spec that will be processed - for mysample in unique_samples: + for sample in unique_samples: sample_fileids = [ - filenameglob + "/" + treename - for filenameglob, treename in zip(mysample.GetFileNameGlobs(), mysample.GetTreeNames()) + filenameglob + "/" + treename for filenameglob, treename in zip(sample.filenames, sample.treenames) ] - - good_files = list((Counter(filenames_cluster) & Counter(mysample.GetFileNameGlobs())).elements()) - good_fileids = list((Counter(fileids_cluster) & Counter(sample_fileids)).elements()) - good_trees =[ - treesInFileMap.get(fileid) for fileid in good_fileids - ] - - ds.AddSample((mysample.GetSampleName(), good_trees, good_files, mysample.GetMetaData())) - - ds.WithGlobalRange( - (clustered_range.globalstart, clustered_range.globalend)) - + good_files = list((Counter(filenames_cluster) & Counter(sample.filenames)).elements()) + good_fileids = list((Counter(fileids_cluster) & Counter(sample_fileids)).elements()) + + good_trees = [treesInFileMap.get(fileid) for fileid in good_fileids] + + rmetadata = ROOT.RDF.Experimental.RMetaData() + ROOT.Internal.RDF.ImportJSON(rmetadata, sample.metadata) + ds.AddSample((sample.name, good_trees, 
good_files, rmetadata)) + + ds.WithGlobalRange((clustered_range.globalstart, clustered_range.globalend)) + attach_friend_info_if_present(clustered_range, ds) if current_range.exec_id not in _graph_cache._RDF_REGISTER: # Fill the cache with the new RDataFrame - _graph_cache._RDF_REGISTER[current_range.exec_id] = ROOT.RDataFrame(ds) - + _graph_cache._RDF_REGISTER[current_range.exec_id] = ROOT.RDataFrame(ds) + else: # Update it to the range of entries for this task ROOT.Internal.RDF.ChangeSpec( - ROOT.RDF.AsRNode( _graph_cache._RDF_REGISTER[current_range.exec_id]), - ROOT.std.move(ds)) + ROOT.RDF.AsRNode(_graph_cache._RDF_REGISTER[current_range.exec_id]), ROOT.std.move(ds) + ) return TaskObjects(_graph_cache._RDF_REGISTER[current_range.exec_id], entries_in_trees) @@ -819,8 +846,10 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: the dataset were processed during distributed execution. """ if values.mergeables is None: - raise RuntimeError("The distributed execution returned no values. " - "This can happen if all files in your dataset contain empty trees.") + raise RuntimeError( + "The distributed execution returned no values. " + "This can happen if all files in your dataset contain empty trees." 
+ ) # User could have requested to read the same file multiple times indeed input_files_and_trees = [ @@ -831,24 +860,28 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: entries_in_trees = values.entries_in_trees # Keys should be exactly the same if files_counts.keys() != entries_in_trees.trees_with_entries.keys(): - raise RuntimeError("The specified input files and the files that were " - "actually processed are not the same:\n" - f"Input files: {list(files_counts.keys())}\n" - f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}") + raise RuntimeError( + "The specified input files and the files that were " + "actually processed are not the same:\n" + f"Input files: {list(files_counts.keys())}\n" + f"Processed files: {list(entries_in_trees.trees_with_entries.keys())}" + ) # Multiply the entries of each tree by the number of times it was # requested by the user for fullpath in files_counts: entries_in_trees.trees_with_entries[fullpath] *= files_counts[fullpath] - total_dataset_entries = sum( - entries_in_trees.trees_with_entries.values()) + total_dataset_entries = sum(entries_in_trees.trees_with_entries.values()) if entries_in_trees.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_trees.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_trees.processed_entries} were processed." 
+ ) return values.mergeables - + + class RNTupleHeadNode(HeadNode): """ The head node of a computation graph where the RDataFrame data source is @@ -908,7 +941,6 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects: clustered_range.filenames, (clustered_range.globalstart, clustered_range.globalend), ), - entries_in_rntuples, ) @@ -949,7 +981,9 @@ def _handle_returned_values(self, values: TaskResult) -> Iterable: total_dataset_entries = sum(entries_in_rntuples.trees_with_entries.values()) if entries_in_rntuples.processed_entries != total_dataset_entries: - raise RuntimeError(f"The dataset has {total_dataset_entries} entries, " - f"but {entries_in_rntuples.processed_entries} were processed.") + raise RuntimeError( + f"The dataset has {total_dataset_entries} entries, " + f"but {entries_in_rntuples.processed_entries} were processed." + ) - return values.mergeables + return values.mergeables diff --git a/bindings/distrdf/python/DistRDF/Ranges.py b/bindings/distrdf/python/DistRDF/Ranges.py index 5fac94a702f25..f84594788bc5b 100644 --- a/bindings/distrdf/python/DistRDF/Ranges.py +++ b/bindings/distrdf/python/DistRDF/Ranges.py @@ -14,20 +14,20 @@ logger = logging.getLogger(__name__) -class SerializableRSample(): - + +class SerializableRSample: + """ + Stores the information relative to an RSample, removing knowledge of C++ types such that cppyy is not involved + during serialization/deserialization. 
+ """ + def __init__(self, sample: ROOT.RDF.Experimental.RSample): - self._sample = sample - - def __getstate__(self): - return {"samplenames" : self._sample.GetSampleName(), "treenames" : self._sample.GetTreeNames() , "filenames" : self._sample.GetFileNameGlobs(), "metadata" : ROOT.Internal.RDF.ExportJSON(self._sample.GetMetaData())} - - - def __setstate__(self, state): - _metadata = ROOT.RDF.Experimental.RMetaData() - ROOT.Internal.RDF.ImportJSON(_metadata, state["metadata"]) - self._sample = ROOT.RDF.Experimental.RSample(state["samplenames"], state["treenames"], state["filenames"], _metadata) - + self.name: str = sample.GetSampleName() + self.treenames: list[str] = [str(treename) for treename in sample.GetTreeNames()] + self.filenames: list[str] = [str(filename) for filename in sample.GetFileNameGlobs()] + self.metadata: str = ROOT.Internal.RDF.ExportJSON(sample.GetMetaData()) + + @dataclass class DataRange: """ @@ -40,6 +40,7 @@ class DataRange: id: A sequential counter to identify this range. """ + exec_id: ExecutionIdentifier id: int @@ -55,6 +56,7 @@ class EmptySourceRange(DataRange): end (int): Ending entry of this range. """ + start: int end: int @@ -85,6 +87,7 @@ class TreeRangePerc(DataRange): range. Not None if the user provided a TTree or TChain in the distributed RDataFrame constructor. """ + treenames: List[str] filenames: List[str] first_file_idx: int @@ -93,6 +96,8 @@ class TreeRangePerc(DataRange): last_tree_end_perc: float friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] samples: Optional[List[SerializableRSample]] + + @dataclass class TreeRange(DataRange): """ @@ -118,6 +123,7 @@ class TreeRange(DataRange): range. Not None if the user provided a TTree or TChain in the distributed RDataFrame constructor. 
""" + treenames: List[str] filenames: List[str] globalstart: int @@ -125,6 +131,7 @@ class TreeRange(DataRange): friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo] samples: Optional[List[SerializableRSample]] + @dataclass class TaskTreeEntries: """ @@ -134,9 +141,11 @@ class TaskTreeEntries: execution. It serves as a sanity check that exactly the total amount of entries in the dataset is processed in the application. """ + processed_entries: int = 0 trees_with_entries: Dict[str, int] = field(default_factory=dict) + def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier): """ Builds range pairs from the given values of the number of entries in @@ -180,6 +189,7 @@ def get_balanced_ranges(nentries, npartitions, exec_id: ExecutionIdentifier): return ranges + def get_rntuple_clusters_and_entries(ntuplename: str, filename: str) -> Tuple[List[int], int]: """ Retrieve cluster boundaries and number of entries of an RNTuple. @@ -196,9 +206,14 @@ def get_clusters_and_entries(rdf_uuid: str, datasetname: str, filename: str) -> return ROOT.Internal.TreeUtils.GetClustersAndEntries(datasetname, filename) -def get_percentage_ranges(treenames: List[str], filenames: List[str], npartitions: int, - friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo], - exec_id: ExecutionIdentifier, sampleMap: Optional[Dict[str, SerializableRSample]],) -> List[TreeRangePerc]: +def get_percentage_ranges( + treenames: List[str], + filenames: List[str], + npartitions: int, + friendinfo: Optional[ROOT.Internal.TreeUtils.RFriendInfo], + exec_id: ExecutionIdentifier, + sampleMap: Optional[Dict[str, SerializableRSample]], +) -> List[TreeRangePerc]: """ Create a list of tasks that will process the given trees partitioning them by percentages. @@ -215,7 +230,7 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition # percentages = [0., 1.428, 2.857, 4.285, 5.714, 7.142, 8.571, 10.] 
# files_of_percentages = [0, 1, 2, 4, 5, 7, 8, 10] # percentages_wrt_files = [0., 0.428, 0.857, 0.285, 0.714, 0.142, 0.571, 0.] - percentages = [files_per_partition * i for i in range(npartitions+1)] + percentages = [files_per_partition * i for i in range(npartitions + 1)] files_of_percentages = [floor(percentage) for percentage in percentages] percentages_wrt_files = [perc - file for perc, file in zip(percentages, files_of_percentages)] @@ -247,7 +262,6 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition # We need to transmit the full list of treenames and filenames to each # task, in order to properly align the full dataset considering friends. if sampleMap is not None: - samples = [] for filename, treename in zip(filenames, treenames): @@ -255,17 +269,35 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], npartition samples.append(sample) return [ - TreeRangePerc( - exec_id, rangeid, treenames, filenames, start_sample_idxs[rangeid], end_sample_idxs[rangeid], - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, samples) - for rangeid in range(npartitions) + TreeRangePerc( + exec_id, + rangeid, + treenames, + filenames, + start_sample_idxs[rangeid], + end_sample_idxs[rangeid], + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + samples, + ) + for rangeid in range(npartitions) ] - else: + else: return [ TreeRangePerc( - exec_id, rangeid, treenames, filenames, start_sample_idxs[rangeid], end_sample_idxs[rangeid], - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, None) + exec_id, + rangeid, + treenames, + filenames, + start_sample_idxs[rangeid], + end_sample_idxs[rangeid], + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + None, + ) for rangeid in range(npartitions) ] else: @@ -275,44 +307,58 @@ def get_percentage_ranges(treenames: List[str], filenames: List[str], 
npartition # from the full list of filenames. tasktreenames = [treenames[s:e] for s, e in zip(start_sample_idxs, end_sample_idxs)] taskfilenames = [filenames[s:e] for s, e in zip(start_sample_idxs, end_sample_idxs)] - + if sampleMap is not None: - tasksamples = [] - + for taskfilename, tasktreename in zip(taskfilenames, tasktreenames): tasksample = [] - for i in range (len(taskfilename)): + for i in range(len(taskfilename)): sample = sampleMap.get(taskfilename[i] + "/" + tasktreename[i]) - tasksample.append(sample) + tasksample.append(sample) tasksamples.append(tasksample) - + return [ - - TreeRangePerc( - exec_id, rangeid, tasktreenames[rangeid], taskfilenames[rangeid], 0, len(taskfilenames[rangeid]), - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, tasksamples[rangeid] - ) - for rangeid in range(npartitions) - ] - + TreeRangePerc( + exec_id, + rangeid, + tasktreenames[rangeid], + taskfilenames[rangeid], + 0, + len(taskfilenames[rangeid]), + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + tasksamples[rangeid], + ) + for rangeid in range(npartitions) + ] + # On the other hand, when creating the TreeRangePerc tasks below, the # starting and ending indexes have to be task-local. In practice, the # task always starts from file index 0 and it always ends at file index # equal to the number of files assigned to that task. 
else: - return [ TreeRangePerc( - exec_id, rangeid, tasktreenames[rangeid], taskfilenames[rangeid], 0, len(taskfilenames[rangeid]), - first_tree_start_perc_tasks[rangeid], last_tree_end_perc_tasks[rangeid], friendinfo, None + exec_id, + rangeid, + tasktreenames[rangeid], + taskfilenames[rangeid], + 0, + len(taskfilenames[rangeid]), + first_tree_start_perc_tasks[rangeid], + last_tree_end_perc_tasks[rangeid], + friendinfo, + None, ) for rangeid in range(npartitions) ] -def get_entryrange_at_cluster_boundaries(percstart: float, percend: float, - entries: int, clusters: List[int]) -> Tuple[int, int, int]: +def get_entryrange_at_cluster_boundaries( + percstart: float, percend: float, entries: int, clusters: List[int] +) -> Tuple[int, int, int]: """ Computes the pair (start, end) entries of this tree, aligned at cluster boundaries. @@ -397,16 +443,15 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T # entries = [10, 10, 10, 10] # offsets = [0, 10, 20, 30] initial = (0,) - all_offsets = tuple(accumulate(initial+all_entries[:-1])) + all_offsets = tuple(accumulate(initial + all_entries[:-1])) # Connect each tree in each file with its number of entries trees_with_entries: Dict[str, int] = { filename + "/" + treename: entries - for filename, treename, entries - in zip( + for filename, treename, entries in zip( percrange.filenames[first_file_idx:last_file_idx], percrange.treenames[first_file_idx:last_file_idx], - all_entries[first_file_idx:last_file_idx] + all_entries[first_file_idx:last_file_idx], ) } @@ -418,8 +463,10 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T if (last_file_idx - first_file_idx) == 1: # Compute only once if there is only one file first_tree_startentry, last_tree_endentry, first_tree_entries_to_discard = get_entryrange_at_cluster_boundaries( - percrange.first_tree_start_perc, percrange.last_tree_end_perc, - all_entries[first_file_idx], all_clusters[first_file_idx] + 
percrange.first_tree_start_perc, + percrange.last_tree_end_perc, + all_entries[first_file_idx], + all_clusters[first_file_idx], ) entries_to_discard = first_tree_entries_to_discard else: @@ -427,7 +474,7 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T percrange.first_tree_start_perc, 1, all_entries[first_file_idx], all_clusters[first_file_idx] ) _, last_tree_endentry, last_tree_entries_to_discard = get_entryrange_at_cluster_boundaries( - 0, percrange.last_tree_end_perc, all_entries[last_file_idx-1], all_clusters[last_file_idx-1] + 0, percrange.last_tree_end_perc, all_entries[last_file_idx - 1], all_clusters[last_file_idx - 1] ) entries_to_discard = first_tree_entries_to_discard + last_tree_entries_to_discard @@ -446,11 +493,19 @@ def get_clustered_range_from_percs(percrange: TreeRangePerc) -> Tuple[Optional[T return None, TaskTreeEntries(0, trees_with_entries) globalstart = all_offsets[first_file_idx] + first_tree_startentry - globalend = all_offsets[last_file_idx-1] + last_tree_endentry + globalend = all_offsets[last_file_idx - 1] + last_tree_endentry entries_in_trees = TaskTreeEntries(globalend - globalstart, trees_with_entries) - treerange = TreeRange(percrange.exec_id, percrange.id, percrange.treenames, percrange.filenames, - globalstart, globalend, percrange.friendinfo, percrange.samples) + treerange = TreeRange( + percrange.exec_id, + percrange.id, + percrange.treenames, + percrange.filenames, + globalstart, + globalend, + percrange.friendinfo, + percrange.samples, + ) return treerange, entries_in_trees diff --git a/roottest/python/distrdf/backends/CMakeLists.txt b/roottest/python/distrdf/backends/CMakeLists.txt index 48c0f112b4dc5..e9d56e8145651 100644 --- a/roottest/python/distrdf/backends/CMakeLists.txt +++ b/roottest/python/distrdf/backends/CMakeLists.txt @@ -54,3 +54,8 @@ ROOT_ADD_PYUNITTEST(test_all test_all.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONM set_tests_properties( 
pyunittests-roottest-python-distrdf-backends-all PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4 TIMEOUT 1200) + +ROOT_ADD_PYUNITTEST(test_standalone_fromspec test_standalone_fromspec.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}") +set_tests_properties( + pyunittests-roottest-python-distrdf-backends-standalone-fromspec + PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4) diff --git a/roottest/python/distrdf/backends/test_standalone_fromspec.py b/roottest/python/distrdf/backends/test_standalone_fromspec.py new file mode 100644 index 0000000000000..0374f0a3f6c75 --- /dev/null +++ b/roottest/python/distrdf/backends/test_standalone_fromspec.py @@ -0,0 +1,57 @@ +import shlex +import sys + +import pytest +import ROOT + + +class TestFromSpec: + # RDataFrame is reading files containing CMSSW classes in this test, which will trigger a warning from TClass about + # missing class dictionaries. The warning is only triggered once for the entire duration of the process. Within the + # same process, we may be running the same test multiple times (once per parametrized backend in use). So we can't + # use programmatic warning catching e.g. with pytest.warns because it would always fail on consecutive runs of this + # test. 
We filter out that warning specifically instead
+    @pytest.mark.filterwarnings("ignore:no dictionary for class")
+    def test_fromspec_different_trees(self, payload):
+        """
+        Test usage of FromSpec when each sample has different trees; distributed counts must match local ones
+        """
+
+        connection, _ = payload
+
+        jsonfile = "../data/ttree/spec_differenttrees.json"
+
+        df = ROOT.RDF.Experimental.FromSpec(jsonfile, executor=connection)
+        df_checkfilt = df.FilterAvailable("nElectron").Filter("nElectron > 2")
+        df_new = df.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
+        df_filtered = df_new.Filter("lum == 100.")
+        df_filtered_two = df_new.Filter("lum == 200.")
+
+        df_local = ROOT.RDF.Experimental.FromSpec(jsonfile)
+        df_new_local = df_local.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
+        df_filtered_local = df_new_local.Filter("lum == 100.")
+        df_filtered_two_local = df_new_local.Filter("lum == 200.")
+
+        df_checkfilt_count = df_checkfilt.Count()
+        df_filtered_count = df_filtered.Count()
+        df_filtered_local_count = df_filtered_local.Count()
+        df_filtered_two_count = df_filtered_two.Count()
+        df_filtered_two_local_count = df_filtered_two_local.Count()
+
+        assert df_checkfilt_count.GetValue() == 1683
+        assert df_filtered_count.GetValue() == 11000
+        assert df_filtered_count.GetValue() == df_filtered_local_count.GetValue()
+
+        assert df_filtered_two_count.GetValue() == 13020
+        assert df_filtered_two_count.GetValue() == df_filtered_two_local_count.GetValue()
+
+
+if __name__ == "__main__":
+    # The call to sys.exit is needed otherwise CTest would just ignore the
+    # results returned by pytest, even in case of errors.
+ # We ignore ResourceWarning about unclosed socket because of https://issues.apache.org/jira/browse/SPARK-38659 which + # has been fixed by https://github.com/apache/spark/pull/53200 and https://github.com/apache/spark/pull/53203 which + # may not be available in all test runner configurations + sys.exit( + pytest.main(args=shlex.split(f'{__file__} -x -vvv -Werror -Wignore:"unclosed