Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions bindings/distrdf/python/DistRDF/HeadNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,33 +766,36 @@ def build_rdf_from_range(current_range: Ranges.TreeRangePerc) -> TaskObjects:
fileids_cluster.append(fileid)
treesInFileMap.update({fileid : treename})

# Making sure we don't double count samples
unique_samples = []
# Making sure we don't double count samples
unique_samples = []
samplenames = []

for sample in clustered_range.samples:
mysample = sample._sample
samplename = mysample.GetSampleName()
samplename = sample.name
if samplename not in samplenames:
unique_samples.append(mysample)
unique_samples.append(sample)
samplenames.append(samplename)
Comment thread
vepadulano marked this conversation as resolved.

# Core: matching files with samples and adding samples to the spec that will be processed
for mysample in unique_samples:
for sample in unique_samples:
sample_fileids = [
filenameglob + "/" + treename
for filenameglob, treename in zip(mysample.GetFileNameGlobs(), mysample.GetTreeNames())
for filenameglob, treename in zip(sample.filenames, sample.treenames)
]

good_files = list((Counter(filenames_cluster) & Counter(mysample.GetFileNameGlobs())).elements())
good_fileids = list((Counter(fileids_cluster) & Counter(sample_fileids)).elements())

good_trees =[
good_files = list((Counter(filenames_cluster)
& Counter(sample.filenames)).elements())
good_fileids = list(
(Counter(fileids_cluster) & Counter(sample_fileids)).elements())

good_trees = [
treesInFileMap.get(fileid) for fileid in good_fileids
]

ds.AddSample((mysample.GetSampleName(), good_trees, good_files, mysample.GetMetaData()))


rmetadata = ROOT.RDF.Experimental.RMetaData()
ROOT.Internal.RDF.ImportJSON(rmetadata, sample.metadata)
ds.AddSample((sample.name, good_trees, good_files, rmetadata))

ds.WithGlobalRange(
(clustered_range.globalstart, clustered_range.globalend))

Expand Down
25 changes: 13 additions & 12 deletions bindings/distrdf/python/DistRDF/Ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@

logger = logging.getLogger(__name__)

class SerializableRSample():


class SerializableRSample:
"""
Stores the information relative to an RSample, removing knowledge of C++ types such that cppyy is not involved
during serialization/deserialization.
"""
def __init__(self, sample: ROOT.RDF.Experimental.RSample):
self._sample = sample

def __getstate__(self):
return {"samplenames" : self._sample.GetSampleName(), "treenames" : self._sample.GetTreeNames() , "filenames" : self._sample.GetFileNameGlobs(), "metadata" : ROOT.Internal.RDF.ExportJSON(self._sample.GetMetaData())}
self.name: str = sample.GetSampleName()
self.treenames: list[str] = [str(treename)
for treename in sample.GetTreeNames()]
self.filenames: list[str] = [str(filename)
for filename in sample.GetFileNameGlobs()]
self.metadata: str = ROOT.Internal.RDF.ExportJSON(sample.GetMetaData())



def __setstate__(self, state):
_metadata = ROOT.RDF.Experimental.RMetaData()
ROOT.Internal.RDF.ImportJSON(_metadata, state["metadata"])
self._sample = ROOT.RDF.Experimental.RSample(state["samplenames"], state["treenames"], state["filenames"], _metadata)

@dataclass
class DataRange:
"""
Expand Down
5 changes: 5 additions & 0 deletions roottest/python/distrdf/backends/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,8 @@ ROOT_ADD_PYUNITTEST(test_all test_all.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONM
set_tests_properties(
pyunittests-roottest-python-distrdf-backends-all
PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4 TIMEOUT 1200)

ROOT_ADD_PYUNITTEST(test_standalone_fromspec test_standalone_fromspec.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}")
set_tests_properties(
pyunittests-roottest-python-distrdf-backends-standalone-fromspec
PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4)
57 changes: 57 additions & 0 deletions roottest/python/distrdf/backends/test_standalone_fromspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import shlex
import sys

import pytest
import ROOT


class TestFromSpec:
# RDataFrame is reading files containing CMSSW classes in this test, which will trigger a warning from TClass about
# missing class dictionaries. The warning is only triggered once for the entire duration of the process. Within the
# same process, we may be running the same test multiple times (once per parametrized backend in use). So we can't
# use programmatic warning catching e.g. with pytest.warns because it would always fail on consecutive runs of this
# test. We filter out that warning specifically instead
@pytest.mark.filterwarnings("ignore:no dictionary for class")
def test_fromspec_different_trees(self, payload):
"""
Test usage of FromSpec function when each sample has different trees
"""

connection, _ = payload

jsonfile = "../data/ttree/spec_differenttrees.json"

df = ROOT.RDF.Experimental.FromSpec(jsonfile, executor=connection)
df_checkfilt = df.FilterAvailable("nElectron").Filter("nElectron > 2")
df_new = df.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
df_filtered = df_new.Filter("lum == 100.")
df_filtered_two = df_new.Filter("lum == 200.")

df_local = ROOT.RDF.Experimental.FromSpec(jsonfile)
df_new_local = df_local.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
df_filtered_local = df_new_local.Filter("lum == 100.")
df_filtered_two_local = df_new_local.Filter("lum == 200.")

df_checkfilt_count = df_checkfilt.Count()
df_filtered_count = df_filtered.Count()
df_filtered_local_count = df_filtered_local.Count()
df_filtered_two_count = df_filtered_two.Count()
df_filtered_two_local_count = df_filtered_two_local.Count()

assert df_checkfilt_count.GetValue() == 1683
assert df_filtered_count.GetValue() == 11000
assert df_filtered_count.GetValue() == df_filtered_local_count.GetValue()

assert df_filtered_two_count.GetValue() == 13020
assert df_filtered_two_local_count.GetValue() == df_filtered_two_local_count.GetValue()


if __name__ == "__main__":
# The call to sys.exit is needed otherwise CTest would just ignore the
# results returned by pytest, even in case of errors.
# We ignore ResourceWarning about unclosed socket because of https://issues.apache.org/jira/browse/SPARK-38659 which
# has been fixed by https://github.com/apache/spark/pull/53200 and https://github.com/apache/spark/pull/53203 which
# may not be available in all test runner configurations
sys.exit(
pytest.main(args=shlex.split(f'{__file__} -x -vvv -Werror -Wignore:"unclosed <socket.socket":ResourceWarning'))
)