Comments (28)

douglasdavis commented on May 27, 2024

NJManganelli commented on May 27, 2024

Yeah, I have stripped away some, but I still need to remove the rest of the coffea stuff, and I will post that (near-)minimal reproducer here.

lgray commented on May 27, 2024

I couldn't get anything to reproduce just using dask.array.random even with a billion inputs. :-/

NJManganelli commented on May 27, 2024

So... non-trivial. The below is not a minimal reproducer yet, and it still needs to be shown that swapping the input file for something else publicly available (maybe the CMS Open Data files?) will still show the bug.

Iason should lay out how to install their environment and get into it. It may not be integral, but I don't know if this will apply equally well to other environment setups (the pip freeze output is attached, though).

With Iason's package installed, and using a single file from the dataset being studied, I was able to reduce things to the below.

Things are finicky: several times a small change seemed to 'eliminate' the bug, but it's unclear whether it actually did or it was just chance, because sometimes a run will complete without showing discrepancies (as I discovered later on). Changes that seemed to have this effect included: eliminating some of the fills (normally 2 per histogram), eliminating the eta histograms entirely, and simplifying the filter_events function to be nearly trivial (restricting to at least 2 electrons and accepting all electrons with a trivially-true pass_selection)...

import os

import dask_awkward as dak
import awkward as ak
from coffea.lumi_tools import LumiMask

import hist
import matplotlib.pyplot as plt

from egamma_tnp.utils import get_nanoevents_file

def filter_events(events, pt):
    events = events[dak.num(events.Electron) >= 2]
    abs_eta = abs(events.Electron.eta)
    pass_eta_ebeegap = (abs_eta < 1.4442) | (abs_eta > 1.566)
    pass_tight_id = events.Electron.cutBased == 4
    pass_pt = events.Electron.pt > pt
    pass_eta = abs_eta <= 2.5
    pass_selection = pass_pt & pass_eta & pass_eta_ebeegap & pass_tight_id
    n_of_tags = dak.sum(pass_selection, axis=1)
    good_events = events[n_of_tags >= 2]
    good_locations = pass_selection[n_of_tags >= 2]

    return good_events, good_locations

def perform_tnp(events, pt, goldenjson):
    #if goldenjson:
    #    events = apply_lumimasking(events, goldenjson)
    good_events, good_locations = filter_events(events, pt)
    ele_for_tnp = good_events.Electron[good_locations]

    zcands1 = dak.combinations(ele_for_tnp, 2, fields=["tag", "probe"])
    zcands2 = dak.combinations(ele_for_tnp, 2, fields=["probe", "tag"])
    #p1, a1 = find_probes(zcands1.tag, zcands1.probe, good_events.TrigObj, pt)
    #p2, a2 = find_probes(zcands2.tag, zcands2.probe, good_events.TrigObj, pt)
    p1 = zcands1.probe#[zcands1.probe.eta > -1.3]
    a1 = zcands1.probe
    p2 = zcands2.probe#[zcands2.probe.eta > -1.3]
    a2 = zcands2.probe

    return p1, a1, p2, a2


def get_pt_and_eta_arrays(events, pt, goldenjson):
    p1, a1, p2, a2 = perform_tnp(events, pt, goldenjson)

    pt_pass1 = dak.flatten(p1.pt)
    pt_pass2 = dak.flatten(p2.pt)
    pt_all1 = dak.flatten(a1.pt)
    pt_all2 = dak.flatten(a2.pt)

    eta_pass1 = dak.flatten(p1.eta)
    eta_pass2 = dak.flatten(p2.eta)
    eta_all1 = dak.flatten(a1.eta)
    eta_all2 = dak.flatten(a2.eta)

    return (
        pt_pass1,
        pt_pass2,
        pt_all1,
        pt_all2,
        eta_pass1,
        eta_pass2,
        eta_all1,
        eta_all2,
    )

def get_tnp_histograms(events, pt, goldenjson):
    import json
    import os

    import hist
    from hist.dask import Hist

    ptbins = [5, 10, 15, 20, 22, 26, 28, 30, 32, 34, 36, 38, 40, 45, 50, 60, 80, 100, 150, 250, 400]
    etabins = [-2.5, -2.4, -2.3, -2.2, -2.1, -2.0, -1.9, -1.8, -1.7, -1.566, -1.4442, -1.3, -1.2, -1.1, -1.0, -0.9, -0.8, -0.7, -0.6, -0.5, -0.4, -0.3, -0.2, -0.1, 0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 1.1, 1.2, 1.3, 1.4442, 1.566, 1.7, 1.8, 1.9, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5]
    (
        pt_pass1,
        pt_pass2,
        pt_all1,
        pt_all2,
        eta_pass1,
        eta_pass2,
        eta_all1,
        eta_all2,
    ) = get_pt_and_eta_arrays(events, pt, goldenjson)
   
    ptaxis = hist.axis.Variable(ptbins, name="pt")
    hpt_all = Hist(ptaxis)
    hpt_pass = Hist(ptaxis)

    etaaxis = hist.axis.Variable(etabins, name="eta")
    heta_all = Hist(etaaxis)
    heta_pass = Hist(etaaxis)

    hpt_pass.fill(pt_pass1)
    hpt_pass.fill(pt_pass2)
    #hpt_all.fill(pt_all1)
    #hpt_all.fill(pt_all2)
    heta_pass.fill(eta_pass1)
    heta_pass.fill(eta_pass2)
    #heta_all.fill(eta_all1)
    #heta_all.fill(eta_all2)

    #return hpt_pass, hpt_all, heta_pass, heta_all
    return hpt_pass, heta_pass


def get_and_compute_tnp_histograms(events, pt, goldenjson, scheduler, progress):
    import dask
    from dask.diagnostics import ProgressBar

    (
        hpt_pass,
        #hpt_all,
        eta_pass,
        #eta_all,
    ) = get_tnp_histograms(events, pt, goldenjson)

    if progress:
        pbar = ProgressBar()
        pbar.register()

    res = dask.compute(
        hpt_pass,
        #hpt_all,
        eta_pass,
        #eta_all,
        scheduler=scheduler,
    )

    if progress:
        pbar.unregister()

    return res

class TagNProbe:
    def __init__(
        self,
        names,
        trigger_pt,
        *,
        goldenjson=None,
        toquery=False,
        redirect=False,
        custom_redirector="root://cmsxrootd.fnal.gov/",
        invalid=False,
        preprocess=False,
        preprocess_args={},
    ):
        """Tag and Probe for HLT Trigger efficiency from NanoAOD.

        Parameters
        ----------
            names : str or list of str
                The dataset names to query that can contain wildcards or a list of file paths.
            trigger_pt : int or float
                The Pt threshold of the trigger.
            goldenjson : str, optional
                The golden json to use for luminosity masking. The default is None.
            toquery : bool, optional
                Whether to query DAS for the dataset names. The default is False.
            redirect : bool, optional
                Whether to add an xrootd redirector to the files. The default is False.
            custom_redirector : str, optional
                The xrootd redirector to add to the files. The default is "root://cmsxrootd.fnal.gov/".
                Only used if redirect is True.
            invalid : bool, optional
                Whether to include invalid files. The default is False.
                Only used if toquery is True.
            preprocess : bool, optional
                Whether to preprocess the files using coffea.dataset_tools.preprocess().
                The default is False.
            preprocess_args : dict, optional
                Extra arguments to pass to coffea.dataset_tools.preprocess(). The default is {}.
        """
        self.names = names
        self.pt = trigger_pt - 1
        self.goldenjson = goldenjson
        self.toquery = toquery
        self.redirect = redirect
        self.custom_redirector = custom_redirector
        self.invalid = invalid
        self.events = None
        self.preprocess = preprocess
        self.preprocess_args = preprocess_args
        self.file = get_nanoevents_file(
            self.names,
            toquery=self.toquery,
            redirect=self.redirect,
            custom_redirector=self.custom_redirector,
            invalid=self.invalid,
            preprocess=self.preprocess,
            preprocess_args=self.preprocess_args,
        )
        if goldenjson is not None and not os.path.isfile(goldenjson):
            raise ValueError(f"Golden JSON {goldenjson} does not exist.")
            
    def load_events(self, from_root_args={}):
        """Load the events from the names.

        Parameters
        ----------
            from_root_args : dict, optional
                Extra arguments to pass to coffea.nanoevents.NanoEventsFactory.from_root().
                The default is {}.
        """
        from coffea.nanoevents import NanoEventsFactory

        self.events = NanoEventsFactory.from_root(
            self.file,
            permit_dask=True,
            **from_root_args,
        ).events()

tag_n_probe = TagNProbe(
    ["../d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root"],
    32,
    #goldenjson="../json/Cert_Collisions2023_366442_370790_Golden.json",
    goldenjson = None,
    toquery=False,
    redirect=False,
    preprocess=False,
    preprocess_args={"maybe_step_size": 100_000},
)
print("Done preprocessing")

print("Starting to load events")
tag_n_probe.load_events(
    from_root_args={"uproot_options": {"timeout": 120}, "chunks_per_file": 10}
)

tmp1 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp2 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp3 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)
tmp4 = get_and_compute_tnp_histograms(tag_n_probe.events, 25.0, None, "threads", True)

stk = hist.Stack.from_dict(
    {
     "threads_1": tmp1[0] - tmp1[0],
     "threads_2": tmp2[0] - tmp1[0],
     "threads_3": tmp3[0] - tmp2[0],
     "threads_4": tmp4[0] - tmp3[0],
    }
)
stk.plot()
plt.legend()

print((tmp1[0] - tmp2[0]).values())
print((tmp3[0] - tmp2[0]).values())

iason_packages.txt

ikrommyd commented on May 27, 2024

https://github.com/iasonkrom/egamma-tnp/blob/debug_dask_histogram/debug_dask_histogram.ipynb

I'm also working here now that I got some time. I've removed all the classes and kept only the basic functionality of my code in one notebook. No install of egamma-tnp is required, and the only requirements are what is imported (coffea, awkward, hist, etc.). I'll keep stripping down the code to make it as minimal as possible.

However, as @NJManganelli pointed out (and as can also be seen in the snippet), the bug is also there if you run on only one file. Yet I don't see it when I'm running on two files locally, while I do see it on one file on a cluster. Makes me think... is it architecture-based? I have an Apple silicon Mac. Anyways, I'll also try it on my old x86 Ubuntu laptop and get to the bottom of this.

I'll also try to find a file that is publicly accessible and not private CMS data. I will be committing to that notebook every time I strip something down and will keep updating here.

lgray commented on May 27, 2024

I think the first thing to do is to attempt a reproducer without nanoevents.

ikrommyd commented on May 27, 2024

OK, I got something: it IS INDEED architecture-based. I'm getting the same effect on my Intel Ubuntu laptop, LPC, and LXPLUS, while the effect is not there on my M1 Mac.

If you look at the notebook's last cell, the arrays in the cell output should all be the same, and the underflow bin should always be empty since I'm requiring pass_pt = events.Electron.pt > pt, but that's not the case. Also note that the sum always stays the same.
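
For example (a sketch, not from the notebook; h is a hypothetical name for one of the filled pt histograms), the flow bins can be checked like this:

import numpy as np

vals = h.values(flow=True)  # h: a filled pt histogram (hypothetical name)
print(vals[0])              # underflow: should be 0, since every fill requires pt > cut
print(np.sum(vals))         # total: observed to stay the same from run to run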

I will try to strip out as much stuff as possible to try and find whether a specific awkward operation is causing the bug and also try to reproduce it in plain uproot.

lgray commented on May 27, 2024

Oh fun - I've been doing this on my laptop, which is also an M1. I'll try my very simple tests on an x86 machine.

ikrommyd commented on May 27, 2024

At least we know it's not a scaling issue that only happens on clusters and/or over big datasets.

lgray commented on May 27, 2024

What's a full path on xrootd for the file you're using?

lgray commented on May 27, 2024
import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
import pickle

from distributed import Client

if __name__ == "__main__":


    events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10)
    
    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    dx.fill(x=dak.flatten(events.Electron_pt[events.Electron_pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Does not reproduce the error, adding back more.

lgray commented on May 27, 2024
import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
from coffea.nanoevents import NanoEventsFactory
import pickle

from distributed import Client

if __name__ == "__main__":


    #events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10)
    events = NanoEventsFactory.from_root({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, permit_dask=True, chunks_per_file=10).events()
    
    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    #dx.fill(x=dak.flatten(events.Electron_pt[events.Electron_pt > 32]))
    dx.fill(x=dak.flatten(events.Electron.pt[events.Electron.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Also does not reproduce the behavior. It is therefore unlikely to be nanoevents.

lgray commented on May 27, 2024
import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
from coffea.nanoevents import NanoEventsFactory
import pickle

from distributed import Client

if __name__ == "__main__":


    #events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10, open_files=False)                                                                                                                            
    events = NanoEventsFactory.from_root({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, permit_dask=True, chunks_per_file=10).events()

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    #combos = dak.combinations(events.Electron_pt, 2, fields=["first", "second"])                                                                                                                                                                    
    combos = dak.combinations(events.Electron, 2, fields=["first", "second"])

    #dx.fill(x=dak.flatten(combos.second[combos.first > 32]))                                                                                                                                                                                        
    dx.fill(x=dak.flatten(combos.second.pt[combos.first.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

Here is a minimal reproducer. If you uncomment the raw uproot line and re-comment the nanoevents one, the reproducibility issue goes away.

This does not yet clarify if it is an issue with nanoevents or an issue with zipping/broadcasting.

lgray commented on May 27, 2024

even more minimal, no coffea needed (@agoose77 @jpivarski):

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import uproot
import pickle

from distributed import Client

if __name__ == "__main__":


    events = uproot.dask({"./d45e45dd-5dd8-4cf3-a03a-141a6ff45d44.root": "Events"}, steps_per_file=10, open_files=False)

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    electrons = dak.zip({"pt": events.Electron_pt})

    combos = dak.combinations(electrons, 2, fields=["first", "second"])

    dx.fill(x=dak.flatten(combos.second.pt[combos.first.pt > 32]))

    with Client() as client:
        for _ in range(10):
            print(dx.compute().values(flow=True))

martindurant commented on May 27, 2024

^ do you still get the problem here with sync/threaded?

lgray commented on May 27, 2024

anecdotally yes - but I'll check with this minimal repro now.

lgray commented on May 27, 2024

Threads, but not sync, shows the variations from run to run - however, the input data are mangled either way: there should not be any entries in the underflow bin, and there are.
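
Concretely, with dx from the minimal reproducer above (a sketch):

print(dx.compute(scheduler="sync").values(flow=True))     # identical every run, but the underflow bin is populated
print(dx.compute(scheduler="threads").values(flow=True))  # varies from run to run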

lgray commented on May 27, 2024

Strangely, if I set threads_per_worker=1 on the Client(), the error persists.
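
i.e. (a sketch, reusing dx from above):

from distributed import Client

with Client(threads_per_worker=1) as client:
    for _ in range(10):
        print(dx.compute().values(flow=True))  # still varies run to run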

ikrommyd commented on May 27, 2024

This was also the case in my runs with the full package: threads_per_worker=1 wasn't fixing it, but it was also there with the sync scheduler.

lgray commented on May 27, 2024

Here is a reproducer without uproot, distributed, or threads:

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import numpy as np

if __name__ == "__main__":

    dar = 200*da.random.random((100_000, 3)).astype(np.float32) + 32
    dak_ar = dak.from_dask_array(dar)

    dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

    electrons = dak.zip({"pt": dak_ar})

    combos = dak.combinations(electrons, 2, fields=["first", "second"])

    dx.fill(x=dak.flatten(combos.second.pt))

    for _ in range(10):
        print(dx.compute(scheduler="sync").values(flow=True))

If I remove dak.combinations, the problem goes away entirely.
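
For reference, a sketch of the same pipeline with the dak.combinations step removed (filling straight from the zipped array), which behaves correctly:

import hist.dask as hda
import dask.array as da
import dask_awkward as dak
import numpy as np

dar = 200*da.random.random((100_000, 3)).astype(np.float32) + 32
dak_ar = dak.from_dask_array(dar)

dx = hda.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()

electrons = dak.zip({"pt": dak_ar})

# no dak.combinations here: fill directly from the zipped array
dx.fill(x=dak.flatten(electrons.pt))

for _ in range(10):
    print(dx.compute(scheduler="sync").values(flow=True))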

ikrommyd commented on May 27, 2024

You're doing all this on x86, right?

lgray commented on May 27, 2024

Yes.

lgray commented on May 27, 2024

OK I have a reproducer without dask.

import hist
import numpy as np
import awkward as ak

if __name__ == "__main__":

    ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)

    electrons = ak.zip({"pt": ar})

    combos = ak.combinations(electrons, 2, fields=["first", "second"])

    for _ in range(10):
        x = hist.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()
        print(x.fill(x=ak.flatten(combos.second.pt)).values(flow=True))

This fails and results in mangled, irreproducible data.

import hist
import numpy as np
import awkward as ak

if __name__ == "__main__":

    ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)

    combos = ak.combinations(ar, 2, fields=["first", "second"])

    for _ in range(10):
        x = hist.Hist.new.Variable([0, 10, 20, 30, 40, 80, 120, 200], name="x").Double()
        print(x.fill(x=ak.flatten(combos.second)).values(flow=True))

Executes perfectly fine, so it is some interaction between zip and combinations!
Though I haven't removed hist from the list of possible suspects.
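
One way to take hist out of the picture (a sketch, reusing the toy data above) is to histogram the same flattened values with plain numpy and compare runs:

import numpy as np
import awkward as ak

ar = ak.Array(200*np.random.random((100_000, 3)).astype(np.float32) + 32)
electrons = ak.zip({"pt": ar})
combos = ak.combinations(electrons, 2, fields=["first", "second"])

edges = [0, 10, 20, 30, 40, 80, 120, 200]
for _ in range(10):
    counts, _ = np.histogram(ak.to_numpy(ak.flatten(combos.second.pt)), bins=edges)
    print(counts)  # stable, correct counts would exonerate numpy-side histogramming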

@agoose77 @jpivarski - I will open an issue on awkward. This is critical.

lgray commented on May 27, 2024

Attempting this on arm64 (Apple silicon), there are no problems.

ikrommyd commented on May 27, 2024

I just noticed that in the pure awkward case, the result is wrong but still consistent at every step of the for loop. Is it because in the daskified case, the array is a different sample at every compute call?

ikrommyd commented on May 27, 2024

Also, when I was using the entire egamma-tnp tool with dask, it was giving random AND wrong histograms every time I ran it, while there was no RNG anywhere.

lgray commented on May 27, 2024

That's fine, the point of the RNG here is just to produce some data and remove the dependency on uproot, to narrow down the places to look.

Through this we identified that there's a memory issue on x86 in the current release of boost-histogram.

Amusingly, if you use numpy histograms (or dask-histogram's numpy interface), all these problems go away.
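
For illustration, a sketch of that numpy-style route on the same toy data, using dask.array's histogram (an assumption as a stand-in: dask-histogram's own numpy-like interface is not shown here):

import dask.array as da
import numpy as np

dar = 200*da.random.random((100_000, 3)).astype(np.float32) + 32

# for a regular (N, 3) array, the "second" elements of all pairs per row
# are columns 1, 2, 2, so this matches the combinations fill up to ordering
seconds = da.concatenate([dar[:, 1], dar[:, 2], dar[:, 2]])

counts, edges = da.histogram(seconds, bins=np.array([0, 10, 20, 30, 40, 80, 120, 200]))
for _ in range(10):
    print(counts.compute())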

ikrommyd commented on May 27, 2024

@lgray you can also close this one as complete
