Automatic documentation generated from docstrings

Channel dataclass

Source code in mass2/core/channel.py
@dataclass(frozen=True)  # noqa: PLR0904
class Channel:
    df: pl.DataFrame = field(repr=False)
    header: ChannelHeader = field(repr=True)
    npulses: int
    subframediv: int | None = None
    noise: NoiseChannel | None = field(default=None, repr=False)
    good_expr: pl.Expr = field(default_factory=alwaysTrue)
    df_history: list[pl.DataFrame] = field(default_factory=list, repr=False)
    steps: Recipe = field(default_factory=Recipe.new_empty, repr=False)
    steps_elapsed_s: list[float] = field(default_factory=list)
    transform_raw: Callable | None = None

    @property
    def shortname(self):
        return self.header.description

    def mo_stepplots(self):
        desc_ind = {step.description: i for i, step in enumerate(self.steps)}
        first_non_summarize_step = self.steps[0]
        for step in self.steps:
            if isinstance(step, SummarizeStep):
                continue
            first_non_summarize_step = step
            break
        mo_ui = mo.ui.dropdown(
            desc_ind,
            value=first_non_summarize_step.description,
            label=f"choose step for ch {self.header.ch_num}",
        )

        def show():
            return self._mo_stepplots_explicit(mo_ui)

        def step_ind():
            return mo_ui.value

        mo_ui.show = show
        mo_ui.step_ind = step_ind
        return mo_ui

    def _mo_stepplots_explicit(self, mo_ui):
        step_ind = mo_ui.step_ind()
        self.step_plot(step_ind)
        fig = plt.gcf()
        return mo.vstack([mo_ui, misc.show(fig)])

    def get_step(self, index):
        if index < 0:
            # normalize the index to a positive index
            index = len(self.steps) + index
        step = self.steps[index]
        return step, index

    def step_plot(self, step_ind, **kwargs):
        step, step_ind = self.get_step(step_ind)
        if step_ind + 1 == len(self.df_history):
            df_after = self.df
        else:
            df_after = self.df_history[step_ind + 1]
        return step.dbg_plot(df_after, **kwargs)

    def hist(
        self,
        col,
        bin_edges,
        use_good_expr=True,
        use_expr=pl.lit(True),
    ):
        if use_good_expr and self.good_expr is not True:
            # `True` doesn't implement .and_, and we haven't found an expression-literal
            # equivalent that does, so we special-case True
            filter_expr = self.good_expr.and_(use_expr)
        else:
            filter_expr = use_expr

        # Group by the specified column and filter using good_expr
        df_small = (self.df.lazy().filter(filter_expr).select(col)).collect()

        values = df_small[col]
        bin_centers, counts = misc.hist_of_series(values, bin_edges)
        return bin_centers, counts

    def plot_hist(
        self,
        col,
        bin_edges,
        axis=None,
        use_good_expr=True,
        use_expr=pl.lit(True),
    ):
        if axis is None:
            _, ax = plt.subplots()  # Create a new figure if no axis is provided
        else:
            ax = axis

        bin_centers, counts = self.hist(col, bin_edges=bin_edges, use_good_expr=use_good_expr, use_expr=use_expr)
        _, step_size = misc.midpoints_and_step_size(bin_edges)
        plt.step(bin_centers, counts, where="mid")

        # Customize the plot
        ax.set_xlabel(str(col))
        ax.set_ylabel(f"Counts per {step_size:.02f} unit bin")
        ax.set_title(f"Histogram of {col} for {self.shortname}")

        plt.tight_layout()
        return bin_centers, counts

    def plot_hists(
        self,
        col,
        bin_edges,
        group_by_col,
        axis=None,
        use_good_expr=True,
        use_expr=pl.lit(True),
        skip_none=True,
    ):
        """
        Plots histograms for the given column, grouped by the specified column.

        Parameters:
        - col (str): The column name to plot.
        - bin_edges (array-like): The edges of the bins for the histogram.
        - group_by_col (str): The column name to group by. This is required.
        - axis (matplotlib.Axes, optional): The axis to plot on. If None, a new figure is created.
        """
        if axis is None:
            _, ax = plt.subplots()  # Create a new figure if no axis is provided
        else:
            ax = axis

        if use_good_expr and self.good_expr is not True:
            # `True` doesn't implement .and_, and we haven't found an expression-literal
            # equivalent that does, so we special-case True
            filter_expr = self.good_expr.and_(use_expr)
        else:
            filter_expr = use_expr

        # Group by the specified column and filter using good_expr
        df_small = (self.df.lazy().filter(filter_expr).select(col, group_by_col)).collect().sort(group_by_col, descending=False)

        # Plot a histogram for each group
        counts_dict = {}
        for (group_name,), group_data in df_small.group_by(group_by_col, maintain_order=True):
            if group_name is None and skip_none:
                continue
            # Get the data for the column to plot
            values = group_data[col]
            _, step_size = misc.midpoints_and_step_size(bin_edges)
            bin_centers, counts = misc.hist_of_series(values, bin_edges)
            counts_dict[group_name] = counts
            plt.step(bin_centers, counts, where="mid", label=str(group_name))
            # Plot the histogram for the current group
            # if group_name == "EBIT":
            #     ax.hist(values, bins=bin_edges, alpha=0.9, color="k", label=str(group_name))
            # else:
            #     ax.hist(values, bins=bin_edges, alpha=0.5, label=str(group_name))
            # bin_centers, counts = misc.hist_of_series(values, bin_edges)
            # plt.plot(bin_centers, counts, label=group_name)
        # Customize the plot
        ax.set_xlabel(str(col))
        ax.set_ylabel(f"Counts per {step_size:.02f} unit bin")
        ax.set_title(f"Histogram of {col} grouped by {group_by_col}")

        # Add a legend to label the groups
        ax.legend(title=group_by_col)

        plt.tight_layout()
        return bin_centers, counts_dict

    def plot_scatter(
        self,
        x_col,
        y_col,
        color_col=None,
        use_expr=pl.lit(True),
        use_good_expr=pl.lit(True),
        skip_none=True,
        ax=None,
    ):
        if ax is None:
            plt.figure()
            ax = plt.gca()
        plt.sca(ax)  # set current axis so I can use plt api
        filter_expr = self.good_expr.and_(use_expr)
        df_small = self.df.lazy().filter(filter_expr).select(x_col, y_col, color_col).collect()
        for (name,), data in df_small.group_by(color_col, maintain_order=True):
            if name is None and skip_none and color_col is not None:
                continue
            plt.plot(
                data.select(x_col).to_series(),
                data.select(y_col).to_series(),
                ".",
                label=name,
            )
        plt.xlabel(str(x_col))
        plt.ylabel(str(y_col))
        title_str = f"""{self.header.description}
        use_expr={str(use_expr)}
        good_expr={str(self.good_expr)}"""
        plt.title(title_str)
        if color_col is not None:
            plt.legend(title=color_col)
        plt.tight_layout()

    def good_series(self, col, use_expr=pl.lit(True)):
        return mass2.misc.good_series(self.df, col, self.good_expr, use_expr)

    def rough_cal_combinatoric(
        self,
        line_names,
        uncalibrated_col,
        calibrated_col,
        ph_smoothing_fwhm,
        n_extra=3,
        use_expr=pl.lit(True),
    ) -> "Channel":
        step = mass2.core.RoughCalibrationStep.learn_combinatoric(
            self,
            line_names,
            uncalibrated_col=uncalibrated_col,
            calibrated_col=calibrated_col,
            ph_smoothing_fwhm=ph_smoothing_fwhm,
            n_extra=n_extra,
            use_expr=use_expr,
        )
        return self.with_step(step)

    def rough_cal_combinatoric_height_info(
        self,
        line_names,
        line_heights_allowed,
        uncalibrated_col,
        calibrated_col,
        ph_smoothing_fwhm,
        n_extra=3,
        use_expr=pl.lit(True),
    ) -> "Channel":
        step = mass2.core.RoughCalibrationStep.learn_combinatoric_height_info(
            self,
            line_names,
            line_heights_allowed,
            uncalibrated_col=uncalibrated_col,
            calibrated_col=calibrated_col,
            ph_smoothing_fwhm=ph_smoothing_fwhm,
            n_extra=n_extra,
            use_expr=use_expr,
        )
        return self.with_step(step)

    def rough_cal(  # noqa: PLR0917
        self,
        line_names: list[str | float],
        uncalibrated_col: str = "filtValue",
        calibrated_col: str | None = None,
        use_expr: pl.Expr = field(default_factory=alwaysTrue),
        max_fractional_energy_error_3rd_assignment: float = 0.1,
        min_gain_fraction_at_ph_30k: float = 0.25,
        fwhm_pulse_height_units: float = 75,
        n_extra_peaks: int = 10,
        acceptable_rms_residual_e: float = 10,
    ) -> "Channel":
        step = mass2.core.RoughCalibrationStep.learn_3peak(
            self,
            line_names,
            uncalibrated_col,
            calibrated_col,
            use_expr,
            max_fractional_energy_error_3rd_assignment,
            min_gain_fraction_at_ph_30k,
            fwhm_pulse_height_units,
            n_extra_peaks,
            acceptable_rms_residual_e,
        )
        return self.with_step(step)

    def with_step(self, step) -> "Channel":
        t_start = time.time()
        df2 = step.calc_from_df(self.df)
        elapsed_s = time.time() - t_start
        ch2 = Channel(
            df=df2,
            header=self.header,
            npulses=self.npulses,
            subframediv=self.subframediv,
            noise=self.noise,
            good_expr=step.good_expr,
            df_history=self.df_history + [self.df],
            steps=self.steps.with_step(step),
            steps_elapsed_s=self.steps_elapsed_s + [elapsed_s],
        )
        return ch2

    def with_steps(self, steps) -> "Channel":
        ch2 = self
        for step in steps:
            ch2 = ch2.with_step(step)
        return ch2

    def with_good_expr(self, good_expr, replace=False) -> "Channel":
        # the default value of self.good_expr is pl.lit(True)
        # and_(True) will just add visual noise when looking at good_expr and not affect behavior
        if not replace and good_expr is not True and not good_expr.meta.eq(pl.lit(True)):
            good_expr = good_expr.and_(self.good_expr)
        return Channel(
            df=self.df,
            header=self.header,
            npulses=self.npulses,
            subframediv=self.subframediv,
            noise=self.noise,
            good_expr=good_expr,
            df_history=self.df_history,
            steps=self.steps,
            steps_elapsed_s=self.steps_elapsed_s,
        )

    def with_column_map_step(self, input_col: str, output_col: str, f: Callable) -> "Channel":
        """f should take a numpy array and return a numpy array with the same number of elements"""
        step = mass2.core.recipe.ColumnAsNumpyMapStep([input_col], [output_col], good_expr=self.good_expr, use_expr=pl.lit(True), f=f)
        return self.with_step(step)

    def with_good_expr_pretrig_rms_and_postpeak_deriv(
        self, n_sigma_pretrig_rms=20, n_sigma_postpeak_deriv=20, replace=False
    ) -> "Channel":
        max_postpeak_deriv = misc.outlier_resistant_nsigma_above_mid(
            self.df["postpeak_deriv"].to_numpy(), nsigma=n_sigma_postpeak_deriv
        )
        max_pretrig_rms = misc.outlier_resistant_nsigma_above_mid(self.df["pretrig_rms"].to_numpy(), nsigma=n_sigma_pretrig_rms)
        good_expr = (pl.col("postpeak_deriv") < max_postpeak_deriv).and_(pl.col("pretrig_rms") < max_pretrig_rms)
        return self.with_good_expr(good_expr, replace)

    def with_range_around_median(self, col, range_up, range_down):
        med = np.median(self.df[col].to_numpy())
        return self.with_good_expr(pl.col(col).is_between(med - range_down, med + range_up))

    def with_good_expr_below_nsigma_outlier_resistant(self, col_nsigma_pairs, replace=False, use_prev_good_expr=True) -> "Channel":
        """
        Always sets the lower limit at 0; don't use for values that can be negative.
        """
        if use_prev_good_expr:
            df = self.df.lazy().select(pl.exclude("pulse")).filter(self.good_expr).collect()
        else:
            df = self.df
        for i, (col, nsigma) in enumerate(col_nsigma_pairs):
            max_for_col = misc.outlier_resistant_nsigma_above_mid(df[col].to_numpy(), nsigma=nsigma)
            this_iter_good_expr = pl.col(col).is_between(0, max_for_col)
            if i == 0:
                good_expr = this_iter_good_expr
            else:
                good_expr = good_expr.and_(this_iter_good_expr)
        return self.with_good_expr(good_expr, replace)

    def with_good_expr_nsigma_range_outlier_resistant(self, col_nsigma_pairs, replace=False, use_prev_good_expr=True) -> "Channel":
        """
        Sets both lower and upper limits from an outlier-resistant n-sigma range around the middle of each column's distribution.
        """
        if use_prev_good_expr:
            df = self.df.lazy().select(pl.exclude("pulse")).filter(self.good_expr).collect()
        else:
            df = self.df
        for i, (col, nsigma) in enumerate(col_nsigma_pairs):
            min_for_col, max_for_col = misc.outlier_resistant_nsigma_range_from_mid(df[col].to_numpy(), nsigma=nsigma)
            this_iter_good_expr = pl.col(col).is_between(min_for_col, max_for_col)
            if i == 0:
                good_expr = this_iter_good_expr
            else:
                good_expr = good_expr.and_(this_iter_good_expr)
        return self.with_good_expr(good_expr, replace)

    @functools.cache
    def typical_peak_ind(self, col="pulse"):
        raw = self.df.limit(100)[col].to_numpy()
        if self.transform_raw is not None:
            raw = self.transform_raw(raw)
        return int(np.median(raw.argmax(axis=1)))

    def summarize_pulses(self, col="pulse", pretrigger_ignore_samples=0, peak_index=None) -> "Channel":
        if peak_index is None:
            peak_index = self.typical_peak_ind(col)
        out_names = mass2.core.pulse_algorithms.result_dtype.names
        # mypy (incorrectly) thinks `out_names` might be None, and `list(None)` is forbidden. Assertion makes it happy again.
        assert out_names is not None
        outputs = list(out_names)
        step = SummarizeStep(
            inputs=[col],
            output=outputs,
            good_expr=self.good_expr,
            use_expr=pl.lit(True),
            frametime_s=self.header.frametime_s,
            peak_index=peak_index,
            pulse_col=col,
            pretrigger_ignore_samples=pretrigger_ignore_samples,
            n_presamples=self.header.n_presamples,
            transform_raw=self.transform_raw,
        )
        return self.with_step(step)

    def correct_pretrig_mean_jumps(self, uncorrected="pretrig_mean", corrected="ptm_jf", period=4096):
        step = mass2.core.recipe.PretrigMeanJumpFixStep(
            inputs=[uncorrected],
            output=[corrected],
            good_expr=self.good_expr,
            use_expr=pl.lit(True),
            period=period,
        )
        return self.with_step(step)

    def with_select_step(self, col_expr_dict: dict[str, pl.Expr]) -> "Channel":
        """
        This step is meant for interactive exploration; it behaves like the df.select() method, but is saved as a step.
        """
        extract = mass2.misc.extract_column_names_from_polars_expr
        inputs: set[str] = set()
        for expr in col_expr_dict.values():
            inputs.update(extract(expr))
        step = mass2.core.recipe.SelectStep(
            inputs=list(inputs),
            output=list(col_expr_dict.keys()),
            good_expr=self.good_expr,
            use_expr=pl.lit(True),
            col_expr_dict=col_expr_dict,
        )
        return self.with_step(step)

    def with_categorize_step(self, category_condition_dict: dict[str, pl.Expr], output_col="category") -> "Channel":
        # ensure the first condition is True, to be used as a fallback
        first_expr = next(iter(category_condition_dict.values()))
        if not first_expr.meta.eq(pl.lit(True)):
            category_condition_dict = {"fallback": pl.lit(True), **category_condition_dict}
        extract = mass2.misc.extract_column_names_from_polars_expr
        inputs: set[str] = set()
        for expr in category_condition_dict.values():
            inputs.update(extract(expr))
        step = mass2.core.recipe.CategorizeStep(
            inputs=list(inputs),
            output=[output_col],
            good_expr=self.good_expr,
            use_expr=pl.lit(True),
            category_condition_dict=category_condition_dict,
        )
        return self.with_step(step)

    def filter5lag(
        self,
        pulse_col="pulse",
        peak_y_col="5lagy",
        peak_x_col="5lagx",
        f_3db=25e3,
        use_expr=pl.lit(True),
        time_constant_s_of_exp_to_be_orthogonal_to=None,
    ) -> "Channel":
        avg_pulse = (
            self.df.lazy()
            .filter(self.good_expr)
            .filter(use_expr)
            .select(pulse_col)
            .limit(2000)
            .collect()
            .to_series()
            .to_numpy()
            .mean(axis=0)
        )
        avg_pulse -= avg_pulse[: self.header.n_presamples].mean()
        assert self.noise
        spectrum5lag = self.noise.spectrum(trunc_front=2, trunc_back=2)
        filter_maker = FilterMaker(
            signal_model=avg_pulse,
            n_pretrigger=self.header.n_presamples,
            noise_psd=spectrum5lag.psd,
            noise_autocorr=spectrum5lag.autocorr_vec,
            sample_time_sec=self.header.frametime_s,
        )
        if time_constant_s_of_exp_to_be_orthogonal_to is None:
            filter5lag = filter_maker.compute_5lag(f_3db=f_3db)
        else:
            filter5lag = filter_maker.compute_5lag_noexp(f_3db=f_3db, exp_time_seconds=time_constant_s_of_exp_to_be_orthogonal_to)
        step = Filter5LagStep(
            inputs=["pulse"],
            output=[peak_x_col, peak_y_col],
            good_expr=self.good_expr,
            use_expr=use_expr,
            filter=filter5lag,
            spectrum=spectrum5lag,
            filter_maker=filter_maker,
            transform_raw=self.transform_raw,
        )
        return self.with_step(step)

    def good_df(self, cols=pl.all(), use_expr: bool | pl.Expr = True):
        good_df = self.df.lazy().filter(self.good_expr)
        if use_expr is not True:
            good_df = good_df.filter(use_expr)
        return good_df.select(cols).collect()

    def bad_df(self, cols=pl.all(), use_expr: bool | pl.Expr = True):
        bad_df = self.df.lazy().filter(self.good_expr.not_())
        if use_expr is not True:
            bad_df = bad_df.filter(use_expr)
        return bad_df.select(cols).collect()

    def good_serieses(self, cols, use_expr: bool | pl.Expr):
        df2 = self.good_df(cols, use_expr)
        return [df2[col] for col in cols]

    def driftcorrect(
        self,
        indicator_col="pretrig_mean",
        uncorrected_col="5lagy",
        corrected_col=None,
        use_expr=pl.lit(True),
    ) -> "Channel":
        # by defining a separate learn method that takes ch as an argument,
        # we can move all the code for the step outside of Channel
        step = DriftCorrectStep.learn(
            ch=self,
            indicator_col=indicator_col,
            uncorrected_col=uncorrected_col,
            corrected_col=corrected_col,
            use_expr=use_expr,
        )
        return self.with_step(step)

    def linefit(  # noqa: PLR0917
        self,
        line,
        col,
        use_expr=pl.lit(True),
        has_linear_background=False,
        has_tails=False,
        dlo=50,
        dhi=50,
        binsize=0.5,
        params_update=lmfit.Parameters(),
    ):
        model = mass2.calibration.algorithms.get_model(line, has_linear_background=has_linear_background, has_tails=has_tails)
        pe = model.spect.peak_energy
        _bin_edges = np.arange(pe - dlo, pe + dhi, binsize)
        df_small = self.df.lazy().filter(self.good_expr).filter(use_expr).select(col).collect()
        bin_centers, counts = misc.hist_of_series(df_small[col], _bin_edges)
        params = model.guess(counts, bin_centers=bin_centers, dph_de=1)
        params["dph_de"].set(1.0, vary=False)
        print(f"before update {params=}")
        params = params.update(params_update)
        print(f"after update {params=}")
        result = model.fit(counts, params, bin_centers=bin_centers, minimum_bins_per_fwhm=3)
        result.set_label_hints(
            binsize=bin_centers[1] - bin_centers[0],
            ds_shortname=self.header.description,
            unit_str="eV",
            attr_str=col,
            states_hint=f"{use_expr=}",
            cut_hint="",
        )
        return result

    def step_summary(self):
        return [(type(a).__name__, b) for (a, b) in zip(self.steps, self.steps_elapsed_s)]

    def __hash__(self) -> int:
        # needed to make functools.cache work
        # if self or self.anything is mutated, assumptions will be broken
        # and we may get nonsense results
        return hash(id(self))

    def __eq__(self, other):
        # needed to make functools.cache work
        # if self or self.anything is mutated, assumptions will be broken
        # and we may get nonsense results
        # only checks if the ids match, does not try to be equal if all contents are equal
        return id(self) == id(other)

    @classmethod
    def from_ljh(cls, path, noise_path=None, keep_posix_usec=False, transform_raw: Callable | None = None) -> "Channel":
        if not noise_path:
            noise_channel = None
        else:
            noise_channel = NoiseChannel.from_ljh(noise_path)
        ljh = mass2.LJHFile.open(path)
        df, header_df = ljh.to_polars(keep_posix_usec)
        header = ChannelHeader.from_ljh_header_df(header_df)
        channel = cls(
            df, header=header, npulses=ljh.npulses, subframediv=ljh.subframediv, noise=noise_channel, transform_raw=transform_raw
        )
        return channel

    @classmethod
    def from_off(cls, off) -> "Channel":
        df = pl.from_numpy(off._mmap)
        df = (
            df.select(pl.from_epoch("unixnano", time_unit="ns").dt.cast_time_unit("us").alias("timestamp"))
            .with_columns(df)
            .select(pl.exclude("unixnano"))
        )
        df_header = pl.DataFrame(off.header)
        df_header = df_header.with_columns(pl.Series("Filename", [off.filename]))
        header = ChannelHeader(
            f"{os.path.split(off.filename)[1]}",
            off.header["ChannelNumberMatchingName"],
            off.framePeriodSeconds,
            off._mmap["recordPreSamples"][0],
            off._mmap["recordSamples"][0],
            df_header,
        )
        channel = cls(df, header, off.nRecords, subframediv=off.subframediv)
        return channel

    def with_experiment_state_df(self, df_es, force_timestamp_monotonic=False) -> "Channel":
        if not self.df["timestamp"].is_sorted():
            df = self.df.select(pl.col("timestamp").cum_max().alias("timestamp")).with_columns(self.df.select(pl.exclude("timestamp")))
            # print("WARNING: in with_experiment_state_df, timestamp is not monotonic, forcing it to be")
            # print("This is likely a BUG in DASTARD.")
        else:
            df = self.df
        df2 = df.join_asof(df_es, on="timestamp", strategy="backward")
        return self.with_replacement_df(df2)

    def with_external_trigger_df(self, df_ext: pl.DataFrame) -> "Channel":
        df2 = (
            self.df.with_columns(subframecount=pl.col("framecount") * self.subframediv)
            .join_asof(df_ext, on="subframecount", strategy="backward", coalesce=False, suffix="_prev_ext_trig")
            .join_asof(df_ext, on="subframecount", strategy="forward", coalesce=False, suffix="_next_ext_trig")
        )
        return self.with_replacement_df(df2)

    def with_replacement_df(self, df2) -> "Channel":
        return Channel(
            df=df2,
            header=self.header,
            npulses=self.npulses,
            subframediv=self.subframediv,
            noise=self.noise,
            good_expr=self.good_expr,
            df_history=self.df_history,
            steps=self.steps,
            transform_raw=self.transform_raw,
        )

    def with_columns(self, df2) -> "Channel":
        df3 = self.df.with_columns(df2)
        return self.with_replacement_df(df3)

    def multifit_quadratic_gain_cal(
        self,
        multifit: MultiFit,
        previous_cal_step_index,
        calibrated_col,
        use_expr=pl.lit(True),
    ) -> "Channel":
        step = MultiFitQuadraticGainStep.learn(
            self,
            multifit_spec=multifit,
            previous_cal_step_index=previous_cal_step_index,
            calibrated_col=calibrated_col,
            use_expr=use_expr,
        )
        return self.with_step(step)

    def multifit_mass_cal(
        self,
        multifit: MultiFit,
        previous_cal_step_index,
        calibrated_col,
        use_expr=pl.lit(True),
    ) -> "Channel":
        step = MultiFitMassCalibrationStep.learn(
            self,
            multifit_spec=multifit,
            previous_cal_step_index=previous_cal_step_index,
            calibrated_col=calibrated_col,
            use_expr=use_expr,
        )
        return self.with_step(step)

    def concat_df(self, df: pl.DataFrame) -> "Channel":
        ch2 = Channel(
            mass2.core.misc.concat_dfs_with_concat_state(self.df, df),
            self.header,
            self.npulses,
            subframediv=self.subframediv,
            noise=self.noise,
            good_expr=self.good_expr,
        )
        # df_history and steps are intentionally not copied over; this is probably not appropriate to use once those are populated
        return ch2

    def concat_ch(self, ch: "Channel") -> "Channel":
        ch2 = self.concat_df(ch.df)
        return ch2

    def phase_correct_mass_specific_lines(
        self,
        indicator_col,
        uncorrected_col,
        line_names,
        previous_cal_step_index,
        corrected_col=None,
        use_expr=pl.lit(True),
    ) -> "Channel":
        if corrected_col is None:
            corrected_col = uncorrected_col + "_pc"
        step = mass2.core.phase_correct_steps.phase_correct_mass_specific_lines(
            self,
            indicator_col,
            uncorrected_col,
            corrected_col,
            previous_cal_step_index,
            line_names,
            use_expr,
        )
        return self.with_step(step)

    def as_bad(self, error_type, error_msg, backtrace):
        return BadChannel(self, error_type, error_msg, backtrace)

    def save_recipes(self, filename):
        steps = {self.header.ch_num: self.steps[:]}
        misc.pickle_object(steps, filename)
        return steps

    def plot_summaries(self, use_expr_in: pl.Expr | None = None, downsample: int | None = None, log: bool = False) -> None:
        """Plot a summary of the data set, including time series and histograms of key pulse properties.

        Parameters
        ----------
        use_expr_in : pl.Expr | None, optional
            A polars expression to determine valid pulses, by default None. If None, use `self.good_expr`
        downsample : int | None, optional
            Plot only every one of `downsample` pulses in the scatter plots, by default None.
            If None, choose the smallest value so that no more than 10000 points appear
        log : bool, optional
            Whether to make the histograms have a logarithmic y-scale, by default False.
        """
        plt.figure()
        tpi_microsec = (self.typical_peak_ind() - self.header.n_presamples) * (1e6 * self.header.frametime_s)
        plottables = (
            ("pulse_rms", "Pulse RMS", "#dd00ff", None),
            ("pulse_average", "Pulse Avg", "purple", None),
            ("peak_value", "Peak value", "blue", None),
            ("pretrig_rms", "Pretrig RMS", "green", [0, 4000]),
            ("pretrig_mean", "Pretrig Mean", "#00ff26", None),
            ("postpeak_deriv", "Max PostPk deriv", "gold", [0, 200]),
            ("rise_time_µs", "Rise time (µs)", "orange", [-0.3 * tpi_microsec, 2 * tpi_microsec]),
            ("peak_time_µs", "Peak time (µs)", "red", [-0.3 * tpi_microsec, 2 * tpi_microsec]),
        )

        use_expr = self.good_expr if use_expr_in is None else use_expr_in

        if downsample is None:
            downsample = self.npulses // 10000
        downsample = max(downsample, 1)

        df = self.df.lazy().gather_every(downsample)
        df = df.with_columns(
            ((pl.col("peak_index") - self.header.n_presamples) * (1e6 * self.header.frametime_s)).alias("peak_time_µs")
        )
        df = df.with_columns((pl.col("rise_time") * 1e6).alias("rise_time_µs"))
        existing_columns = df.collect_schema().names()
        preserve = [p[0] for p in plottables if p[0] in existing_columns]
        preserve.append("timestamp")
        df2 = df.filter(use_expr).select(preserve).collect()

        # Plot time series relative to 0 = the last UT midnight during or before the run.
        timestamp = df2["timestamp"].to_numpy()
        last_midnight = timestamp[-1].astype("datetime64[D]")
        hour_rel = (timestamp - last_midnight).astype(float) / 3600e6

        for i, (column_name, label, color, limits) in enumerate(plottables):
            if column_name not in df2:
                continue
            y = df2[column_name].to_numpy()

            # Time series scatter plots (left-hand panels)
            plt.subplot(len(plottables), 2, 1 + i * 2)
            plt.ylabel(label)
            plt.plot(hour_rel, y, ".", ms=1, color=color)
            if i == len(plottables) - 1:
                plt.xlabel("Time since last UT midnight (hours)")

            # Histogram (right-hand panels)
            plt.subplot(len(plottables), 2, 2 + i * 2)
            contents, _, _ = plt.hist(y, 200, range=limits, log=log, histtype="stepfilled", fc=color, alpha=0.5)
            if log:
                plt.ylim(ymin=contents.min())
        print(f"Plotting {len(y)} out of {self.npulses} data points")

    def fit_pulse(self, index=0, col="pulse", verbose=True):
        pulse = self.df[col][index].to_numpy()
        result = mass2.core.pulse_algorithms.fit_pulse_2exp_with_tail(pulse, npre=self.header.n_presamples, dt=self.header.frametime_s)
        if verbose:
            print(f"ch={self}")
            print(f"pulse index={index}")
            print(result.fit_report())
        return result
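
Since Channel is a frozen dataclass, each processing method returns a new Channel with an added recipe step rather than mutating in place. The following is a minimal usage sketch, not part of the generated documentation: the file paths, the spectral line names, and the calibrated column name are illustrative assumptions; the import path follows the source location shown above.

import numpy as np
from mass2.core.channel import Channel  # import path based on "mass2/core/channel.py"

# Load one channel plus its noise file (hypothetical paths)
ch = Channel.from_ljh("run0000_chan1.ljh", noise_path="noise0000_chan1.ljh")

# Each call returns a new Channel whose steps list records the recipe
ch = (
    ch.summarize_pulses()                             # add pulse-summary columns
    .with_good_expr_pretrig_rms_and_postpeak_deriv()  # outlier-resistant quality cuts
    .filter5lag()                                     # optimal filter -> "5lagy", "5lagx"
    .rough_cal(["MnKAlpha", "MnKBeta"], uncalibrated_col="5lagy", calibrated_col="energy_5lagy")
)
ch.plot_hist("energy_5lagy", np.arange(5500, 6500, 1.0))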

plot_hists(col, bin_edges, group_by_col, axis=None, use_good_expr=True, use_expr=pl.lit(True), skip_none=True)

Plots histograms for the given column, grouped by the specified column.

Parameters:
  • col (str) –

    The column name to plot.

  • bin_edges (array-like) –

    The edges of the bins for the histogram.

  • group_by_col (str) –

    The column name to group by. This is required.

  • axis (matplotlib.Axes, optional, default: None ) –

    The axis to plot on. If None, a new figure is created.

Source code in mass2/core/channel.py
def plot_hists(
    self,
    col,
    bin_edges,
    group_by_col,
    axis=None,
    use_good_expr=True,
    use_expr=pl.lit(True),
    skip_none=True,
):
    """
    Plots histograms for the given column, grouped by the specified column.

    Parameters:
    - col (str): The column name to plot.
    - bin_edges (array-like): The edges of the bins for the histogram.
    - group_by_col (str): The column name to group by. This is required.
    - axis (matplotlib.Axes, optional): The axis to plot on. If None, a new figure is created.
    """
    if axis is None:
        _, ax = plt.subplots()  # Create a new figure if no axis is provided
    else:
        ax = axis

    if use_good_expr and self.good_expr is not True:
        # `True` doesn't implement .and_, and we haven't found an expression-literal
        # equivalent that does, so we special-case True
        filter_expr = self.good_expr.and_(use_expr)
    else:
        filter_expr = use_expr

    # Group by the specified column and filter using good_expr
    df_small = (self.df.lazy().filter(filter_expr).select(col, group_by_col)).collect().sort(group_by_col, descending=False)

    # Plot a histogram for each group
    counts_dict = {}
    for (group_name,), group_data in df_small.group_by(group_by_col, maintain_order=True):
        if group_name is None and skip_none:
            continue
        # Get the data for the column to plot
        values = group_data[col]
        _, step_size = misc.midpoints_and_step_size(bin_edges)
        bin_centers, counts = misc.hist_of_series(values, bin_edges)
        counts_dict[group_name] = counts
        plt.step(bin_centers, counts, where="mid", label=str(group_name))
        # Plot the histogram for the current group
        # if group_name == "EBIT":
        #     ax.hist(values, bins=bin_edges, alpha=0.9, color="k", label=str(group_name))
        # else:
        #     ax.hist(values, bins=bin_edges, alpha=0.5, label=str(group_name))
        # bin_centers, counts = misc.hist_of_series(values, bin_edges)
        # plt.plot(bin_centers, counts, label=group_name)
    # Customize the plot
    ax.set_xlabel(str(col))
    ax.set_ylabel(f"Counts per {step_size:.02f} unit bin")
    ax.set_title(f"Histogram of {col} grouped by {group_by_col}")

    # Add a legend to label the groups
    ax.legend(title=group_by_col)

    plt.tight_layout()
    return bin_centers, counts_dict
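
A hedged usage sketch, not taken from the source: assuming `ch` is a Channel that already has an "energy" column, with_categorize_step can tag each pulse, and plot_hists then overlays one histogram per category and returns the shared bin centers plus a per-group counts dictionary. The column names and the cut threshold below are illustrative assumptions.

import numpy as np
import polars as pl

# Tag every pulse with a category (the first entry acts as the fallback)
ch2 = ch.with_categorize_step({
    "fallback": pl.lit(True),
    "clean": pl.col("pretrig_rms") < 10,  # illustrative threshold
})
bin_edges = np.arange(0, 10_000, 5.0)
bin_centers, counts_by_group = ch2.plot_hists("energy", bin_edges, group_by_col="category")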

plot_summaries(use_expr_in=None, downsample=None, log=False)

Plot a summary of the data set, including time series and histograms of key pulse properties.

Parameters:
  • use_expr_in (Expr | None, default: None ) –

    A polars expression to determine valid pulses, by default None. If None, use self.good_expr

  • downsample (int | None, default: None ) –

    Plot only every one of downsample pulses in the scatter plots, by default None. If None, choose the smallest value so that no more than 10000 points appear

  • log (bool, default: False ) –

    Whether to make the histograms have a logarithmic y-scale, by default False.

Source code in mass2/core/channel.py
def plot_summaries(self, use_expr_in: pl.Expr | None = None, downsample: int | None = None, log: bool = False) -> None:
    """Plot a summary of the data set, including time series and histograms of key pulse properties.

    Parameters
    ----------
    use_expr_in : pl.Expr | None, optional
        A polars expression to determine valid pulses, by default None. If None, use `self.good_expr`
    downsample : int | None, optional
        Plot only every one of `downsample` pulses in the scatter plots, by default None.
        If None, choose the smallest value so that no more than 10000 points appear
    log : bool, optional
        Whether to make the histograms have a logarithmic y-scale, by default False.
    """
    plt.figure()
    tpi_microsec = (self.typical_peak_ind() - self.header.n_presamples) * (1e6 * self.header.frametime_s)
    plottables = (
        ("pulse_rms", "Pulse RMS", "#dd00ff", None),
        ("pulse_average", "Pulse Avg", "purple", None),
        ("peak_value", "Peak value", "blue", None),
        ("pretrig_rms", "Pretrig RMS", "green", [0, 4000]),
        ("pretrig_mean", "Pretrig Mean", "#00ff26", None),
        ("postpeak_deriv", "Max PostPk deriv", "gold", [0, 200]),
        ("rise_time_µs", "Rise time (µs)", "orange", [-0.3 * tpi_microsec, 2 * tpi_microsec]),
        ("peak_time_µs", "Peak time (µs)", "red", [-0.3 * tpi_microsec, 2 * tpi_microsec]),
    )

    use_expr = self.good_expr if use_expr_in is None else use_expr_in

    if downsample is None:
        downsample = self.npulses // 10000
    downsample = max(downsample, 1)

    df = self.df.lazy().gather_every(downsample)
    df = df.with_columns(
        ((pl.col("peak_index") - self.header.n_presamples) * (1e6 * self.header.frametime_s)).alias("peak_time_µs")
    )
    df = df.with_columns((pl.col("rise_time") * 1e6).alias("rise_time_µs"))
    existing_columns = df.collect_schema().names()
    preserve = [p[0] for p in plottables if p[0] in existing_columns]
    preserve.append("timestamp")
    df2 = df.filter(use_expr).select(preserve).collect()

    # Plot time series relative to 0 = the last UT midnight during or before the run.
    timestamp = df2["timestamp"].to_numpy()
    last_midnight = timestamp[-1].astype("datetime64[D]")
    hour_rel = (timestamp - last_midnight).astype(float) / 3600e6

    for i, (column_name, label, color, limits) in enumerate(plottables):
        if column_name not in df2:
            continue
        y = df2[column_name].to_numpy()

        # Time series scatter plots (left-hand panels)
        plt.subplot(len(plottables), 2, 1 + i * 2)
        plt.ylabel(label)
        plt.plot(hour_rel, y, ".", ms=1, color=color)
        if i == len(plottables) - 1:
            plt.xlabel("Time since last UT midnight (hours)")

        # Histogram (right-hand panels)
        plt.subplot(len(plottables), 2, 2 + i * 2)
        contents, _, _ = plt.hist(y, 200, range=limits, log=log, histtype="stepfilled", fc=color, alpha=0.5)
        if log:
            plt.ylim(ymin=contents.min())
    print(f"Plotting {len(y)} out of {self.npulses} data points")

with_column_map_step(input_col, output_col, f)

f should take a numpy array and return a numpy array with the same number of elements

Source code in mass2/core/channel.py
def with_column_map_step(self, input_col: str, output_col: str, f: Callable) -> "Channel":
    """f should take a numpy array and return a numpy array with the same number of elements"""
    step = mass2.core.recipe.ColumnAsNumpyMapStep([input_col], [output_col], good_expr=self.good_expr, use_expr=pl.lit(True), f=f)
    return self.with_step(step)
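
A small sketch, assuming `ch` already has a "5lagy" column: any NumPy function that preserves the number of elements can be recorded as a step. The input and output column names here are arbitrary illustrations.

import numpy as np

# Record an element-wise transform of "5lagy" as a new column and a replayable step
ch2 = ch.with_column_map_step("5lagy", "log_5lagy", np.log)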

with_good_expr_below_nsigma_outlier_resistant(col_nsigma_pairs, replace=False, use_prev_good_expr=True)

Always sets the lower limit at 0; don't use for values that can be negative.

Source code in mass2/core/channel.py
def with_good_expr_below_nsigma_outlier_resistant(self, col_nsigma_pairs, replace=False, use_prev_good_expr=True) -> "Channel":
    """
    Always sets the lower limit at 0; don't use for values that can be negative.
    """
    if use_prev_good_expr:
        df = self.df.lazy().select(pl.exclude("pulse")).filter(self.good_expr).collect()
    else:
        df = self.df
    for i, (col, nsigma) in enumerate(col_nsigma_pairs):
        max_for_col = misc.outlier_resistant_nsigma_above_mid(df[col].to_numpy(), nsigma=nsigma)
        this_iter_good_expr = pl.col(col).is_between(0, max_for_col)
        if i == 0:
            good_expr = this_iter_good_expr
        else:
            good_expr = good_expr.and_(this_iter_good_expr)
    return self.with_good_expr(good_expr, replace)
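
A hedged example (the column names are assumptions, though both appear elsewhere in this class): cut pulses whose pretrig_rms or postpeak_deriv exceeds an outlier-resistant n-sigma upper limit, keeping the lower limit at 0.

ch2 = ch.with_good_expr_below_nsigma_outlier_resistant([
    ("pretrig_rms", 5),
    ("postpeak_deriv", 5),
])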

with_good_expr_nsigma_range_outlier_resistant(col_nsigma_pairs, replace=False, use_prev_good_expr=True)

Sets both lower and upper limits from an outlier-resistant n-sigma range around the middle of each column's distribution.

Source code in mass2/core/channel.py
def with_good_expr_nsigma_range_outlier_resistant(self, col_nsigma_pairs, replace=False, use_prev_good_expr=True) -> "Channel":
    """
    Sets both lower and upper limits from an outlier-resistant n-sigma range around the middle of each column's distribution.
    """
    if use_prev_good_expr:
        df = self.df.lazy().select(pl.exclude("pulse")).filter(self.good_expr).collect()
    else:
        df = self.df
    for i, (col, nsigma) in enumerate(col_nsigma_pairs):
        min_for_col, max_for_col = misc.outlier_resistant_nsigma_range_from_mid(df[col].to_numpy(), nsigma=nsigma)
        this_iter_good_expr = pl.col(col).is_between(min_for_col, max_for_col)
        if i == 0:
            good_expr = this_iter_good_expr
        else:
            good_expr = good_expr.and_(this_iter_good_expr)
    return self.with_good_expr(good_expr, replace)
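
A hedged example for the two-sided variant (column names assumed): keep pulses whose pretrig_mean and pulse_rms fall inside an outlier-resistant n-sigma window around the middle of each distribution.

ch2 = ch.with_good_expr_nsigma_range_outlier_resistant([
    ("pretrig_mean", 6),
    ("pulse_rms", 6),
])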

with_select_step(col_expr_dict)

This step is meant for interactive exploration; it behaves like the df.select() method, but is saved as a step.

Source code in mass2/core/channel.py
def with_select_step(self, col_expr_dict: dict[str, pl.Expr]) -> "Channel":
    """
    This step is meant for interactive exploration; it behaves like the df.select() method, but is saved as a step.
    """
    extract = mass2.misc.extract_column_names_from_polars_expr
    inputs: set[str] = set()
    for expr in col_expr_dict.values():
        inputs.update(extract(expr))
    step = mass2.core.recipe.SelectStep(
        inputs=list(inputs),
        output=list(col_expr_dict.keys()),
        good_expr=self.good_expr,
        use_expr=pl.lit(True),
        col_expr_dict=col_expr_dict,
    )
    return self.with_step(step)
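
A hedged sketch (the column names are assumptions): each dictionary key becomes a new output column computed from a polars expression, and the selection is recorded as a replayable step.

import polars as pl

ch2 = ch.with_select_step({
    "ptm_offset": pl.col("pretrig_mean") - pl.col("pretrig_mean").median(),
    "peak_minus_pretrig": pl.col("peak_value") - pl.col("pretrig_mean"),
})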

Channels dataclass

Source code in mass2/core/channels.py
@dataclass(frozen=True)  # noqa: PLR0904
class Channels:
    channels: dict[int, Channel]
    description: str
    bad_channels: dict[int, BadChannel] = field(default_factory=dict)

    @property
    def ch0(self):
        assert len(self.channels) > 0, "channels must be non-empty"
        return next(iter(self.channels.values()))

    @functools.cache
    def dfg(self, exclude="pulse"):
        # return a dataframe containing good pulses from each channel,
        # excluding "pulse" by default
        # and including columns "key" (to be removed?) and "ch_num"
        # the more common call should be to wrap this in a convenient plotter
        dfs = []
        for ch_num, channel in self.channels.items():
            df = channel.df.select(pl.exclude(exclude)).filter(channel.good_expr)
            # key_series = pl.Series("key", dtype=pl.Int64).extend_constant(key, len(df))
            assert ch_num == channel.header.ch_num
            ch_series = pl.Series("ch_num", dtype=pl.Int64).extend_constant(channel.header.ch_num, len(df))
            dfs.append(df.with_columns(ch_series))
        return pl.concat(dfs)

    def linefit(  # noqa: PLR0917
        self,
        line,
        col,
        use_expr=pl.lit(True),
        has_linear_background=False,
        has_tails=False,
        dlo=50,
        dhi=50,
        binsize=0.5,
        params_update=lmfit.Parameters(),
    ):
        model = mass2.calibration.algorithms.get_model(line, has_linear_background=has_linear_background, has_tails=has_tails)
        pe = model.spect.peak_energy
        _bin_edges = np.arange(pe - dlo, pe + dhi, binsize)
        df_small = self.dfg().lazy().filter(use_expr).select(col).collect()
        bin_centers, counts = mass2.misc.hist_of_series(df_small[col], _bin_edges)
        params = model.guess(counts, bin_centers=bin_centers, dph_de=1)
        params["dph_de"].set(1.0, vary=False)
        print(f"before update {params=}")
        params = params.update(params_update)
        print(f"after update {params=}")
        result = model.fit(counts, params, bin_centers=bin_centers, minimum_bins_per_fwhm=3)
        result.set_label_hints(
            binsize=bin_centers[1] - bin_centers[0],
            ds_shortname=f"{len(self.channels)} channels, {self.description}",
            unit_str="eV",
            attr_str=col,
            states_hint=f"{use_expr=}",
            cut_hint="",
        )
        return result

    def plot_hist(self, col, bin_edges, use_expr=pl.lit(True), axis=None):
        df_small = self.dfg().lazy().filter(use_expr).select(col).collect()
        ax = mass2.misc.plot_hist_of_series(df_small[col], bin_edges, axis)
        ax.set_title(f"{len(self.channels)} channels, {self.description}")

    def plot_hists(self, col, bin_edges, group_by_col, axis=None, use_expr=None, skip_none=True):
        """
        Plots histograms for the given column, grouped by the specified column.

        Parameters:
        - col (str): The column name to plot.
        - bin_edges (array-like): The edges of the bins for the histogram.
        - group_by_col (str): The column name to group by. This is required.
        - axis (matplotlib.Axes, optional): The axis to plot on. If None, a new figure is created.
        """
        if axis is None:
            _, ax = plt.subplots()  # Create a new figure if no axis is provided
        else:
            ax = axis

        if use_expr is None:
            df_small = (self.dfg().lazy().select(col, group_by_col)).collect().sort(group_by_col, descending=False)
        else:
            df_small = (self.dfg().lazy().filter(use_expr).select(col, group_by_col)).collect().sort(group_by_col, descending=False)

        # Plot a histogram for each group
        for (group_name,), group_data in df_small.group_by(group_by_col, maintain_order=True):
            if group_name is None and skip_none:
                continue
            # Get the data for the column to plot
            values = group_data[col]
            # Plot the histogram for the current group
            if group_name == "EBIT":
                ax.hist(values, bins=bin_edges, alpha=0.9, color="k", label=str(group_name))
            else:
                ax.hist(values, bins=bin_edges, alpha=0.5, label=str(group_name))
            # bin_centers, counts = mass2.misc.hist_of_series(values, bin_edges)
            # plt.plot(bin_centers, counts, label=group_name)

        # Customize the plot
        ax.set_xlabel(str(col))
        ax.set_ylabel("Frequency")
        ax.set_title(f"Coadded Histogram of {col} grouped by {group_by_col}")

        # Add a legend to label the groups
        ax.legend(title=group_by_col)

        plt.tight_layout()

    def map(self, f: Callable, allow_throw: bool = False) -> "Channels":
        new_channels = {}
        new_bad_channels = {}
        for key, channel in self.channels.items():
            try:
                new_channels[key] = f(channel)
            except KeyboardInterrupt:
                raise
            except Exception as ex:
                error_type: type = type(ex)
                error_message: str = str(ex)
                backtrace: str = traceback.format_exc()
                if allow_throw:
                    raise
                print(f"{key=} {channel=} failed this step")
                print(f"{error_type=}")
                print(f"{error_message=}")
                new_bad_channels[key] = channel.as_bad(error_type, error_message, backtrace)
        new_bad_channels = mass2.misc.merge_dicts_ordered_by_keys(self.bad_channels, new_bad_channels)

        return Channels(new_channels, self.description, bad_channels=new_bad_channels)

    def set_bad(self, ch_num, msg, require_ch_num_exists=True):
        new_channels = {}
        new_bad_channels = {}
        if require_ch_num_exists:
            assert ch_num in self.channels.keys(), f"{ch_num} can't be set bad because it does not exist"
        for key, channel in self.channels.items():
            if key == ch_num:
                new_bad_channels[key] = channel.as_bad(None, msg, None)
            else:
                new_channels[key] = channel
        return Channels(new_channels, self.description, bad_channels=new_bad_channels)

    def linefit_joblib(self, line, col, prefer="threads", n_jobs=4):
        def work(key):
            channel = self.channels[key]
            return channel.linefit(line, col)

        parallel = joblib.Parallel(n_jobs=n_jobs, prefer=prefer)  # it's unclear whether threads are better; what blocks the GIL?
        results = parallel(joblib.delayed(work)(key) for key in self.channels.keys())
        return results

    def __hash__(self):
        # needed to make functools.cache work
        # if self or self.anything is mutated, assumptions will be broken
        # and we may get nonsense results
        return hash(id(self))

    def __eq__(self, other):
        return id(self) == id(other)

    @classmethod
    def from_ljh_path_pairs(cls, pulse_noise_pairs: List[Tuple[str, str]], description: str):
        """
        Create a :class:`Channels` instance from pairs of LJH files.

        Args:
            pulse_noise_pairs (List[Tuple[str, str]]):
                A list of `(pulse_path, noise_path)` tuples, where each entry contains
                the file path to a pulse LJH file and its corresponding noise LJH file.
            description (str):
                A human-readable description for the resulting Channels object.

        Returns:
            Channels:
                A Channels object with one :class:`Channel` per `(pulse_path, noise_path)` pair.

        Raises:
            AssertionError:
                If two input files correspond to the same channel number.

        Notes:
            Each channel is created via :meth:`Channel.from_ljh`.
            The channel number is taken from the LJH file header and used as the key
            in the returned Channels mapping.

        Examples:
            >>> pairs = [
            ...     ("datadir/run0000_ch0000.ljh", "datadir/run0001_ch0000.ljh"),
            ...     ("datadir/run0000_ch0001.ljh", "datadir/run0001_ch0001.ljh"),
            ... ]
            >>> channels = Channels.from_ljh_path_pairs(pairs, description="Test run")
            >>> list(channels.channels.keys())
            [0, 1]
        """
        channels = {}
        for pulse_path, noise_path in pulse_noise_pairs:
            channel = Channel.from_ljh(pulse_path, noise_path)
            assert channel.header.ch_num not in channels.keys()
            channels[channel.header.ch_num] = channel
        return cls(channels, description)

    @classmethod
    def from_off_paths(cls, off_paths, description):
        channels = {}
        for path in off_paths:
            ch = Channel.from_off(mass2.core.OffFile(path))
            channels[ch.header.ch_num] = ch
        return cls(channels, description)

    @classmethod
    def from_ljh_folder(
        cls,
        pulse_folder: str,
        noise_folder: str | None = None,
        limit: int | None = None,
        exclude_ch_nums: list[int] | None = None,
    ):
        assert os.path.isdir(pulse_folder), f"{pulse_folder=} {noise_folder=}"
        if exclude_ch_nums is None:
            exclude_ch_nums = []
        if noise_folder is None:
            paths = ljhutil.find_ljh_files(pulse_folder, exclude_ch_nums=exclude_ch_nums)
            if limit is not None:
                paths = paths[:limit]
            pairs = [(path, "") for path in paths]
        else:
            assert os.path.isdir(noise_folder), f"{pulse_folder=} {noise_folder=}"
            pairs = ljhutil.match_files_by_channel(pulse_folder, noise_folder, limit=limit, exclude_ch_nums=exclude_ch_nums)
        description = f"from_ljh_folder {pulse_folder=} {noise_folder=}"
        print(f"{description}")
        print(f"   from_ljh_folder has {len(pairs)} pairs")
        data = cls.from_ljh_path_pairs(pairs, description)
        print(f"   and the Channels obj has {len(data.channels)} pairs")
        return data

    def get_an_ljh_path(self):
        return pathlib.Path(self.ch0.header.df["Filename"][0])

    def get_path_in_output_folder(self, filename):
        ljh_path = self.get_an_ljh_path()
        base_name, post_chan = ljh_path.name.split("_chan")
        date, run_num = base_name.split("_run")  # timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = ljh_path.parent.parent / f"{run_num}moss_output"
        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir / filename

    def get_experiment_state_df(self, experiment_state_path=None):
        if experiment_state_path is None:
            ljh_path = self.get_an_ljh_path()
            experiment_state_path = ljhutil.experiment_state_path_from_ljh_path(ljh_path)
        df = pl.read_csv(experiment_state_path, new_columns=["unixnano", "state_label"])
        # _col0, _col1 = df.columns
        df_es = df.select(pl.from_epoch("unixnano", time_unit="ns").dt.cast_time_unit("us").alias("timestamp"))
        # strip whitespace from state_label column
        sl_series = df.select(pl.col("state_label").str.strip_chars()).to_series()
        df_es = df_es.with_columns(state_label=pl.Series(values=sl_series, dtype=pl.Categorical))
        return df_es

    def with_experiment_state_by_path(self, experiment_state_path=None):
        df_es = self.get_experiment_state_df(experiment_state_path)
        return self.with_experiment_state_df(df_es)

    def with_external_trigger_by_path(self, path: str | None = None) -> "Channels":
        if path is None:
            raise NotImplementedError("cannot infer external trigger path yet")
        with open(path, "rb") as _f:
            _header_line = _f.readline()  # read the one header line before opening the binary data
            external_trigger_subframe_count = np.fromfile(_f, "int64")
        df_ext = pl.DataFrame({
            "subframecount": external_trigger_subframe_count,
        })
        return self.with_external_trigger_df(df_ext)

    def with_external_trigger_df(self, df_ext: pl.DataFrame) -> "Channels":
        def with_etrig_df(channel: Channel):
            return channel.with_external_trigger_df(df_ext)

        return self.map(with_etrig_df)

    def with_experiment_state_df(self, df_es):
        # this is not as performant as making use_exprs for states
        # and using .set_sorted on the timestamp column
        ch2s = {}
        for ch_num, ch in self.channels.items():
            ch2s[ch_num] = ch.with_experiment_state_df(df_es)
        return Channels(ch2s, self.description)

    def with_steps_dict(self, steps_dict):
        def load_recipes(channel):
            try:
                steps = steps_dict[channel.header.ch_num]
            except KeyError:
                raise Exception("steps dict did not contain steps for this ch_num")
            return channel.with_steps(steps)

        return self.map(load_recipes)

    def save_recipes(self, filename, required_fields: str | Iterable[str] | None = None, drop_debug=True) -> dict[int, Recipe]:
        """Pickle a dictionary (one entry per channel) of Recipe objects.

        If you want to save a "recipe", a minimal series of steps required to reproduce the required field(s),
        then set `required_fields` to be a list/tuple/set of DataFrame column names (or a single column name)
        whose production from raw data should be possible.

        Parameters
        ----------
        filename : str
            Filename to store recipe in, typically of the form "*.pkl"
        required_fields : str | Iterable[str] | None
            The field (str) or fields (Iterable[str]) that the recipe should be able to generate from a raw LJH file.
            Drop all steps that do not lead (directly or indirectly) to producing this field or these fields.
            If None, then preserve all steps (default None).
        drop_debug: bool
            Whether to remove debugging-related data from each `RecipeStep`, if the subclass supports this (via the
            `RecipeStep.drop_debug()` method).

        Returns
        -------
        dict
            Dictionary with keys=channel numbers, values=the (possibly trimmed and debug-dropped) Recipe objects.
        """
        steps = {}
        for channum, ch in self.channels.items():
            steps[channum] = ch.steps.trim_dead_ends(required_fields=required_fields, drop_debug=drop_debug)
        mass2.misc.pickle_object(steps, filename)
        return steps

    def load_recipes(self, filename):
        steps = mass2.misc.unpickle_object(filename)
        return self.with_steps_dict(steps)

    def parent_folder_path(self):
        parent_folder_path = pathlib.Path(self.ch0.header.df["Filename"][0]).parent.parent
        print(f"{parent_folder_path=}")
        return parent_folder_path

    def concat_data(self, other_data):
        # sorting here to show intention, but I think set is sorted by insertion order as
        # an implementation detail so this may not do anything
        ch_nums = sorted(list(set(self.channels.keys()).intersection(other_data.channels.keys())))
        new_channels = {}
        for ch_num in ch_nums:
            ch = self.channels[ch_num]
            other_ch = other_data.channels[ch_num]
            combined_df = mass2.core.misc.concat_dfs_with_concat_state(ch.df, other_ch.df)
            new_ch = ch.with_replacement_df(combined_df)
            new_channels[ch_num] = new_ch
        return mass2.Channels(new_channels, self.description + other_data.description)

    @classmethod
    def from_df(
        cls,
        df_in,
        frametime_s=np.nan,
        n_presamples=None,
        n_samples=None,
        description="from Channels.channels_from_df",
    ):
        # requires a column named "ch_num" containing the channel number
        keys_df = df_in.partition_by(by=["ch_num"], as_dict=True)
        dfs = {keys[0]: df for (keys, df) in keys_df.items()}
        channels = {}
        for ch_num, df in dfs.items():
            channels[ch_num] = Channel(
                df,
                header=ChannelHeader(
                    description="from df",
                    ch_num=ch_num,
                    frametime_s=frametime_s,
                    n_presamples=n_presamples,
                    n_samples=n_samples,
                    df=df,
                ),
                npulses=len(df),
            )
        return Channels(channels, description)

from_ljh_path_pairs(pulse_noise_pairs, description) classmethod

Create a Channels instance from pairs of LJH files.

Args:
  • pulse_noise_pairs (List[Tuple[str, str]]) – A list of (pulse_path, noise_path) tuples, where each entry contains the file path to a pulse LJH file and its corresponding noise LJH file.
  • description (str) – A human-readable description for the resulting Channels object.

Returns:
  • Channels – A Channels object with one Channel per (pulse_path, noise_path) pair.

Raises:
  • AssertionError – If two input files correspond to the same channel number.

Notes: Each channel is created via Channel.from_ljh. The channel number is taken from the LJH file header and used as the key in the returned Channels mapping.

Examples:
>>> pairs = [
...     ("datadir/run0000_ch0000.ljh", "datadir/run0001_ch0000.ljh"),
...     ("datadir/run0000_ch0001.ljh", "datadir/run0001_ch0001.ljh"),
... ]
>>> channels = Channels.from_ljh_path_pairs(pairs, description="Test run")
>>> list(channels.keys())
[0, 1]

Source code in mass2/core/channels.py
@classmethod
def from_ljh_path_pairs(cls, pulse_noise_pairs: List[Tuple[str, str]], description: str):
    """
    Create a :class:`Channels` instance from pairs of LJH files.

    Args:
        pulse_noise_pairs (List[Tuple[str, str]]):
            A list of `(pulse_path, noise_path)` tuples, where each entry contains
            the file path to a pulse LJH file and its corresponding noise LJH file.
        description (str):
            A human-readable description for the resulting Channels object.

    Returns:
        Channels:
            A Channels object with one :class:`Channel` per `(pulse_path, noise_path)` pair.

    Raises:
        AssertionError:
            If two input files correspond to the same channel number.

    Notes:
        Each channel is created via :meth:`Channel.from_ljh`.
        The channel number is taken from the LJH file header and used as the key
        in the returned Channels mapping.

    Examples:
        >>> pairs = [
        ...     ("datadir/run0000_ch0000.ljh", "datadir/run0001_ch0000.ljh"),
        ...     ("datadir/run0000_ch0001.ljh", "datadir/run0001_ch0001.ljh"),
        ... ]
        >>> channels = Channels.from_ljh_path_pairs(pairs, description="Test run")
        >>> list(channels.keys())
        [0, 1]
    """
    channels = {}
    for pulse_path, noise_path in pulse_noise_pairs:
        channel = Channel.from_ljh(pulse_path, noise_path)
        assert channel.header.ch_num not in channels.keys()
        channels[channel.header.ch_num] = channel
    return cls(channels, description)

plot_hists(col, bin_edges, group_by_col, axis=None, use_expr=None, skip_none=True)

Plots histograms for the given column, grouped by the specified column.

Parameters:
  • col (str) – The column name to plot.
  • bin_edges (array-like) – The edges of the bins for the histogram.
  • group_by_col (str) – The column name to group by. This is required.
  • axis (matplotlib.Axes, optional) – The axis to plot on. If None, a new figure is created.
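
A minimal usage sketch (not part of the source): it assumes a Channels object named data whose coadded DataFrame has "energy" and "state_label" columns; substitute your own column names.

import numpy as np
import polars as pl

bin_edges = np.arange(0, 10000, 5)  # hypothetical binning in whatever units "energy" uses
data.plot_hists("energy", bin_edges, group_by_col="state_label")

# Optionally restrict the histogram to rows passing a polars expression:
data.plot_hists("energy", bin_edges, group_by_col="state_label", use_expr=pl.col("state_label") != "IGNORE")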

Source code in mass2/core/channels.py
def plot_hists(self, col, bin_edges, group_by_col, axis=None, use_expr=None, skip_none=True):
    """
    Plots histograms for the given column, grouped by the specified column.

    Parameters:
    - col (str): The column name to plot.
    - bin_edges (array-like): The edges of the bins for the histogram.
    - group_by_col (str): The column name to group by. This is required.
    - axis (matplotlib.Axes, optional): The axis to plot on. If None, a new figure is created.
    """
    if axis is None:
        _, ax = plt.subplots()  # Create a new figure if no axis is provided
    else:
        ax = axis

    if use_expr is None:
        df_small = (self.dfg().lazy().select(col, group_by_col)).collect().sort(group_by_col, descending=False)
    else:
        df_small = (self.dfg().lazy().filter(use_expr).select(col, group_by_col)).collect().sort(group_by_col, descending=False)

    # Plot a histogram for each group
    for (group_name,), group_data in df_small.group_by(group_by_col, maintain_order=True):
        if group_name is None and skip_none:
            continue
        # Get the data for the column to plot
        values = group_data[col]
        # Plot the histogram for the current group
        if group_name == "EBIT":
            ax.hist(values, bins=bin_edges, alpha=0.9, color="k", label=str(group_name))
        else:
            ax.hist(values, bins=bin_edges, alpha=0.5, label=str(group_name))
        # bin_centers, counts = mass2.misc.hist_of_series(values, bin_edges)
        # plt.plot(bin_centers, counts, label=group_name)

    # Customize the plot
    ax.set_xlabel(str(col))
    ax.set_ylabel("Frequency")
    ax.set_title(f"Coadded Histogram of {col} grouped by {group_by_col}")

    # Add a legend to label the groups
    ax.legend(title=group_by_col)

    plt.tight_layout()

save_recipes(filename, required_fields=None, drop_debug=True)

Pickle a dictionary (one entry per channel) of Recipe objects.

If you want to save a "recipe", a minimal series of steps required to reproduce the required field(s), then set required_fields to be a list/tuple/set of DataFrame column names (or a single column name) whose production from raw data should be possible.

Parameters:
  • filename (str) –

    Filename to store recipe in, typically of the form "*.pkl"

  • required_fields (str | Iterable[str] | None, default: None ) –

    The field (str) or fields (Iterable[str]) that the recipe should be able to generate from a raw LJH file. Drop all steps that do not lead (directly or indirectly) to producing this field or these fields. If None, then preserve all steps (default None).

  • drop_debug (bool, default: True) –

    Whether to remove debugging-related data from each RecipeStep, if the subclass supports this (via the RecipeStep.drop_debug() method).

Returns:
  • dict

    Dictionary with keys=channel numbers, values=the (possibly trimmed and debug-dropped) Recipe objects.
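
A minimal round-trip sketch (assumptions: data is a Channels object whose steps already produce an "energy" column, and the file and folder names are made up):

# Save a trimmed recipe able to reproduce "energy" from raw data.
recipes = data.save_recipes("recipes.pkl", required_fields="energy")

# Later, replay the same steps on freshly loaded LJH files.
data2 = Channels.from_ljh_folder(pulse_folder="datadir/raw")
data2 = data2.load_recipes("recipes.pkl")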

Source code in mass2/core/channels.py
def save_recipes(self, filename, required_fields: str | Iterable[str] | None = None, drop_debug=True) -> dict[int, Recipe]:
    """Pickle a dictionary (one entry per channel) of Recipe objects.

    If you want to save a "recipe", a minimal series of steps required to reproduce the required field(s),
    then set `required_fields` to be a list/tuple/set of DataFrame column names (or a single column name)
    whose production from raw data should be possible.

    Parameters
    ----------
    filename : str
        Filename to store recipe in, typically of the form "*.pkl"
    required_fields : str | Iterable[str] | None
        The field (str) or fields (Iterable[str]) that the recipe should be able to generate from a raw LJH file.
        Drop all steps that do not lead (directly or indirectly) to producing this field or these fields.
        If None, then preserve all steps (default None).
    drop_debug: bool
        Whether to remove debugging-related data from each `RecipeStep`, if the subclass supports this (via the
        `RecipeStep.drop_debug()` method).

    Returns
    -------
    dict
        Dictionary with keys=channel numbers, values=the (possibly trimmed and debug-dropped) Recipe objects.
    """
    steps = {}
    for channum, ch in self.channels.items():
        steps[channum] = ch.steps.trim_dead_ends(required_fields=required_fields, drop_debug=drop_debug)
    mass2.misc.pickle_object(steps, filename)
    return steps

mass2.core.analysis_algorithms - main algorithms used in data analysis

Designed to abstract certain key algorithms out of the MicrocalDataSet class so they can be run fast.

Created on Jun 9, 2014

@author: fowlerj

HistogramSmoother

Object that can repeatedly smooth histograms with the same bin count and width to the same Gaussian width. By pre-computing the smoothing kernel for that histogram, we can smooth multiple histograms with the same geometry.
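
A minimal sketch (synthetic data, made-up parameters) showing one smoother reused for two histograms that share the same limits:

import numpy as np

rng = np.random.default_rng(0)
smoother = HistogramSmoother(smooth_sigma=2.0, limits=[0, 1000])
hist_a = smoother(rng.normal(500, 30, size=10_000))  # smoothed histogram with smoother.nbins bins
hist_b = smoother(rng.normal(650, 30, size=10_000))  # reuses the pre-computed kernel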

Source code in mass2/core/analysis_algorithms.py
class HistogramSmoother:
    """Object that can repeatedly smooth histograms with the same bin count and
    width to the same Gaussian width.  By pre-computing the smoothing kernel for
    that histogram, we can smooth multiple histograms with the same geometry.
    """

    def __init__(self, smooth_sigma, limits):
        """Give the smoothing Gaussian's width as <smooth_sigma> and the
        [lower,upper] histogram limits as <limits>."""

        self.limits = np.asarray(limits, dtype=float)
        self.smooth_sigma = smooth_sigma

        # Choose a reasonable # of bins, at least 1024 and a power of 2
        stepsize = 0.4 * smooth_sigma
        dlimits = self.limits[1] - self.limits[0]
        nbins_guess = int(dlimits / stepsize + 0.5)
        min_nbins = 1024
        max_nbins = 32768  # 32k bins, 2**15

        # Clamp nbins_guess to at least min_nbins
        clamped_nbins = np.clip(nbins_guess, min_nbins, max_nbins)
        nbins_forced_to_power_of_2 = int(2 ** np.ceil(np.log2(clamped_nbins)))
        if nbins_forced_to_power_of_2 == max_nbins:
            print(f"Warning: HistogramSmoother (for drift correct) Limiting histogram bins to {max_nbins} (requested {nbins_guess})")
        self.nbins = nbins_forced_to_power_of_2
        self.stepsize = dlimits / self.nbins

        # Compute the Fourier-space smoothing kernel
        kernel = np.exp(-0.5 * (np.arange(self.nbins) * self.stepsize / self.smooth_sigma) ** 2)
        kernel[1:] += kernel[-1:0:-1]  # Handle the negative frequencies
        kernel /= kernel.sum()
        self.kernel_ft = np.fft.rfft(kernel)

    def __call__(self, values):
        """Return a smoothed histogram of the data vector <values>"""
        contents, _ = np.histogram(values, self.nbins, self.limits)
        ftc = np.fft.rfft(contents)
        csmooth = np.fft.irfft(self.kernel_ft * ftc)
        csmooth[csmooth < 0] = 0
        return csmooth

__call__(values)

Return a smoothed histogram of the data vector values.

Source code in mass2/core/analysis_algorithms.py
def __call__(self, values):
    """Return a smoothed histogram of the data vector <values>"""
    contents, _ = np.histogram(values, self.nbins, self.limits)
    ftc = np.fft.rfft(contents)
    csmooth = np.fft.irfft(self.kernel_ft * ftc)
    csmooth[csmooth < 0] = 0
    return csmooth

__init__(smooth_sigma, limits)

Give the smoothing Gaussian's width as smooth_sigma and the [lower, upper] histogram limits as limits.

Source code in mass2/core/analysis_algorithms.py
def __init__(self, smooth_sigma, limits):
    """Give the smoothing Gaussian's width as <smooth_sigma> and the
    [lower,upper] histogram limits as <limits>."""

    self.limits = np.asarray(limits, dtype=float)
    self.smooth_sigma = smooth_sigma

    # Choose a reasonable # of bins, at least 1024 and a power of 2
    stepsize = 0.4 * smooth_sigma
    dlimits = self.limits[1] - self.limits[0]
    nbins_guess = int(dlimits / stepsize + 0.5)
    min_nbins = 1024
    max_nbins = 32768  # 32k bins, 2**15

    # Clamp nbins_guess to at least min_nbins
    clamped_nbins = np.clip(nbins_guess, min_nbins, max_nbins)
    nbins_forced_to_power_of_2 = int(2 ** np.ceil(np.log2(clamped_nbins)))
    if nbins_forced_to_power_of_2 == max_nbins:
        print(f"Warning: HistogramSmoother (for drift correct) Limiting histogram bins to {max_nbins} (requested {nbins_guess})")
    self.nbins = nbins_forced_to_power_of_2
    self.stepsize = dlimits / self.nbins

    # Compute the Fourier-space smoothing kernel
    kernel = np.exp(-0.5 * (np.arange(self.nbins) * self.stepsize / self.smooth_sigma) ** 2)
    kernel[1:] += kernel[-1:0:-1]  # Handle the negative frequencies
    kernel /= kernel.sum()
    self.kernel_ft = np.fft.rfft(kernel)

compute_max_deriv(pulse_data, ignore_leading, spike_reject=True, kernel=None)

Computes the maximum derivative in the timeseries pulse_data. pulse_data can be a 2D array where each row is a different pulse record, in which case the return value will be an array as long as the number of rows in pulse_data.

Args:
  • pulse_data –
  • ignore_leading –
  • spike_reject – (default True)
  • kernel – the linear filter against which the signals will be convolved (CONVOLVED, not correlated, so reverse the filter as needed). If None, then the default kernel of [+.2 +.1 0 -.1 -.2] will be used. If "SG", then the cubic 5-point Savitzky-Golay filter will be used (see below). Otherwise, kernel needs to be a (short) array which will be converted to a 1xN 2-dimensional np.ndarray. (default None)

Returns: An np.ndarray, dimension 1: the value of the maximum derivative (units of pulse_data units per sample).

When kernel=="SG", then we estimate the derivative by Savitzky-Golay filtering (with 1 point before/3 points after the point in question and fitting polynomial of order 3). Find the right general area by first doing a simple difference.

Source code in mass2/core/analysis_algorithms.py
def compute_max_deriv(pulse_data, ignore_leading, spike_reject=True, kernel=None):
    """Computes the maximum derivative in timeseries <pulse_data>.
    <pulse_data> can be a 2D array where each row is a different pulse record, in which case
    the return value will be an array as long as the number of rows in <pulse_data>.

    Args:
        pulse_data:
        ignore_leading:
        spike_reject: (default True)
        kernel: the linear filter against which the signals will be convolved
            (CONVOLVED, not correlated, so reverse the filter as needed). If None,
            then the default kernel of [+.2 +.1 0 -.1 -.2] will be used. If
            "SG", then the cubic 5-point Savitzky-Golay filter will be used (see
            below). Otherwise, kernel needs to be a (short) array which will
            be converted to a 1xN 2-dimensional np.ndarray. (default None)

    Returns:
        An np.ndarray, dimension 1: the value of the maximum derivative (units of <pulse_data units> per sample).

    When kernel=="SG", then we estimate the derivative by Savitzky-Golay filtering
    (with 1 point before/3 points after the point in question and fitting polynomial
    of order 3).  Find the right general area by first doing a simple difference.
    """

    # If pulse_data is a 1D array, turn it into 2
    pulse_data = np.asarray(pulse_data)
    ndim = len(pulse_data.shape)
    if ndim > 2 or ndim < 1:
        raise ValueError("input pulse_data should be a 1d or 2d array.")
    if ndim == 1:
        pulse_data.shape = (1, pulse_data.shape[0])
    pulse_view = pulse_data[:, ignore_leading:]
    NPulse = pulse_view.shape[0]
    NSamp = pulse_view.shape[1]

    # The default filter:
    filter_coef = np.array([+0.2, +0.1, 0, -0.1, -0.2])
    if kernel == "SG":
        # This filter is the Savitzky-Golay filter of n_L=1, n_R=3 and M=3, to use the
        # language of Numerical Recipes 3rd edition.  It amounts to least-squares fitting
        # of an M=3rd order polynomial to the five points [-1,+3] and
        # finding the slope of the polynomial at 0.
        # Note that we reverse the order of coefficients because convolution will re-reverse
        filter_coef = np.array([-0.45238, -0.02381, 0.28571, 0.30952, -0.11905])[::-1]

    elif kernel is not None:
        filter_coef = np.array(kernel).ravel()

    f0, f1, f2, f3, f4 = filter_coef

    max_deriv = np.zeros(NPulse, dtype=np.float64)

    if spike_reject:
        for i in range(NPulse):
            pulses = pulse_view[i]
            t0 = f4 * pulses[0] + f3 * pulses[1] + f2 * pulses[2] + f1 * pulses[3] + f0 * pulses[4]
            t1 = f4 * pulses[1] + f3 * pulses[2] + f2 * pulses[3] + f1 * pulses[4] + f0 * pulses[5]
            t2 = f4 * pulses[2] + f3 * pulses[3] + f2 * pulses[4] + f1 * pulses[5] + f0 * pulses[6]
            t_max_deriv = t2 if t2 < t0 else t0

            for j in range(7, NSamp):
                t3 = f4 * pulses[j - 4] + f3 * pulses[j - 3] + f2 * pulses[j - 2] + f1 * pulses[j - 1] + f0 * pulses[j]
                t4 = t3 if t3 < t1 else t1
                t_max_deriv = max(t4, t_max_deriv)

                t0, t1, t2 = t1, t2, t3

            max_deriv[i] = t_max_deriv
    else:
        for i in range(NPulse):
            pulses = pulse_view[i]
            t0 = f4 * pulses[0] + f3 * pulses[1] + f2 * pulses[2] + f1 * pulses[3] + f0 * pulses[4]
            t_max_deriv = t0

            for j in range(5, NSamp):
                t0 = f4 * pulses[j - 4] + f3 * pulses[j - 3] + f2 * pulses[j - 2] + f1 * pulses[j - 1] + f0 * pulses[j]
                t_max_deriv = max(t0, t_max_deriv)
            max_deriv[i] = t_max_deriv

    return np.asarray(max_deriv, dtype=np.float32)

correct_flux_jumps(vals, mask, flux_quant)

Remove 'flux jumps' from pretrigger mean.

When using umux readout, if a pulse is recorded that has a very fast rising edge (e.g. a cosmic ray), the readout system will "slip" an integer number of flux quanta. This means that the baseline level returned to after the pulse will differ from the pretrigger value by an integer number of flux quanta. This causes the pretrigger mean summary quantity to jump around in a way that causes trouble for the rest of MASS. This function attempts to correct these jumps.

Arguments:
  • vals – array of values to correct
  • mask – mask identifying "good" pulses
  • flux_quant – size of one flux quantum

Returns: Array with values corrected
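
A minimal sketch with made-up numbers: a pretrigger-mean trace that slips by one flux quantum partway through is restored:

import numpy as np

phi0 = 1000.0                        # size of one flux quantum (made-up value)
ptm = np.full(200, 1234.5)
ptm[120:] -= phi0                    # simulate a slip after pulse 120
good = np.ones(ptm.size, dtype=np.bool_)
fixed = correct_flux_jumps(ptm, good, phi0)   # restores a flat trace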

Source code in mass2/core/analysis_algorithms.py
def correct_flux_jumps(vals, mask, flux_quant):
    """Remove 'flux' jumps' from pretrigger mean.

    When using umux readout, if a pulse is recorded that has a very fast rising
    edge (e.g. a cosmic ray), the readout system will "slip" an integer number
    of flux quanta. This means that the baseline level returned to after the
    pulse will differ from the pretrigger value by an integer number of flux
    quanta. This causes the pretrigger mean summary quantity to jump around in
    a way that causes trouble for the rest of MASS. This function attempts to
    correct these jumps.

    Arguments:
    vals -- array of values to correct
    mask -- mask identifying "good" pulses
    flux_quant -- size of 1 flux quanta

    Returns:
    Array with values corrected
    """
    return unwrap_n(vals, flux_quant, mask)

correct_flux_jumps_original(vals, mask, flux_quant)

Remove 'flux jumps' from pretrigger mean.

When using umux readout, if a pulse is recorded that has a very fast rising edge (e.g. a cosmic ray), the readout system will "slip" an integer number of flux quanta. This means that the baseline level returned to after the pulse will differ from the pretrigger value by an integer number of flux quanta. This causes the pretrigger mean summary quantity to jump around in a way that causes trouble for the rest of MASS. This function attempts to correct these jumps.

Arguments:
  • vals – array of values to correct
  • mask – mask identifying "good" pulses
  • flux_quant – size of one flux quantum

Returns: Array with values corrected

Source code in mass2/core/analysis_algorithms.py
def correct_flux_jumps_original(vals, mask, flux_quant):
    """Remove 'flux' jumps' from pretrigger mean.

    When using umux readout, if a pulse is recorded that has a very fast rising
    edge (e.g. a cosmic ray), the readout system will "slip" an integer number
    of flux quanta. This means that the baseline level returned to after the
    pulse will differ from the pretrigger value by an integer number of flux
    quanta. This causes the pretrigger mean summary quantity to jump around in
    a way that causes trouble for the rest of MASS. This function attempts to
    correct these jumps.

    Arguments:
    vals -- array of values to correct
    mask -- mask identifying "good" pulses
    flux_quant -- size of 1 flux quanta

    Returns:
    Array with values corrected
    """
    # The naive thing is to simply replace each value with its value mod
    # the flux quantum. But if the baseline value turns out to fluctuate
    # about an integer number of flux quanta, this will introduce new
    # jumps. I don't know the best way to handle this in general. For now,
    # if there are still jumps after the mod, I add 1/4 of a flux quanta
    # before modding, then mod, then subtract the 1/4 flux quantum and then
    # *add* a single flux quantum so that the values never go negative.
    #
    # To determine whether there are "still jumps after the mod" I look at the
    # difference between the largest and smallest values for "good" pulses. If
    # you don't exclude "bad" pulses, this check can be tricked in cases where
    # the pretrigger section contains a (sufficiently large) tail.
    if (np.amax(vals) - np.amin(vals)) >= flux_quant:
        corrected = vals % (flux_quant)
        if (np.amax(corrected[mask]) - np.amin(corrected[mask])) > 0.75 * flux_quant:
            corrected = (vals + flux_quant / 4) % (flux_quant)
            corrected = corrected - flux_quant / 4 + flux_quant
        corrected -= corrected[0] - vals[0]
        return corrected
    else:
        return vals

drift_correct(indicator, uncorrected, limit=None)

Compute a drift correction that minimizes the spectral entropy.

Args:
  • indicator – The "x-axis", which indicates the size of the correction.
  • uncorrected – A filtered pulse height vector. Same length as indicator. Assumed to have some gain that is linearly related to indicator.
  • limit – The upper limit of uncorrected values over which entropy is computed (default None).

Generally indicator will be the pretrigger mean of the pulses, but you can experiment with other choices.

The entropy will be computed on corrected values only in the range [0, limit], so limit should be set to a characteristic large value of uncorrected. If limit is None (the default), then it will be computed as 25% larger than the 99th-percentile point of uncorrected.

The model is that the filtered pulse height PH should be scaled by (1 + a*PTM) where a is an arbitrary parameter computed here, and PTM is the difference between each record's pretrigger mean and the median value of all pretrigger means. (Or replace "pretrigger mean" with whatever quantity you passed in as indicator.)
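
A minimal sketch (synthetic data) showing how the returned slope and median are applied to form the correction; all values are made up:

import numpy as np

rng = np.random.default_rng(2)
ptm = rng.normal(1000, 5, size=20_000)                        # pretrigger means
true_ph = rng.choice([5000.0, 6000.0], size=ptm.size) + rng.normal(0, 3, size=ptm.size)
uncorrected = true_ph / (1 + 2e-4 * (ptm - np.median(ptm)))   # inject a linear gain drift

slope, info = drift_correct(ptm, uncorrected)
corrected = uncorrected * (1 + slope * (ptm - info["median_pretrig_mean"]))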

Source code in mass2/core/analysis_algorithms.py
def drift_correct(indicator, uncorrected, limit=None):
    """Compute a drift correction that minimizes the spectral entropy.

    Args:
        indicator: The "x-axis", which indicates the size of the correction.
        uncorrected: A filtered pulse height vector. Same length as indicator.
            Assumed to have some gain that is linearly related to indicator.
        limit: The upper limit of uncorrected values over which entropy is
            computed (default None).

    Generally indicator will be the pretrigger mean of the pulses, but you can
    experiment with other choices.

    The entropy will be computed on corrected values only in the range
    [0, limit], so limit should be set to a characteristic large value of
    uncorrected. If limit is None (the default), then it will be computed as
    25% larger than the 99%ile point of uncorrected.

    The model is that the filtered pulse height PH should be scaled by (1 +
    a*PTM) where a is an arbitrary parameter computed here, and PTM is the
    difference between each record's pretrigger mean and the median value of all
    pretrigger means. (Or replace "pretrigger mean" with whatever quantity you
    passed in as <indicator>.)
    """
    ptm_offset = np.median(indicator)
    indicator = np.array(indicator) - ptm_offset

    if limit is None:
        pct99 = np.percentile(uncorrected, 99)
        limit = 1.25 * pct99

    smoother = HistogramSmoother(0.5, [0, limit])
    assert smoother.nbins < 1e6, "will be crazy slow, should not be possible"

    def entropy(param, indicator, uncorrected, smoother):
        corrected = uncorrected * (1 + indicator * param)
        hsmooth = smoother(corrected)
        w = hsmooth > 0
        return -(np.log(hsmooth[w]) * hsmooth[w]).sum()

    drift_corr_param = sp.optimize.brent(entropy, (indicator, uncorrected, smoother), brack=[0, 0.001])

    drift_correct_info = {"type": "ptmean_gain", "slope": drift_corr_param, "median_pretrig_mean": ptm_offset}
    return drift_corr_param, drift_correct_info

estimateRiseTime(pulse_data, timebase, nPretrig)

Computes the rise time of the timeseries pulse_data, where the time steps are timebase. pulse_data can be a 2D array where each row is a different pulse record, in which case the return value will be an array as long as the number of rows in pulse_data.

If nPretrig >= 4, then the samples pulse_data[:nPretrig] are averaged to estimate the baseline. Otherwise, the minimum of pulse_data is assumed to be the baseline.

Specifically, take the first and last of the rising points in the range of 10% to 90% of the peak value, interpolate a line between the two, and use its slope to find the time to rise from 0 to the peak.

Args:
  • pulse_data – An np.ndarray of dimension 1 (a single pulse record) or 2 (an array with each row being a pulse record).
  • timebase – The sampling time.
  • nPretrig – The number of samples that are recorded before the trigger.

Returns: An ndarray of dimension 1, giving the rise times.
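
For intuition, a hand-rolled version of the 10%-90% slope estimate described above, applied to one synthetic record (this illustrates the idea only and is not a call to the library routine):

import numpy as np

timebase, npre = 1e-6, 100
t = np.arange(400)
record = np.concatenate([np.zeros(npre), 1000.0 * (1 - np.exp(-t / 20.0))])
baseline = record[:npre].mean()
rising = (record[npre:] - baseline) / (record.max() - baseline)
first = np.argmax(rising > 0.1)            # first sample above 10% of peak
last = np.argmax(rising > 0.9) - 1         # last sample at or below 90% of peak
slope = (rising[last] - rising[first]) / (timebase * (last - first))
rise_time = 1.0 / slope                    # time to go from 0 to the peak at that slope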

Source code in mass2/core/analysis_algorithms.py
@njit
def estimateRiseTime(pulse_data, timebase, nPretrig):
    """Computes the rise time of timeseries <pulse_data>, where the time steps are <timebase>.
    <pulse_data> can be a 2D array where each row is a different pulse record, in which case
    the return value will be an array as long as the number of rows in <pulse_data>.

    If nPretrig >= 4, then the samples pulse_data[:nPretrig] are averaged to estimate
    the baseline.  Otherwise, the minimum of pulse_data is assumed to be the baseline.

    Specifically, take the first and last of the rising points in the range of
    10% to 90% of the peak value, interpolate a line between the two, and use its
    slope to find the time to rise from 0 to the peak.

    Args:
        pulse_data: An np.ndarray of dimension 1 (a single pulse record) or 2 (an
            array with each row being a pulse record).
        timebase: The sampling time.
        nPretrig: The number of samples that are recorded before the trigger.

    Returns:
        An ndarray of dimension 1, giving the rise times.
    """
    MINTHRESH, MAXTHRESH = 0.1, 0.9

    # If pulse_data is a 1D array, turn it into 2
    pulse_data = np.asarray(pulse_data)
    ndim = len(pulse_data.shape)
    if ndim > 2 or ndim < 1:
        raise ValueError("input pulse_data should be a 1d or 2d array.")
    if ndim == 1:
        pulse_data.shape = (1, pulse_data.shape[0])

    # The following requires a lot of numpy foo to read. Sorry!
    if nPretrig >= 4:
        baseline_value = pulse_data[:, 0:nPretrig].mean(axis=1)
    else:
        baseline_value = pulse_data.min(axis=1)
        nPretrig = 0
    value_at_peak = pulse_data.max(axis=1) - baseline_value
    idx_last_pk = pulse_data.argmax(axis=1).max()

    npulses = pulse_data.shape[0]
    try:
        rising_data = (pulse_data[:, nPretrig : idx_last_pk + 1] - baseline_value[:, np.newaxis]) / value_at_peak[:, np.newaxis]
        # Find the last and first indices at which the data are in (0.1, 0.9] times the
        # peak value. Then make sure last is at least 1 past first.
        last_idx = (rising_data > MAXTHRESH).argmax(axis=1) - 1
        first_idx = (rising_data > MINTHRESH).argmax(axis=1)
        last_idx[last_idx < first_idx] = first_idx[last_idx < first_idx] + 1
        last_idx[last_idx == rising_data.shape[1]] = rising_data.shape[1] - 1

        pulsenum = np.arange(npulses)
        y_diff = np.asarray(rising_data[pulsenum, last_idx] - rising_data[pulsenum, first_idx], dtype=float)
        y_diff[y_diff < timebase] = timebase
        time_diff = timebase * (last_idx - first_idx)
        rise_time = time_diff / y_diff
        rise_time[y_diff <= 0] = -9.9e-6
        return rise_time

    except ValueError:
        return -9.9e-6 + np.zeros(npulses, dtype=float)

filter_signal_lowpass(sig, fs, fcut)

Tophat lowpass filter using an FFT

Args:
  • sig – the signal to be filtered
  • fs – the sampling frequency of the signal
  • fcut – the frequency at which to cut off the signal

Returns: the filtered signal
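
For intuition, the tophat-in-frequency idea in plain numpy (an illustration of the approach, not a call to the library routine); the sampling rate and cutoff are made up:

import numpy as np

fs, fcut = 1.0e6, 1.0e4                       # 1 MHz sampling, 10 kHz cutoff
t = np.arange(4096) / fs
sig = np.sin(2 * np.pi * 1e3 * t) + 0.2 * np.sin(2 * np.pi * 5e4 * t)

spectrum = np.fft.rfft(sig)
freqs = np.fft.rfftfreq(sig.size, d=1 / fs)
spectrum[freqs >= fcut] = 0.0                 # zero everything at or above the cutoff
filtered = np.fft.irfft(spectrum, n=sig.size)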

Source code in mass2/core/analysis_algorithms.py
@njit
def filter_signal_lowpass(sig, fs, fcut):
    """Tophat lowpass filter using an FFT

    Args:
        sig - the signal to be filtered
        fs - the sampling frequency of the signal
        fcut - the frequency at which to cutoff the signal

    Returns:
        the filtered signal
    """
    N = sig.shape[0]
    SIG = np.fft.fft(sig)
    freqs = (fs / N) * np.concatenate((np.arange(0, N / 2 + 1), np.arange(N / 2 - 1, 0, -1)))
    filt = np.zeros_like(SIG)
    filt[freqs < fcut] = 1.0
    sig_filt = np.fft.ifft(SIG * filt)
    return sig_filt

make_smooth_histogram(values, smooth_sigma, limit, upper_limit=None)

Convert a vector of arbitrary values into a smoothed histogram by histogramming it and smoothing.

This is a convenience function using the HistogramSmoother class.

Args:
  • values – The vector of data to be histogrammed.
  • smooth_sigma – The smoothing Gaussian's width (FWHM).
  • limit, upper_limit – The histogram limits are [limit, upper_limit], or [0, limit] if upper_limit is None.

Returns: The smoothed histogram as an array.

Source code in mass2/core/analysis_algorithms.py
@njit
def make_smooth_histogram(values, smooth_sigma, limit, upper_limit=None):
    """Convert a vector of arbitrary <values> info a smoothed histogram by
    histogramming it and smoothing.

    This is a convenience function using the HistogramSmoother class.

    Args:
        values: The vector of data to be histogrammed.
        smooth_sigma: The smoothing Gaussian's width (FWHM)
        limit, upper_limit: The histogram limits are [limit,upper_limit] or
            [0,limit] if upper_limit is None.

    Returns:
        The smoothed histogram as an array.
    """
    if upper_limit is None:
        limit, upper_limit = 0, limit
    return HistogramSmoother(smooth_sigma, [limit, upper_limit])(values)

nearest_arrivals(reference_times, other_times)

Find the external trigger time immediately before and after each pulse timestamp

Args:
  • pulse_timestamps – 1d array of pulse timestamps whose nearest neighbors need to be found.
  • external_trigger_timestamps – 1d array of possible nearest neighbors.

Returns: (before_times, after_times)

before_times is an ndarray of the same size as pulse_timestamps. before_times[i] contains the difference between the closest lesser time contained in external_trigger_timestamps and pulse_timestamps[i], or inf if there was no earlier time in other_times. Note that before_times is always a positive number even though the time difference it represents is negative.

after_times is an ndarray of the same size as pulse_timestamps. after_times[i] contains the difference between pulse_timestamps[i] and the closest greater time contained in other_times, or inf if there was no later time in external_trigger_timestamps.
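
A minimal sketch with made-up times (both arrays must be sorted and in the same units):

import numpy as np

pulse_times = np.array([1.0, 2.5, 7.2])
ext_trig_times = np.array([0.5, 2.0, 3.0, 6.0])
before, after = nearest_arrivals(pulse_times, ext_trig_times)
# before == [0.5, 0.5, 1.2]; after == [1.0, 0.5, inf]  (no trigger follows the last pulse)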

Source code in mass2/core/analysis_algorithms.py
@njit
def nearest_arrivals(reference_times, other_times):
    """Find the external trigger time immediately before and after each pulse timestamp

    Args:
        pulse_timestamps - 1d array of pulse timestamps whose nearest neighbors
            need to be found.
        external_trigger_timestamps - 1d array of possible nearest neighbors.

    Returns:
        (before_times, after_times)

    before_times is an ndarray of the same size as pulse_timestamps.
    before_times[i] contains the difference between the closest lesser time
    contained in external_trigger_timestamps and pulse_timestamps[i], or inf if there was no
    earlier time in other_times. Note that before_times is always a positive
    number even though the time difference it represents is negative.

    after_times is an ndarray of the same size as pulse_timestamps.
    after_times[i] contains the difference between pulse_timestamps[i] and the
    closest greater time contained in other_times, or inf if there was
    no later time in external_trigger_timestamps.
    """
    nearest_after_index = np.searchsorted(other_times, reference_times)
    # because both sets of arrival times should be sorted, there are faster algorithms than searchsorted
    # for example: https://github.com/kwgoodman/bottleneck/issues/47
    # we could use one if performance becomes an issue
    last_index = np.searchsorted(nearest_after_index, other_times.size, side="left")
    first_index = np.searchsorted(nearest_after_index, 1)

    nearest_before_index = np.copy(nearest_after_index)
    nearest_before_index[:first_index] = 1
    nearest_before_index -= 1
    before_times = reference_times - other_times[nearest_before_index]
    before_times[:first_index] = np.inf

    nearest_after_index[last_index:] = other_times.size - 1
    after_times = other_times[nearest_after_index] - reference_times
    after_times[last_index:] = np.inf

    return before_times, after_times

time_drift_correct(time, uncorrected, w, sec_per_degree=2000, pulses_per_degree=2000, max_degrees=20, ndeg=None, limit=None)

Compute a time-based drift correction that minimizes the spectral entropy.

Args:
  • time – The "time-axis". Correction will be a low-order polynomial in this.
  • uncorrected – A filtered pulse height vector. Same length as time. Assumed to have some gain that drifts slowly in time.
  • w – the kernel width for the Laplace KDE density estimator.
  • sec_per_degree – assign as many as one polynomial degree per this many seconds.
  • pulses_per_degree – assign as many as one polynomial degree per this many pulses.
  • max_degrees – never use more than this many degrees of Legendre polynomial.
  • ndeg – If not None, use this many degrees, regardless of the values of sec_per_degree, pulses_per_degree, and max_degrees. In this case, never downsample.
  • limit – The [lower, upper] limit of uncorrected values over which entropy is computed (default None).

The entropy will be computed on corrected values only in the range [limit[0], limit[1]], so limit should be set to a characteristic large value of uncorrected. If limit is None (the default), then it will be computed as 25% larger than the 99th-percentile point of uncorrected.

Possible improvements in the future:
  • Use Numba to speed up.
  • Allow the parameters to be function arguments with defaults: photons per degree of freedom, seconds per degree of freedom, and max degrees of freedom.
  • Figure out how to span the available time with more than one set of Legendre polynomials, so that we can have more than 20 d.o.f. eventually, for long runs.
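
A minimal sketch with synthetic data (the kernel width w and all other values are made up); the returned model is evaluated on normalized time to form the correction:

import numpy as np

rng = np.random.default_rng(3)
t = np.sort(rng.uniform(0, 3600.0, size=20_000))              # arrival times in seconds
ph = rng.choice([5000.0, 6000.0], size=t.size) * (1 + 1e-4 * (t - 1800.0) / 1800.0)

info = time_drift_correct(t, ph, w=5.0)
corrected = ph * (1 + info["model"](info["normalize"](t)))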

Source code in mass2/core/analysis_algorithms.py
def time_drift_correct(  # noqa: PLR0914
    time,
    uncorrected,
    w,
    sec_per_degree=2000,
    pulses_per_degree=2000,
    max_degrees=20,
    ndeg=None,
    limit=None,
):
    """Compute a time-based drift correction that minimizes the spectral entropy.

    Args:
        time: The "time-axis". Correction will be a low-order polynomial in this.
        uncorrected: A filtered pulse height vector. Same length as indicator.
            Assumed to have some gain that is linearly related to indicator.
        w: the kernel width for the Laplace KDE density estimator
        sec_per_degree: assign as many as one polynomial degree per this many seconds
        pulses_per_degree: assign as many as one polynomial degree per this many pulses
        max_degrees: never use more than this many degrees of Legendre polynomial.
        ndeg: If not None, use this many degrees, regardless of the values of
               sec_per_degree, pulses_per_degree, and max_degrees. In this case, never downsample.
        limit: The [lower,upper] limit of uncorrected values over which entropy is
            computed (default None).

    The entropy will be computed on corrected values only in the range
    [limit[0], limit[1]], so limit should be set to a characteristic large value
    of uncorrected. If limit is None (the default), then it will be computed as
    25% larger than the 99th-percentile point of uncorrected.

    Possible improvements in the future:
    * Use Numba to speed up.
    * Allow the parameters to be function arguments with defaults: photons per
      degree of freedom, seconds per degree of freedom, and max degrees of freedom.
    * Figure out how to span the available time with more than one set of legendre
      polynomials, so that we can have more than 20 d.o.f. eventually, for long runs.
    """
    if limit is None:
        pct99 = np.percentile(uncorrected, 99)
        limit = [0, 1.25 * pct99]

    use = np.logical_and(uncorrected > limit[0], uncorrected < limit[1])
    time = time[use]
    uncorrected = uncorrected[use]

    tmin, tmax = np.min(time), np.max(time)

    def normalize(t):
        return (t - tmin) / (tmax - tmin) * 2 - 1

    info = {
        "tmin": tmin,
        "tmax": tmax,
        "normalize": normalize,
    }

    dtime = tmax - tmin
    N = len(time)
    if ndeg is None:
        ndeg = int(np.minimum(dtime / sec_per_degree, N / pulses_per_degree))
        ndeg = min(ndeg, max_degrees)
        ndeg = max(ndeg, 1)
        phot_per_degree = N / float(ndeg)
        if phot_per_degree >= 2 * pulses_per_degree:
            downsample = int(phot_per_degree / pulses_per_degree)
            time = time[::downsample]
            uncorrected = uncorrected[::downsample]
            N = len(time)
        else:
            downsample = 1
    else:
        downsample = 1

    LOG.info("Using %2d degrees for %6d photons (after %d downsample)", ndeg, N, downsample)
    LOG.info("That's %6.1f photons per degree, and %6.1f seconds per degree.", N / float(ndeg), dtime / ndeg)

    def model1(pi, i, param, basis):
        pcopy = np.array(param)
        pcopy[i] = pi
        return 1 + np.dot(basis.T, pcopy)

    def cost1(pi, i, param, y, w, basis):
        return laplace_entropy(y * model1(pi, i, param, basis), w=w)

    param = np.zeros(ndeg, dtype=float)
    xnorm = np.asarray(normalize(time), dtype=float)
    basis = np.vstack([sp.special.legendre(i + 1)(xnorm) for i in range(ndeg)])

    fc = 0
    model = np.poly1d([0])
    info["coefficients"] = np.zeros(ndeg, dtype=float)
    for i in range(ndeg):
        result, _fval, _iter, funcalls = sp.optimize.brent(
            cost1, (i, param, uncorrected, w, basis), [-0.001, 0.001], tol=1e-5, full_output=True
        )
        param[i] = result
        fc += funcalls
        model += sp.special.legendre(i + 1) * result
        info["coefficients"][i] = result
    info["funccalls"] = fc

    xk = np.linspace(-1, 1, 1 + 2 * ndeg)
    model2 = CubicSpline(xk, model(xk))
    H1 = laplace_entropy(uncorrected, w=w)
    H2 = laplace_entropy(uncorrected * (1 + model(xnorm)), w=w)
    H3 = laplace_entropy(uncorrected * (1 + model2(xnorm)), w=w)
    if H2 <= 0 or H2 - H1 > 0.0:
        model = np.poly1d([0])
    elif H3 <= 0 or H3 - H2 > 0.00001:
        model2 = model

    info["entropies"] = (H1, H2, H3)
    info["model"] = model
    return info

unwrap_n(data, period, mask, n=3)

Unwrap data that has been restricted to a given period.

The algorithm iterates through each data point and compares it to the average of the previous n data points. It then offsets the data point by the multiple of the period that will minimize the difference from that n-point running average.

For the first n data points, there are not enough preceding points to average n of them, so the algorithm will average fewer points.

This code was written by Thomas Baker; integrated into MASS by Dan Becker. Sped up 300x by @njit.

Parameters:
  • data (array of data values) –
  • period (the range over which the data loops) –
  • n (how many preceding points to average, default: 3) –
  • mask – mask identifying "good" pulses
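
A minimal sketch with made-up numbers: a slowly rising signal wrapped into a period of 1.0 is recovered:

import numpy as np

period = 1.0
truth = np.linspace(0.0, 3.0, 50)            # slowly increasing signal
wrapped = truth % period                      # what a wrapped readout would report
mask = np.ones(truth.size, dtype=np.bool_)
unwrapped = unwrap_n(wrapped, period, mask)   # follows the running average back up
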
Source code in mass2/core/analysis_algorithms.py
@njit
def unwrap_n(data, period, mask, n=3):
    """Unwrap data that has been restricted to a given period.

    The algorithm iterates through each data point and compares
    it to the average of the previous n data points. It then
    offsets the data point by the multiple of the period that
    will minimize the difference from that n-point running average.

    For the first n data points, there are not enough preceding
    points to average n of them, so the algorithm will average
    fewer points.

    This code was written by Thomas Baker; integrated into MASS by Dan
    Becker. Sped up 300x by @njit.

    Parameters
    ----------
    data : array of data values
    period : the range over which the data loops
    n : how many preceding points to average
    mask (optional) -- mask identifying "good" pulses
    """
    udata = data.copy()
    if n <= 0:
        return udata

    # Iterate through each data point and offset it by
    # an amount that will minimize the difference from the
    # rolling average
    nprior = 0
    firstgoodidx = np.argmax(mask)
    priorvalues = np.full(n, udata[firstgoodidx])
    for i in range(len(data)):
        # Take the average of the previous n data points (only those with mask[i]==True).
        # Offset the data point by the most reasonable multiple of period (make this point closest to the running average).
        if mask[i]:
            avg = np.mean(priorvalues)
            if nprior == 0:
                avg = float(priorvalues[0])
            elif nprior < n:
                avg = np.mean(priorvalues[:nprior])
        udata[i] -= np.round((udata[i] - avg) / period) * period
        if mask[i]:
            priorvalues[nprior % n] = udata[i]
            nprior += 1
    return udata

ColumnAsNumpyMapStep dataclass

Bases: RecipeStep

This step is meant for interactive exploration: it takes a column, applies a function to it, and makes a new column with the result. It makes it easy to test functions on a column without having to write a whole new step class, while keeping the benefit of being able to use the step in a Recipe chain, such as replaying steps on another channel.

example usage:

>>> def my_function(x):
...     return x * 2
>>> step = ColumnAsNumpyMapStep(inputs=["my_column"], output=["my_new_column"], f=my_function)
>>> ch2 = ch.with_step(step)

Source code in mass2/core/recipe.py
@dataclass(frozen=True)
class ColumnAsNumpyMapStep(RecipeStep):
    """
    This step is meant for interactive exploration: it takes a column, applies a function to it,
    and makes a new column with the result. It makes it easy to test functions on a column without
    having to write a whole new step class,
    while maintaining the benefit of being able to use the step in a Recipe chain, like replaying steps
    on another channel.

    example usage:
    >>> def my_function(x):
    ...     return x * 2
    >>> step = ColumnAsNumpyMapStep(inputs=["my_column"], output=["my_new_column"], f=my_function)
    >>> ch2 = ch.with_step(step)
    """

    f: Callable[[np.ndarray], np.ndarray]

    def __post_init__(self):
        assert len(self.inputs) == 1, "ColumnMapStep expects exactly one input"
        assert len(self.output) == 1, "ColumnMapStep expects exactly one output"
        if not callable(self.f):
            raise ValueError(f"f must be a callable, got {self.f}")

    def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
        output_col = self.output[0]
        output_segments = []
        for df_iter in df.select(self.inputs).iter_slices():
            series1 = df_iter[self.inputs[0]]
            # Have to apply the function differently when series elements are arrays vs scalars
            if series1.dtype.base_type() is pl.Array:
                output_numpy = np.array([self.f(v.to_numpy()) for v in series1])
            else:
                output_numpy = self.f(series1.to_numpy())
            this_output_segment = pl.Series(output_col, output_numpy)
            output_segments.append(this_output_segment)

        combined = pl.concat(output_segments)
        # Put into a DataFrame with one column
        df2 = pl.DataFrame({output_col: combined}).with_columns(df)
        return df2

Recipe dataclass

Source code in mass2/core/recipe.py
@dataclass(frozen=True)
class Recipe:
    # leaves many optimizations on the table, but is very simple
    # 1. we could calculate filt_value_5lag and filt_phase_5lag at the same time
    # 2. we could calculate intermediate quantities optionally and not materialize all of them
    steps: list[RecipeStep]

    def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
        "return a dataframe with all the newly calculated info"
        for step in self.steps:
            df = step.calc_from_df(df).with_columns(df)
        return df

    @classmethod
    def new_empty(cls) -> "Recipe":
        return cls([])

    def __getitem__(self, key: int) -> RecipeStep:
        return self.steps[key]

    def __len__(self) -> int:
        return len(self.steps)

    # def copy(self):
    #     # copy by creating a new list containing all the entires in the old list
    #     # a list entry, aka a RecipeStep, should be immutable
    #     return Recipe(self.steps[:])

    def with_step(self, step: RecipeStep) -> "Recipe":
        # return a new Recipe with the step added, no mutation!
        return Recipe(self.steps + [step])

    def trim_dead_ends(self, required_fields: Iterable[str] | str | None, drop_debug: bool = True) -> "Recipe":
        """Create a new Recipe object with all dead-end steps (and optionally also debug info) removed.

        The purpose is to replace the fully useful interactive Recipe with a trimmed-down object that can
        repeat the current steps as a "recipe" without having the extra information from which the recipe
        was first created. In one test, this method reduced the pickle file's size from 3.4 MB per channel
        to 30 kB per channel, or a 112x size reduction (with `drop_debug=True`).

        Dead-end steps are defined as any step that can be omitted without affecting the ability to
        compute any of the fields given in `required_fields`. The result of this method is to return
        a Recipe where any step is removed if it does not contribute to computing any of the `required_fields`
        (i.e., if it is a dead end).

        Examples of a dead end are typically steps used to prepare a tentative, intermediate calibration function.

        Parameters
        ----------
        required_fields : Iterable[str] | str | None
            Steps will be preserved if any of their outputs are among `required_fields`, or if their outputs are
            found recursively among the inputs to any such steps. If a string, treat as a list of that one string.
            If None, preserve all steps.

        drop_debug : bool
            Whether to run `step.drop_debug()` to remove debugging information from the preserved steps.

        Returns
        -------
        Recipe
            A copy of `self`, except that any steps not required to compute any of `required_fields` are omitted.
        """
        if isinstance(required_fields, str):
            required_fields = [required_fields]

        nsteps = len(self)
        required = np.zeros(nsteps, dtype=bool)

        # The easiest approach is to traverse the steps from last to first to build our list of required
        # fields, because necessarily no later step can produce the inputs needed by an earlier step.
        if required_fields is None:
            required[:] = True
        else:
            all_fields_out: set[str] = set(required_fields)
            for istep in range(nsteps - 1, -1, -1):
                step = self[istep]
                for field in step.output:
                    if field in all_fields_out:
                        required[istep] = True
                        all_fields_out.update(step.inputs)
                        break

        if not np.any(required):
            # If this error ever becomes a problem, where a user _actually_ wants an empty series of steps
            # to be a non-error, then add argument `error_on_empty_output=True` to this method.
            raise ValueError("trim_dead_ends found no steps to be preserved")

        steps = []
        for i in range(nsteps):
            if required[i]:
                if drop_debug:
                    steps.append(self[i].drop_debug())
                else:
                    steps.append(self[i])
        return Recipe(steps)

calc_from_df(df)

return a dataframe with all the newly calculated info

Source code in mass2/core/recipe.py, lines 188-192
def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
    "return a dataframe with all the newly calculated info"
    for step in self.steps:
        df = step.calc_from_df(df).with_columns(df)
    return df

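As a usage sketch (not taken from the library's own examples): build a Recipe from a single SelectStep and replay it on a polars DataFrame. The column names are invented for illustration, and the import path is assumed from the "Source code in mass2/core/recipe.py" note above.

import polars as pl
from mass2.core.recipe import Recipe, SelectStep  # import path assumed

# Hypothetical raw data with one column.
df = pl.DataFrame({"pulse_rms": [1.0, 2.0, 3.0]})

# A SelectStep that derives a squared column; every name here is made up.
step = SelectStep(
    inputs=["pulse_rms"],
    output=["pulse_rms_sq"],
    good_expr=pl.lit(True),
    use_expr=pl.lit(True),
    col_expr_dict={"pulse_rms_sq": pl.col("pulse_rms") ** 2},
)

recipe = Recipe.new_empty().with_step(step)  # with_step returns a new Recipe; nothing is mutated
df_out = recipe.calc_from_df(df)             # df_out has pulse_rms_sq alongside the original pulse_rms
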
trim_dead_ends(required_fields, drop_debug=True)

Create a new Recipe object with all dead-end steps (and optionally also debug info) removed.

The purpose is to replace the fully useful interactive Recipe with a trimmed-down object that can repeat the current steps as a "recipe" without having the extra information from which the recipe was first created. In one test, this method reduced the pickle file's size from 3.4 MB per channel to 30 kB per channel, or a 112x size reduction (with drop_debug=True).

Dead-end steps are defined as any step that can be omitted without affecting the ability to compute any of the fields given in required_fields. The result of this method is to return a Recipe where any step is removed if it does not contribute to computing any of the required_fields (i.e., if it is a dead end).

Examples of a dead end are typically steps used to prepare a tentative, intermediate calibration function.

Parameters:
  • required_fields (Iterable[str] | str | None) –

    Steps will be preserved if any of their outputs are among required_fields, or if their outputs are found recursively among the inputs to any such steps. If a string, treat as a list of that one string. If None, preserve all steps.

  • drop_debug (bool, default: True ) –

    Whether to run step.drop_debug() to remove debugging information from the preserved steps.

Returns:
  • Recipe

    A copy of self, except that any steps not required to compute any of required_fields are omitted.

Source code in mass2/core/recipe.py, lines 213-275
def trim_dead_ends(self, required_fields: Iterable[str] | str | None, drop_debug: bool = True) -> "Recipe":
    """Create a new Recipe object with all dead-end steps (and optionally also debug info) removed.

    The purpose is to replace the fully useful interactive Recipe with a trimmed-down object that can
    repeat the current steps as a "recipe" without having the extra information from which the recipe
    was first created. In one test, this method reduced the pickle file's size from 3.4 MB per channel
    to 30 kB per channel, or a 112x size reduction (with `drop_debug=True`).

    Dead-end steps are defined as any step that can be omitted without affecting the ability to
    compute any of the fields given in `required_fields`. The result of this method is to return
    a Recipe where any step is removed if it does not contribute to computing any of the `required_fields`
    (i.e., if it is a dead end).

    Examples of a dead end are typically steps used to prepare a tentative, intermediate calibration function.

    Parameters
    ----------
    required_fields : Iterable[str] | str | None
        Steps will be preserved if any of their outputs are among `required_fields`, or if their outputs are
        found recursively among the inputs to any such steps. If a string, treat as a list of that one string.
        If None, preserve all steps.

    drop_debug : bool
        Whether to run `step.drop_debug()` to remove debugging information from the preserved steps.

    Returns
    -------
    Recipe
        A copy of `self`, except that any steps not required to compute any of `required_fields` are omitted.
    """
    if isinstance(required_fields, str):
        required_fields = [required_fields]

    nsteps = len(self)
    required = np.zeros(nsteps, dtype=bool)

    # The easiest approach is to traverse the steps from last to first to build our list of required
    # fields, because necessarily no later step can produce the inputs needed by an earlier step.
    if required_fields is None:
        required[:] = True
    else:
        all_fields_out: set[str] = set(required_fields)
        for istep in range(nsteps - 1, -1, -1):
            step = self[istep]
            for field in step.output:
                if field in all_fields_out:
                    required[istep] = True
                    all_fields_out.update(step.inputs)
                    break

    if not np.any(required):
        # If this error ever becomes a problem, where a user _actually_ wants an empty series of steps
        # to be a non-error, then add argument `error_on_empty_output=True` to this method.
        raise ValueError("trim_dead_ends found no steps to be preserved")

    steps = []
    for i in range(nsteps):
        if required[i]:
            if drop_debug:
                steps.append(self[i].drop_debug())
            else:
                steps.append(self[i])
    return Recipe(steps)

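A brief usage sketch, assuming an already-built Recipe whose final output column is named "energy" (both the recipe and the column name are hypothetical):

# Keep only the steps needed, directly or transitively, to compute "energy",
# and strip debug information from the surviving steps.
slim = recipe.trim_dead_ends("energy")

# Keep every step, but leave the per-step debug information in place.
slim_with_debug = recipe.trim_dead_ends(None, drop_debug=False)
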
RecipeStep dataclass

Source code in mass2/core/recipe.py, lines 10-37
@dataclass(frozen=True)
class RecipeStep:
    inputs: list[str]
    output: list[str]
    good_expr: pl.Expr
    use_expr: pl.Expr

    @property
    def name(self):
        return type(self)

    @property
    def description(self):
        return f"{type(self).__name__} inputs={self.inputs} outputs={self.output}"

    def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
        # TODO: should this be an abstract method?
        return df.filter(self.good_expr)

    def dbg_plot(self, df_after: pl.DataFrame, **kwargs) -> plt.Axes:
        # default implementation: just display a placeholder message; subclasses can override this to plot something useful
        plt.figure()
        plt.text(0.0, 0.5, f"No plot defined for: {self.description}")
        return plt.gca()

    def drop_debug(self) -> "RecipeStep":
        "Return self, or a copy of it with debug information removed"
        return self

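Since calc_from_df is the main extension point, a custom step is just another frozen dataclass that computes its output columns and re-attaches the incoming ones. The class below is a hypothetical example, not part of mass2:

from dataclasses import dataclass
import polars as pl
from mass2.core.recipe import RecipeStep  # import path assumed

@dataclass(frozen=True)
class ScaleStep(RecipeStep):
    """Hypothetical step: multiply one input column by a fixed gain."""

    gain: float

    def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
        col_in, col_out = self.inputs[0], self.output[0]
        # Compute only the new column, then re-attach the existing columns,
        # following the same pattern as SelectStep.
        return df.select((pl.col(col_in) * self.gain).alias(col_out)).with_columns(df)
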
drop_debug()

Return self, or a copy of it with debug information removed

Source code in mass2/core/recipe.py, lines 35-37
def drop_debug(self) -> "RecipeStep":
    "Return self, or a copy of it with debug information removed"
    return self

SelectStep dataclass

Bases: RecipeStep

This step is meant for interactive exploration; it's basically like the df.select() method, but it's saved as a step.

Source code in mass2/core/recipe.py, lines 167-178
@dataclass(frozen=True)
class SelectStep(RecipeStep):
    """
    This step is meant for interactive exploration; it's basically like the df.select() method, but it's saved as a step.

    """

    col_expr_dict: dict[str, pl.Expr]

    def calc_from_df(self, df: pl.DataFrame) -> pl.DataFrame:
        df2 = df.select(**self.col_expr_dict).with_columns(df)
        return df2

Classes to create time-domain and Fourier-domain optimal filters.

Filter dataclass

Bases: ABC

A single optimal filter, possibly with optimal estimators of the Delta-t and of the DC level.

Returns:
  • Filter

    A set of optimal filter values.

    These values are chosen with the following specifics:
    • one model of the pulses and of the noise, including pulse record length,
    • a first-order arrival-time detection filter is (optionally) computed,
    • filtering model (1-lag, or other odd # of lags),
    • low-pass smoothing of the filter itself,
    • a fixed number of samples "cut" (given zero weight) at the start and/or end of records.

    The object also stores the pulse shape and (optionally) the delta-T shape used to generate it, and the low-pass filter's fmax or f_3db (cutoff or rolloff) frequency.

    It also stores the predicted variance due to noise and the resulting predicted_v_over_dv, the ratio of the filtered pulse height to the (FWHM) noise, in pulse height units. Both of these values assume pulses of the same size as that used to generate the filter: nominal_peak.

Source code in mass2/core/optimal_filtering.py, lines 230-314
@dataclass(frozen=True)
class Filter(ABC):
    """A single optimal filter, possibly with optimal estimators of the Delta-t and of the DC level.

    Returns
    -------
    Filter
        A set of optimal filter values.

        These values are chosen with the following specifics:
        * one model of the pulses and of the noise, including pulse record length,
        * a first-order arrival-time detection filter is (optionally) computed
        * filtering model (1-lag, or other odd # of lags),
        * low-pass smoothing of the filter itself,
        * a fixed number of samples "cut" (given zero weight) at the start and/or end of records.

        The object also stores the pulse shape and (optionally) the delta-T shape used to generate it,
        and the low-pass filter's fmax or f_3db (cutoff or rolloff) frequency.

        It also stores the predicted `variance` due to noise and the resulting `predicted_v_over_dv`,
        the ratio of the filtered pulse height to the (FWHM) noise, in pulse height units. Both
        of these values assume pulses of the same size as that used to generate the filter: `nominal_peak`.

    """

    values: np.ndarray
    nominal_peak: float
    variance: float
    predicted_v_over_dv: float
    dt_values: np.ndarray | None
    const_values: np.ndarray | None
    signal_model: np.ndarray | None
    dt_model: np.ndarray | None
    convolution_lags: int = 1
    fmax: float | None = None
    f_3db: float | None = None
    cut_pre: int = 0
    cut_post: int = 0

    @property
    @abstractmethod
    def is_arrival_time_safe(self):
        """Is this an arrival-time-safe filter?"""
        return False

    @property
    @abstractmethod
    def _filter_type(self):
        return "illegal: this is supposed to be an abstract base class"

    def plot(self, axis: plt.Axes | None = None, **kwargs):
        """Make a plot of the filter

        Parameters
        ----------
        axis : plt.Axes, optional
            A pre-existing axis to plot on, by default None
        """
        if axis is None:
            plt.clf()
            axis = plt.subplot(111)
        axis.plot(self.values, label="mass 5lag filter", **kwargs)
        axis.grid()
        axis.set_title(f"{self._filter_type=} V/dV={self.predicted_v_over_dv:.2f}")
        axis.set_ylabel("filter value")
        axis.set_xlabel("Lag Time (s)")
        plt.gcf().tight_layout()

    def report(self, std_energy: float = 5898.8):
        """Report on estimated V/dV for the filter.

        Parameters
        ----------
        std_energy : float, optional
            Energy (in eV) of a "standard" pulse.  Resolution will be given in eV at this energy,
                assuming linear devices, by default 5898.8
        """
        var = self.variance
        v_dv = self.predicted_v_over_dv
        fwhm_eV = std_energy / v_dv
        print(f"v/\u03b4v: {v_dv: .2f}, variance: {var:.2f} \u03b4E: {fwhm_eV:.2f} eV (FWHM), assuming standard E={std_energy:.2f} eV")

    @abstractmethod
    def filter_records(self, x: npt.ArrayLike) -> tuple[np.ndarray, np.ndarray]:
        pass

is_arrival_time_safe abstractmethod property

Is this an arrival-time-safe filter?

plot(axis=None, **kwargs)

Make a plot of the filter

Parameters:
  • axis (Axes, default: None ) –

    A pre-existing axis to plot on, by default None

Source code in mass2/core/optimal_filtering.py, lines 280-296
def plot(self, axis: plt.Axes | None = None, **kwargs):
    """Make a plot of the filter

    Parameters
    ----------
    axis : plt.Axes, optional
        A pre-existing axis to plot on, by default None
    """
    if axis is None:
        plt.clf()
        axis = plt.subplot(111)
    axis.plot(self.values, label="mass 5lag filter", **kwargs)
    axis.grid()
    axis.set_title(f"{self._filter_type=} V/dV={self.predicted_v_over_dv:.2f}")
    axis.set_ylabel("filter value")
    axis.set_xlabel("Lag Time (s)")
    plt.gcf().tight_layout()

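A minimal usage sketch (filt stands for any concrete Filter instance; it is not defined here):

import matplotlib.pyplot as plt

filt.plot()                    # clears the current figure and plots the filter values
fig, ax = plt.subplots()
filt.plot(axis=ax, color="k")  # or draw onto an existing axis; extra kwargs go to axis.plot
plt.show()
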
report(std_energy=5898.8)

Report on estimated V/dV for the filter.

Parameters:
  • std_energy (float, default: 5898.8 ) –

    Energy (in eV) of a "standard" pulse. Resolution will be given in eV at this energy, assuming linear devices, by default 5898.8

Source code in mass2/core/optimal_filtering.py, lines 298-310
def report(self, std_energy: float = 5898.8):
    """Report on estimated V/dV for the filter.

    Parameters
    ----------
    std_energy : float, optional
        Energy (in eV) of a "standard" pulse.  Resolution will be given in eV at this energy,
            assuming linear devices, by default 5898.8
    """
    var = self.variance
    v_dv = self.predicted_v_over_dv
    fwhm_eV = std_energy / v_dv
    print(f"v/\u03b4v: {v_dv: .2f}, variance: {var:.2f} \u03b4E: {fwhm_eV:.2f} eV (FWHM), assuming standard E={std_energy:.2f} eV")

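The printed resolution follows directly from the stored figure of merit: delta-E (FWHM) = std_energy / predicted_v_over_dv. A hedged usage sketch, again with filt standing for an existing Filter:

filt.report()                   # quoted at the default 5898.8 eV (Mn K-alpha)
filt.report(std_energy=3000.0)  # the same filter, quoted at 3 keV instead
# e.g. predicted_v_over_dv = 1000 implies roughly 5.9 eV FWHM at 5898.8 eV
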
Filter5Lag dataclass

Bases: Filter

Represent an optimal filter, specifically one intended for 5-lag convolution with data

The traditional 5-lag filter used by default until 2015.

Returns:
  • Filter5Lag

    An optimal filter, for convolution with data (at 5 lags, obviously)

Source code in mass2/core/optimal_filtering.py, lines 317-392
@dataclass(frozen=True)
class Filter5Lag(Filter):
    """Represent an optimal filter, specifically one intended for 5-lag convolution with data

        The traditional 5-lag filter used by default until 2015.

    Returns
    -------
    Filter5Lag
        An optimal filter, for convolution with data (at 5 lags, obviously)
    """

    def __post_init__(self):
        assert self.convolution_lags == 5

    @property
    def is_arrival_time_safe(self):
        """Is this an arrival-time-safe filter?"""
        return False

    @property
    def _filter_type(self):
        return "5lag"

    # These parameters fit a parabola to any 5 evenly-spaced points
    FIVELAG_FITTER = (
        np.array(
            (
                (-6, 24, 34, 24, -6),
                (-14, -7, 0, 7, 14),
                (10, -5, -10, -5, 10),
            ),
            dtype=float,
        )
        / 70.0
    )

    def filter_records(self, x: npt.ArrayLike) -> tuple[np.ndarray, np.ndarray]:
        """Filter one microcalorimeter record or an array of records.

        Parameters
        ----------
        x : npt.ArrayLike
            A 1-d array, a single pulse record, or a 2-d array, where `x[i, :]` is pulse record number `i`.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
            2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.

        Raises
        ------
        AssertionError
            If the input array is the wrong length
        """
        x = np.asarray(x)
        if x.ndim == 1:
            x = x.reshape((1, len(x)))
        nrec, nsamp = x.shape
        nlags = self.convolution_lags
        assert nsamp == len(self.values) + nlags - 1
        nrec = x.shape[0]
        conv = np.zeros((nlags, nrec), dtype=float)
        for i in range(nlags - 1):
            conv[i, :] = np.dot(x[:, i : i + 1 - nlags], self.values)
        conv[nlags - 1, :] = np.dot(x[:, nlags - 1 :], self.values)

        # Least-squares fit of 5 values to a parabola.
        # Order is row 0 = constant ... row 2 = quadratic coefficients.
        if nlags != 5:
            raise NotImplementedError("Currently require 5 lags to estimate peak x, y")
        param = np.dot(self.FIVELAG_FITTER, conv)
        peak_x = -0.5 * param[1, :] / param[2, :]
        peak_y = param[0, :] - 0.25 * param[1, :] ** 2 / param[2, :]
        return peak_y, peak_x

is_arrival_time_safe property

Is this an arrival-time-safe filter?

filter_records(x)

Filter one microcalorimeter record or an array of records.

Parameters:
  • x (ArrayLike) –

    A 1-d array, a single pulse record, or a 2-d array, where x[i, :] is pulse record number i.

Returns:
  • tuple[ndarray, ndarray]
    1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
    2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.
Raises:
  • AssertionError

    If the input array is the wrong length

Source code in mass2/core/optimal_filtering.py, lines 354-392
def filter_records(self, x: npt.ArrayLike) -> tuple[np.ndarray, np.ndarray]:
    """Filter one microcalorimeter record or an array of records.

    Parameters
    ----------
    x : npt.ArrayLike
        A 1-d array, a single pulse record, or a 2-d array, where `x[i, :]` is pulse record number `i`.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
        2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.

    Raises
    ------
    AssertionError
        If the input array is the wrong length
    """
    x = np.asarray(x)
    if x.ndim == 1:
        x = x.reshape((1, len(x)))
    nrec, nsamp = x.shape
    nlags = self.convolution_lags
    assert nsamp == len(self.values) + nlags - 1
    nrec = x.shape[0]
    conv = np.zeros((nlags, nrec), dtype=float)
    for i in range(nlags - 1):
        conv[i, :] = np.dot(x[:, i : i + 1 - nlags], self.values)
    conv[nlags - 1, :] = np.dot(x[:, nlags - 1 :], self.values)

    # Least-squares fit of 5 values to a parabola.
    # Order is row 0 = constant ... row 2 = quadratic coefficients.
    if nlags != 5:
        raise NotImplementedError("Currently require 5 lags to estimate peak x, y")
    param = np.dot(self.FIVELAG_FITTER, conv)
    peak_x = -0.5 * param[1, :] / param[2, :]
    peak_y = param[0, :] - 0.25 * param[1, :] ** 2 / param[2, :]
    return peak_y, peak_x

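A usage sketch: because of the 5-lag convolution, each record must be 4 samples longer than the filter (len(self.values) + nlags - 1 with nlags = 5). The records below are synthetic noise, purely to show the shapes involved; filt stands for an existing Filter5Lag.

import numpy as np

records = np.random.standard_normal((100, len(filt.values) + 4))
peak_y, peak_x = filt.filter_records(records)
# peak_y: filtered pulse heights, one per record
# peak_x: arrival-time ("phase") estimates in samples, from the 5-point parabola fit
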
FilterATS dataclass

Bases: Filter

Represent an optimal filter according to the arrival-time-safe, single-lag design of 2015.

Returns:
  • FilterATS

    An optimal filter, for single-lag, arrival-time-safe convolution with data

Source code in mass2/core/optimal_filtering.py, lines 395-443
@dataclass(frozen=True)
class FilterATS(Filter):
    """Represent an optimal filter according to the arrival-time-safe, single-lag design of 2015.

    Returns
    -------
    FilterATS
        An optimal filter, for single-lag, arrival-time-safe convolution with data
    """

    def __post_init__(self):
        assert self.convolution_lags == 1
        assert self.dt_values is not None

    @property
    def is_arrival_time_safe(self):
        """Is this an arrival-time-safe filter?"""
        return True

    @property
    def _filter_type(self):
        return "ats"

    def filter_records(self, x: npt.ArrayLike) -> tuple[np.ndarray, np.ndarray]:
        """Filter one microcalorimeter record or an array of records.

        Parameters
        ----------
        x : npt.ArrayLike
            A 1-d array, a single pulse record, or a 2-d array, each row a pulse record.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
            2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.

        Raises
        ------
        AssertionError
            If the input array is the wrong length
        """
        x = np.asarray(x)
        if x.ndim == 1:
            x = x.reshape((1, len(x)))
        _, nsamp = x.shape

        assert nsamp == len(self.values)
        return _filter_records_ats(x, self.values, self.dt_values)

is_arrival_time_safe property

Is this an arrival-time-safe filter?

filter_records(x)

Filter one microcalorimeter record or an array of records.

Parameters:
  • x (ArrayLike) –

    A 1-d array, a single pulse record, or a 2-d array, each row a pulse record.

Returns:
  • tuple[ndarray, ndarray]
    1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
    2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.
Raises:
  • AssertionError

    If the input array is the wrong length

Source code in mass2/core/optimal_filtering.py, lines 418-443
def filter_records(self, x: npt.ArrayLike) -> tuple[np.ndarray, np.ndarray]:
    """Filter one microcalorimeter record or an array of records.

    Parameters
    ----------
    x : npt.ArrayLike
        A 1-d array, a single pulse record, or a 2-d array, each row a pulse record.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        1. The optimally filtered value, or an array (one per row) if the input is a 2-d array.
        2. The phase, or arrival-time estimate in samples. Same shape as the filtered value.

    Raises
    ------
    AssertionError
        If the input array is the wrong length
    """
    x = np.asarray(x)
    if x.ndim == 1:
        x = x.reshape((1, len(x)))
    _, nsamp = x.shape

    assert nsamp == len(self.values)
    return _filter_records_ats(x, self.values, self.dt_values)

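In contrast to the 5-lag filter, an ATS filter consumes records of exactly len(self.values) samples. A short sketch with synthetic data (ats_filter stands for an existing FilterATS):

import numpy as np

records = np.random.standard_normal((100, len(ats_filter.values)))
peak_y, phase = ats_filter.filter_records(records)
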
FilterMaker dataclass

An object capable of creating optimal filters based on a single signal and noise set.

Parameters:
  • signal_model (ArrayLike) –

    The average signal shape. Filters will be rescaled so that the output upon putting this signal into the filter equals the peak value of this signal (that is, its peak value relative to the baseline level).

  • n_pretrigger (int) –

    The number of leading samples in the average signal that are considered to be pre-trigger samples. The avg_signal in this section is replaced by its constant averaged value before creating filters. Also, one filter (filt_baseline_pretrig) is designed to infer the baseline using only n_pretrigger samples at the start of a record.

  • noise_autocorr (Optional[ArrayLike], default: None ) –

    The autocorrelation function of the noise, where the lag spacing is assumed to be the same as the sample period of avg_signal.

  • noise_psd (Optional[ArrayLike], default: None ) –

    The noise power spectral density. If not None, then it must be of length (2N+1), where N is the length of avg_signal, and its values are assumed to cover the non-negative frequencies from 0, 1/Delta, 2/Delta,.... up to the Nyquist frequency. If None, then method compute_fourier() will not work.

  • whitener (Optional[ToeplitzWhitener], default: None ) –

    An optional function object which, when called, whitens a vector or the columns of a matrix. Supersedes noise_autocorr if both are given.

  • sample_time_sec (float, default: 0.0 ) –

    The time step between samples in avg_signal and noise_autocorr (in seconds). This must be given if fmax or f_3db are ever to be used.

  • peak (float, default: 0.0 ) –

    The peak amplitude of the standard signal

Notes
  • If both noise_autocorr and whitener are None, then methods compute_5lag and compute_ats will both fail, as they require a time-domain characterization of the noise.

  • The units of noise_autocorr are the square of the units used in signal_model and/or peak. The units of whitener are the inverse of the signal units. Any rescaling of the noise autocorrelation or whitener does not affect any filter values, but only the predicted signal/noise ratios.

  • The units of noise_psd are square signal units, per Hertz.

Returns:
  • FilterMaker

    An object that can make a variety of optimal filters, assuming a single signal and noise analysis.

Source code in mass2/core/optimal_filtering.py, lines 455-953
@dataclass(frozen=True)
class FilterMaker:
    """An object capable of creating optimal filters based on a single signal and noise set.

    Parameters
    ----------
    signal_model : npt.ArrayLike
        The average signal shape.  Filters will be rescaled so that the output
        upon putting this signal into the filter equals the *peak value* of this
        signal (that is, its peak value relative to the baseline level).
    n_pretrigger : int
        The number of leading samples in the average signal that are considered
        to be pre-trigger samples.  The avg_signal in this section is replaced by
        its constant averaged value before creating filters.  Also, one filter
        (filt_baseline_pretrig) is designed to infer the baseline using only
        `n_pretrigger` samples at the start of a record.
    noise_autocorr : Optional[npt.ArrayLike]
        The autocorrelation function of the noise, where the lag spacing is
        assumed to be the same as the sample period of `avg_signal`.
    noise_psd : Optional[npt.ArrayLike]
        The noise power spectral density.  If not None, then it must be of length (2N+1),
        where N is the length of `avg_signal`, and its values are assumed to cover the
        non-negative frequencies from 0, 1/Delta, 2/Delta,.... up to the Nyquist frequency.
        If None, then method `compute_fourier()` will not work.
    whitener : Optional[ToeplitzWhitener]
        An optional function object which, when called, whitens a vector or the
        columns of a matrix. Supersedes `noise_autocorr` if both are given.
    sample_time_sec : float
        The time step between samples in `avg_signal` and `noise_autocorr` (in seconds).
        This must be given if `fmax` or `f_3db` are ever to be used.
    peak : float
        The peak amplitude of the standard signal


    Notes
    -----

    * If both `noise_autocorr` and `whitener` are None, then methods `compute_5lag` and
    `compute_ats` will both fail, as they require a time-domain characterization of the
    noise.

    * The units of `noise_autocorr` are the square of the units used in `signal_model` and/or
    `peak`. The units of `whitener` are the inverse of the signal units.  Any rescaling of the
    noise autocorrelation or whitener does not affect any filter values, but only
    the predicted signal/noise ratios.

    * The units of `noise_psd` are square signal units, per Hertz.

    Returns
    -------
    FilterMaker
        An object that can make a variety of optimal filters, assuming a single signal and noise analysis.
    """

    signal_model: npt.NDArray
    n_pretrigger: int
    noise_autocorr: npt.NDArray | None = None
    noise_psd: npt.NDArray | None = None
    dt_model: npt.NDArray | None = None
    whitener: ToeplitzWhitener | None = None
    sample_time_sec: float = 0.0
    peak: float = 0.0

    def compute_constrained_5lag(
        self,
        constraints: npt.ArrayLike | None = None,
        fmax: float | None = None,
        f_3db: float | None = None,
        cut_pre: int = 0,
        cut_post: int = 0,
    ) -> Filter:
        """Compute a single constrained optimal filter, with optional low-pass filtering, and with optional zero
        weights at the pre-trigger or post-trigger end of the filter.

        Either or both of `fmax` and `f_3db` are allowed.

        Parameters
        ----------
        constraints: ndarray, optional
            The vector or vectors to which the filter should be orthogonal. If a 2d array, each _row_
            is a constraint, and the number of columns should be equal to len(self.signal_model)
            minus `(4 + cut_pre + cut_post)` (the 4 accounts for the 5-lag truncation).
        fmax : Optional[float], optional
            The strict maximum frequency to be passed in all filters, by default None
        f_3db : Optional[float], optional
            The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
        cut_pre : int
            The number of initial samples to be given zero weight, by default 0
        cut_post : int
            The number of samples at the end of a record to be given zero weight, by default 0

        Returns
        -------
        Filter
            A 5-lag optimal filter.

        Raises
        ------
        ValueError
            Under various conditions where arguments are inconsistent with the data
        """

        if self.sample_time_sec <= 0 and not (fmax is None and f_3db is None):
            raise ValueError("FilterMaker must have a sample_time_sec if it's to be smoothed with fmax or f_3db")
        if cut_pre < 0 or cut_post < 0:
            raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")

        if self.noise_autocorr is None and self.whitener is None:
            raise ValueError("FilterMaker must have noise_autocorr or whitener arguments to generate 5-lag filters")
        noise_autocorr = self._compute_autocorr(cut_pre, cut_post)
        avg_signal, peak, _ = self._normalize_signal(cut_pre, cut_post)

        shorten = 2  # for 5-lag convolution
        truncated_signal = avg_signal[shorten:-shorten]
        n = len(truncated_signal)
        assert len(noise_autocorr) >= n, "Noise autocorrelation vector is too short for signal size"
        pulse_model = np.vstack((truncated_signal, np.ones_like(truncated_signal)))
        if constraints is not None:
            pulse_model = np.vstack((pulse_model, constraints))
        assert pulse_model.shape[1] == n

        noise_corr = noise_autocorr[:n]
        TS = ToeplitzSolver(noise_corr, symmetric=True)
        Rinv_model = np.vstack([TS(r) for r in pulse_model])
        A = pulse_model.dot(Rinv_model.T)
        all_filters = np.linalg.solve(A, Rinv_model)
        filt_noconst = all_filters[0]

        band_limit(filt_noconst, self.sample_time_sec, fmax, f_3db)

        self._normalize_5lag_filter(filt_noconst, avg_signal)
        variance = bracketR(filt_noconst, noise_corr)

        # Set weights in the cut_pre and cut_post windows to 0
        if cut_pre > 0 or cut_post > 0:
            filt_noconst = np.hstack([np.zeros(cut_pre), filt_noconst, np.zeros(cut_post)])

        vdv = peak / (8 * np.log(2) * variance) ** 0.5
        return Filter5Lag(
            filt_noconst, peak, variance, vdv, None, None, avg_signal, None, 1 + 2 * shorten, fmax, f_3db, cut_pre, cut_post
        )

    def compute_5lag(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:
        """Compute a single filter, with optional low-pass filtering, and with optional zero
        weights at the pre-trigger or post-trigger end of the filter.

        Either or both of `fmax` and `f_3db` are allowed.

        Parameters
        ----------
        fmax : Optional[float], optional
            The strict maximum frequency to be passed in all filters, by default None
        f_3db : Optional[float], optional
            The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
        cut_pre : int
            The number of initial samples to be given zero weight, by default 0
        cut_post : int
            The number of samples at the end of a record to be given zero weight, by default 0

        Returns
        -------
        Filter
            A 5-lag optimal filter.

        Raises
        ------
        ValueError
            Under various conditions where arguments are inconsistent with the data
        """
        return self.compute_constrained_5lag(None, fmax=fmax, f_3db=f_3db, cut_pre=cut_pre, cut_post=cut_post)

    def compute_5lag_noexp(
        self, exp_time_seconds: float, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0
    ) -> Filter:
        """Compute a single filter, with optional low-pass filtering, and with optional zero
        weights at the pre-trigger or post-trigger end of the filter.

        Either or both of `fmax` and `f_3db` are allowed.

        Parameters
        ----------
        exp_time_seconds: float
            Generate a filter orthogonal to decaying exponentials of this time constant (must be positive)
        fmax : Optional[float], optional
            The strict maximum frequency to be passed in all filters, by default None
        f_3db : Optional[float], optional
            The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
        cut_pre : int
            The number of initial samples to be given zero weight, by default 0
        cut_post : int
            The number of samples at the end of a record to be given zero weight, by default 0

        Returns
        -------
        Filter
            A 5-lag optimal filter.

        Raises
        ------
        ValueError
            Under various conditions where arguments are inconsistent with the data
        """
        assert exp_time_seconds > 0
        n = len(self.signal_model) - 4 - (cut_pre + cut_post)
        log_per_sample = self.sample_time_sec / exp_time_seconds
        constraint = np.exp(-np.arange(n) * log_per_sample)
        return self.compute_constrained_5lag(constraint, fmax=fmax, f_3db=f_3db, cut_pre=cut_pre, cut_post=cut_post)

    def compute_fourier(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:
        """Compute a single Fourier-domain filter, with optional low-pass filtering, and with optional
        zero weights at the pre-trigger or post-trigger end of the filter. Fourier domain calculation
        implicitly assumes periodic boundary conditions.

        Either or both of `fmax` and `f_3db` are allowed.

        Parameters
        ----------
        fmax : Optional[float], optional
            The strict maximum frequency to be passed in all filters, by default None
        f_3db : Optional[float], optional
            The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
        cut_pre : int
            The number of initial samples to be given zero weight, by default 0
        cut_post : int
            The number of samples at the end of a record to be given zero weight, by default 0

        Returns
        -------
        Filter
            A 5-lag optimal filter, computed in the Fourier domain.

        Raises
        ------
        ValueError
            Under various conditions where arguments are inconsistent with the data
        """
        # Make sure we have either a noise PSD or an autocorrelation or a whitener
        if self.noise_psd is None:
            raise ValueError("FilterMaker must have noise_psd to generate a Fourier filter")
        if cut_pre < 0 or cut_post < 0:
            raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")

        avg_signal, peak, _ = self._normalize_signal(cut_pre, cut_post)
        noise_psd = np.asarray(self.noise_psd)

        # Terminology: the `avg_signal` vector will be "shortened" by `shorten` _on each end.
        # That's to permit 5-lag filtering (where we step the filter by ±2 lags either direction from 0 lag).
        # The `avg_signal` was already "reduced" in length by (cut_pre, cut_post), for a total
        # `reduction` of `2 * shorten + (cut_pre + cut_post)`.
        shorten = 2  # to use in 5-lag style
        reduction = 2 * shorten + (cut_pre + cut_post)

        truncated_avg_signal = avg_signal[shorten:-shorten]
        len_reduced_psd = len(noise_psd) - (reduction + 1) // 2
        window = 1.0
        sig_ft = np.fft.rfft(truncated_avg_signal * window)

        if len(sig_ft) != len_reduced_psd:
            raise ValueError(f"signal real DFT and noise PSD are not the same length ({len(sig_ft)} and {len_reduced_psd})")

        # Careful with PSD: "shorten" it by converting into a real space autocorrelation,
        # truncating the middle, and going back to Fourier space
        if reduction > 0:
            noise_autocorr = np.fft.irfft(noise_psd)
            noise_autocorr = np.hstack((noise_autocorr[: len_reduced_psd - 1], noise_autocorr[-len_reduced_psd:]))
            noise_psd = np.abs(np.fft.rfft(noise_autocorr))
        sig_ft_weighted = sig_ft / noise_psd

        # Band-limit
        if fmax is not None or f_3db is not None:
            f_nyquist = 0.5 / self.sample_time_sec
            freq = np.linspace(0, f_nyquist, len_reduced_psd, dtype=float)
            if fmax is not None:
                sig_ft_weighted[freq > fmax] = 0.0
            if f_3db is not None:
                sig_ft_weighted /= 1 + (freq * 1.0 / f_3db) ** 2

        sig_ft_weighted[0] = 0.0
        filt_fourier = np.fft.irfft(sig_ft_weighted) / window
        self._normalize_5lag_filter(filt_fourier, avg_signal)

        # How we compute the uncertainty depends on whether there's a noise autocorrelation result
        if self.noise_autocorr is None:
            noise_ft_squared = (len(noise_psd) - 1) / self.sample_time_sec * noise_psd
            kappa = (np.abs(sig_ft) ** 2 / noise_ft_squared)[1:].sum()
            variance_fourier = 1.0 / kappa
            print(kappa, noise_ft_squared)
        else:
            ac = np.array(self.noise_autocorr)[: len(filt_fourier)]
            variance_fourier = bracketR(filt_fourier, ac)
        vdv = peak / (8 * np.log(2) * variance_fourier) ** 0.5
        return Filter5Lag(
            filt_fourier,
            peak,
            variance_fourier,
            vdv,
            None,
            None,
            truncated_avg_signal,
            None,
            1 + 2 * shorten,
            fmax,
            f_3db,
            cut_pre,
            cut_post,
        )

    def compute_ats(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:  # noqa: PLR0914
        """Compute a single "arrival-time-safe" filter, with optional low-pass filtering,
        and with optional zero weights at the pre-trigger or post-trigger end of the filter.

        Either or both of `fmax` and `f_3db` are allowed.

        Parameters
        ----------
        fmax : Optional[float], optional
            The strict maximum frequency to be passed in all filters, by default None
        f_3db : Optional[float], optional
            The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
        cut_pre : int
            The number of initial samples to be given zero weight, by default 0
        cut_post : int
            The number of samples at the end of a record to be given zero weight, by default 0

        Returns
        -------
        Filter
            An arrival-time-safe optimal filter.

        Raises
        ------
        ValueError
            Under various conditions where arguments are inconsistent with the data
        """
        if self.noise_autocorr is None and self.whitener is None:
            raise ValueError("FilterMaker must have noise_autocorr or whitener arguments to generate ATS filters")
        if self.dt_model is None:
            raise ValueError("FilterMaker must have dt_model to generate ATS filters")
        if self.sample_time_sec is None and not (fmax is None and f_3db is None):
            raise ValueError("FilterMaker must have a sample_time_sec if it's to be smoothed with fmax or f_3db")

        noise_autocorr = self._compute_autocorr(cut_pre, cut_post)
        avg_signal, peak, dt_model = self._normalize_signal(cut_pre, cut_post)

        ns = len(avg_signal)
        assert ns == len(dt_model)
        if cut_pre < 0 or cut_post < 0:
            raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")
        if cut_pre + cut_post >= ns:
            raise ValueError(f"cut_pre+cut_post = {cut_pre + cut_post} but should be < {ns}")

        MT = np.vstack((avg_signal, dt_model, np.ones(ns)))

        if self.whitener is not None:
            WM = self.whitener(MT.T)
            A = np.dot(WM.T, WM)
            Ainv = np.linalg.inv(A)
            WtWM = self.whitener.applyWT(WM)
            filt = np.dot(Ainv, WtWM.T)

        else:
            assert len(noise_autocorr) >= ns
            noise_corr = noise_autocorr[:ns]
            TS = ToeplitzSolver(noise_corr, symmetric=True)

            RinvM = np.vstack([TS(r) for r in MT]).T
            A = np.dot(MT, RinvM)
            Ainv = np.linalg.inv(A)
            filt = np.dot(Ainv, RinvM.T)

        band_limit(filt.T, self.sample_time_sec, fmax, f_3db)

        if cut_pre > 0 or cut_post > 0:
            nfilt = filt.shape[0]
            filt = np.hstack([np.zeros((nfilt, cut_pre), dtype=float), filt, np.zeros((nfilt, cut_post), dtype=float)])

        filt_noconst = filt[0]
        filt_dt = filt[1]
        filt_baseline = filt[2]

        variance = bracketR(filt_noconst, self.noise_autocorr)
        vdv = peak / (np.log(2) * 8 * variance) ** 0.5
        return FilterATS(
            filt_noconst, peak, variance, vdv, filt_dt, filt_baseline, avg_signal, dt_model, 1, fmax, f_3db, cut_pre, cut_post
        )

    def _compute_autocorr(self, cut_pre: int = 0, cut_post: int = 0) -> np.ndarray:
        """Return the noise autocorrelation, if any, cut down by the requested number of values at the start and end.

        Parameters
        ----------
        cut_pre : int, optional
            How many samples to remove from the start of each pulse record, by default 0
        cut_post : int, optional
            How many samples to remove from the end of each pulse record, by default 0

        Returns
        -------
        np.ndarray
            The noise autocorrelation of the appropriate length. Or a length-0 array if not known.
        """
        # If there's an autocorrelation, cut it down to length.
        if self.noise_autocorr is None:
            return np.array([], dtype=float)
        N = len(np.asarray(self.signal_model))
        return np.asarray(self.noise_autocorr)[: N - (cut_pre + cut_post)]

    def _normalize_signal(self, cut_pre: int = 0, cut_post: int = 0) -> tuple[np.ndarray, float, np.ndarray]:
        """Compute the normalized signal, peak value, and first-order arrival-time model.

        Parameters
        ----------
        cut_pre : int, optional
            How many samples to remove from the start of each pulse record, by default 0
        cut_post : int, optional
            How many samples to remove from the end of each pulse record, by default 0

        Returns
        -------
        tuple[np.ndarray, float, np.ndarray]
            (sig, pk, dsig), where `sig` is the nominal signal model (normalized to have unit amplitude), `pk` is the
            peak value of the nominal signal, and `dsig` is the difference between versions of `sig` that differ by
            one sample in arrival time. The `dsig` will be an empty array if no arrival-time model is known.

        Raises
        ------
        ValueError
            If negative numbers of samples are to be cut, or the entire record is to be cut.
        """
        avg_signal = np.array(self.signal_model)
        ns = len(avg_signal)
        pre_avg = avg_signal[cut_pre : self.n_pretrigger - 1].mean()

        if cut_pre < 0 or cut_post < 0:
            raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")
        if cut_pre + cut_post >= ns:
            raise ValueError(f"cut_pre+cut_post = {cut_pre + cut_post} but should be < {ns}")

        # Unless passed in, find the signal's peak value. This is normally peak=(max-pretrigger).
        # If signal is negative-going, however, then peak=(pretrigger-min).
        if self.peak > 0.0:
            peak_signal = self.peak
        else:
            a = avg_signal[cut_pre : ns - cut_post].min()
            b = avg_signal[cut_pre : ns - cut_post].max()
            is_negative = pre_avg - a > b - pre_avg
            if is_negative:
                peak_signal = a - pre_avg
            else:
                peak_signal = b - pre_avg

        # avg_signal: normalize to have unit peak
        avg_signal -= pre_avg

        rescale = 1 / np.max(avg_signal)
        avg_signal *= rescale
        avg_signal[: self.n_pretrigger] = 0.0
        avg_signal = avg_signal[cut_pre : ns - cut_post]
        if self.dt_model is None:
            dt_model = np.array([], dtype=float)
        else:
            dt_model = self.dt_model * rescale
            dt_model = dt_model[cut_pre : ns - cut_post]
        return avg_signal, peak_signal, dt_model

    @staticmethod
    def _normalize_5lag_filter(f: np.ndarray, avg_signal: np.ndarray):
        """Rescale 5-lag filter `f` in-place so that it gives unit response to avg_signal

        Parameters
        ----------
        f : np.ndarray
            Optimal filter values, which need to be renormalized
        avg_signal : np.ndarray
            The signal to which filter `f` should give unit response
        """
        assert len(f) <= len(avg_signal) - 4
        conv = np.zeros(5, dtype=float)
        for i in range(5):
            conv[i] = np.dot(f, avg_signal[i : i + len(f)])
        x = np.linspace(-2, 2, 5)
        fit = np.polyfit(x, conv, 2)
        fit_ctr = -0.5 * fit[1] / fit[0]
        fit_peak = np.polyval(fit, fit_ctr)
        f *= 1.0 / fit_peak

    @staticmethod
    def _normalize_filter(f: np.ndarray, avg_signal: np.ndarray):
        """Rescale single-lag filter `f` in-place so that it gives unit response to avg_signal

        Parameters
        ----------
        f : np.ndarray
            Optimal filter values, which need to be renormalized
        avg_signal : np.ndarray
            The signal to which filter `f` should give unit response
        """
        assert len(f) == len(avg_signal)
        f *= 1 / np.dot(f, avg_signal)

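A construction sketch using a synthetic pulse and white noise: the record length, pre-trigger count, sample time, and pulse shape are all invented for the example, and the import path is assumed from the file name above.

import numpy as np
from mass2.core.optimal_filtering import FilterMaker  # import path assumed

n, npre = 1024, 256
t = np.arange(n, dtype=float)
# A toy two-exponential pulse that starts at the end of the pre-trigger region.
avg_pulse = np.where(t < npre, 0.0, np.exp(-(t - npre) / 200.0) - np.exp(-(t - npre) / 20.0))
noise_acf = np.zeros(n)
noise_acf[0] = 1.0  # white noise with unit variance

maker = FilterMaker(
    signal_model=avg_pulse,
    n_pretrigger=npre,
    noise_autocorr=noise_acf,
    sample_time_sec=1e-5,  # required whenever fmax or f_3db will be used
)
filt = maker.compute_5lag(f_3db=10e3)  # 5-lag filter with a 10 kHz one-pole roll-off
filt.report()
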
compute_5lag(fmax=None, f_3db=None, cut_pre=0, cut_post=0)

Compute a single filter, with optional low-pass filtering, and with optional zero weights at the pre-trigger or post-trigger end of the filter.

Either or both of fmax and f_3db are allowed.

Parameters:
  • fmax (Optional[float], default: None ) –

    The strict maximum frequency to be passed in all filters, by default None

  • f_3db (Optional[float], default: None ) –

    The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None

  • cut_pre (int, default: 0 ) –

    The number of initial samples to be given zero weight, by default 0

  • cut_post (int, default: 0 ) –

    The number of samples at the end of a record to be given zero weight, by default 0

Returns:
  • Filter

    A 5-lag optimal filter.

Raises:
  • ValueError

    Under various conditions where arguments are inconsistent with the data

Source code in mass2/core/optimal_filtering.py, lines 597-624
def compute_5lag(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:
    """Compute a single filter, with optional low-pass filtering, and with optional zero
    weights at the pre-trigger or post-trigger end of the filter.

    Either or both of `fmax` and `f_3db` are allowed.

    Parameters
    ----------
    fmax : Optional[float], optional
        The strict maximum frequency to be passed in all filters, by default None
    f_3db : Optional[float], optional
        The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
    cut_pre : int
        The number of initial samples to be given zero weight, by default 0
    cut_post : int
        The number of samples at the end of a record to be given zero weight, by default 0

    Returns
    -------
    Filter
        A 5-lag optimal filter.

    Raises
    ------
    ValueError
        Under various conditions where arguments are inconsistent with the data
    """
    return self.compute_constrained_5lag(None, fmax=fmax, f_3db=f_3db, cut_pre=cut_pre, cut_post=cut_post)

compute_5lag_noexp(exp_time_seconds, fmax=None, f_3db=None, cut_pre=0, cut_post=0)

Compute a single filter, with optional low-pass filtering, and with optional zero weights at the pre-trigger or post-trigger end of the filter.

Either or both of fmax and f_3db are allowed.

Parameters:
  • exp_time_seconds (float) –

    Generate a filter orthogonal to decaying exponentials of this time constant (must be positive)

  • fmax (Optional[float], default: None ) –

    The strict maximum frequency to be passed in all filters, by default None

  • f_3db (Optional[float], default: None ) –

    The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None

  • cut_pre (int, default: 0 ) –

    The number of initial samples to be given zero weight, by default 0

  • cut_post (int, default: 0 ) –

    The number of samples at the end of a record to be given zero weight, by default 0

Returns:
  • Filter

    A 5-lag optimal filter.

Raises:
  • ValueError

    Under various conditions where arguments are inconsistent with the data

Source code in mass2/core/optimal_filtering.py, lines 626-661
def compute_5lag_noexp(
    self, exp_time_seconds: float, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0
) -> Filter:
    """Compute a single filter, with optional low-pass filtering, and with optional zero
    weights at the pre-trigger or post-trigger end of the filter.

    Either or both of `fmax` and `f_3db` are allowed.

    Parameters
    ----------
    exp_time_seconds: float
        Generate a filter orthogonal to decaying exponentials of this time constant (must be positive)
    fmax : Optional[float], optional
        The strict maximum frequency to be passed in all filters, by default None
    f_3db : Optional[float], optional
        The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
    cut_pre : int
        The number of initial samples to be given zero weight, by default 0
    cut_post : int
        The number of samples at the end of a record to be given zero weight, by default 0

    Returns
    -------
    Filter
        A 5-lag optimal filter.

    Raises
    ------
    ValueError
        Under various conditions where arguments are inconsistent with the data
    """
    assert exp_time_seconds > 0
    n = len(self.signal_model) - 4 - (cut_pre + cut_post)
    log_per_sample = self.sample_time_sec / exp_time_seconds
    constraint = np.exp(-np.arange(n) * log_per_sample)
    return self.compute_constrained_5lag(constraint, fmax=fmax, f_3db=f_3db, cut_pre=cut_pre, cut_post=cut_post)

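Continuing the construction sketch above, a one-line example: a filter orthogonal to a decaying exponential with a 2 ms time constant (the time constant is arbitrary).

filt_noexp = maker.compute_5lag_noexp(exp_time_seconds=2e-3, f_3db=10e3)
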
compute_ats(fmax=None, f_3db=None, cut_pre=0, cut_post=0)

Compute a single "arrival-time-safe" filter, with optional low-pass filtering, and with optional zero weights at the pre-trigger or post-trigger end of the filter.

Either or both of fmax and f_3db are allowed.

Parameters:
  • fmax (Optional[float], default: None ) –

    The strict maximum frequency to be passed in all filters, by default None

  • f_3db (Optional[float], default: None ) –

    The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None

  • cut_pre (int, default: 0 ) –

    The number of initial samples to be given zero weight, by default 0

  • cut_post (int, default: 0 ) –

    The number of samples at the end of a record to be given zero weight, by default 0

Returns:
  • Filter

    An arrival-time-safe optimal filter.

Raises:
  • ValueError

    Under various conditions where arguments are inconsistent with the data

Source code in mass2/core/optimal_filtering.py, lines 762-839
def compute_ats(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:  # noqa: PLR0914
    """Compute a single "arrival-time-safe" filter, with optional low-pass filtering,
    and with optional zero weights at the pre-trigger or post-trigger end of the filter.

    Either or both of `fmax` and `f_3db` are allowed.

    Parameters
    ----------
    fmax : Optional[float], optional
        The strict maximum frequency to be passed in all filters, by default None
    f_3db : Optional[float], optional
        The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
    cut_pre : int
        The number of initial samples to be given zero weight, by default 0
    cut_post : int
        The number of samples at the end of a record to be given zero weight, by default 0

    Returns
    -------
    Filter
        An arrival-time-safe optimal filter.

    Raises
    ------
    ValueError
        Under various conditions where arguments are inconsistent with the data
    """
    if self.noise_autocorr is None and self.whitener is None:
        raise ValueError("FilterMaker must have noise_autocorr or whitener arguments to generate ATS filters")
    if self.dt_model is None:
        raise ValueError("FilterMaker must have dt_model to generate ATS filters")
    if self.sample_time_sec is None and not (fmax is None and f_3db is None):
        raise ValueError("FilterMaker must have a sample_time_sec if it's to be smoothed with fmax or f_3db")

    noise_autocorr = self._compute_autocorr(cut_pre, cut_post)
    avg_signal, peak, dt_model = self._normalize_signal(cut_pre, cut_post)

    ns = len(avg_signal)
    assert ns == len(dt_model)
    if cut_pre < 0 or cut_post < 0:
        raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")
    if cut_pre + cut_post >= ns:
        raise ValueError(f"cut_pre+cut_post = {cut_pre + cut_post} but should be < {ns}")

    MT = np.vstack((avg_signal, dt_model, np.ones(ns)))

    if self.whitener is not None:
        WM = self.whitener(MT.T)
        A = np.dot(WM.T, WM)
        Ainv = np.linalg.inv(A)
        WtWM = self.whitener.applyWT(WM)
        filt = np.dot(Ainv, WtWM.T)

    else:
        assert len(noise_autocorr) >= ns
        noise_corr = noise_autocorr[:ns]
        TS = ToeplitzSolver(noise_corr, symmetric=True)

        RinvM = np.vstack([TS(r) for r in MT]).T
        A = np.dot(MT, RinvM)
        Ainv = np.linalg.inv(A)
        filt = np.dot(Ainv, RinvM.T)

    band_limit(filt.T, self.sample_time_sec, fmax, f_3db)

    if cut_pre > 0 or cut_post > 0:
        nfilt = filt.shape[0]
        filt = np.hstack([np.zeros((nfilt, cut_pre), dtype=float), filt, np.zeros((nfilt, cut_post), dtype=float)])

    filt_noconst = filt[0]
    filt_dt = filt[1]
    filt_baseline = filt[2]

    variance = bracketR(filt_noconst, self.noise_autocorr)
    vdv = peak / (np.log(2) * 8 * variance) ** 0.5
    return FilterATS(
        filt_noconst, peak, variance, vdv, filt_dt, filt_baseline, avg_signal, dt_model, 1, fmax, f_3db, cut_pre, cut_post
    )
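
A minimal usage sketch, not part of the generated documentation: it assumes a FilterMaker instance named maker that was already constructed with a signal model, an arrival-time (dt) model, noise information, and sample_time_sec; all numeric values are illustrative.

# `maker` is an already-constructed FilterMaker (constructor arguments omitted here).
filt_plain = maker.compute_ats()

# Band-limited variant: hard cutoff at 5 kHz, 1-pole roll-off at 2 kHz,
# and zero weight on the first 10 and last 4 samples of each record.
filt_smooth = maker.compute_ats(fmax=5e3, f_3db=2e3, cut_pre=10, cut_post=4)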

compute_constrained_5lag(constraints=None, fmax=None, f_3db=None, cut_pre=0, cut_post=0)

Compute a single constrained optimal filter, with optional low-pass filtering, and with optional zero weights at the pre-trigger or post-trigger end of the filter.

Either or both of fmax and f_3db are allowed.

Parameters:
  • constraints (ArrayLike | None, default: None ) –

    The vector or vectors to which the filter should be orthogonal. If a 2D array, each row is a constraint, and the number of columns should equal len(self.signal_model) - (cut_pre + cut_post) - 4, because the 5-lag convolution shortens the record by two samples at each end.

  • fmax (Optional[float], default: None ) –

    The strict maximum frequency to be passed in all filters, by default None

  • f_3db (Optional[float], default: None ) –

    The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None

  • cut_pre (int, default: 0 ) –

    The number of initial samples to be given zero weight, by default 0

  • cut_post (int, default: 0 ) –

    The number of samples at the end of a record to be given zero weight, by default 0

Returns:
  • Filter

    A 5-lag optimal filter.

Raises:
  • ValueError

    Under various conditions where arguments are inconsistent with the data

Source code in mass2/core/optimal_filtering.py, lines 518-595
def compute_constrained_5lag(
    self,
    constraints: npt.ArrayLike | None = None,
    fmax: float | None = None,
    f_3db: float | None = None,
    cut_pre: int = 0,
    cut_post: int = 0,
) -> Filter:
    """Compute a single constrained optimal filter, with optional low-pass filtering, and with optional zero
    weights at the pre-trigger or post-trigger end of the filter.

    Either or both of `fmax` and `f_3db` are allowed.

    Parameters
    ----------
    constraints: ndarray, optional
        The vector or vectors to which the filter should be orthogonal. If a 2D array, each _row_
        is a constraint, and the number of columns should equal `len(self.signal_model)` minus
        `(cut_pre + cut_post + 4)`, because the 5-lag convolution shortens the record by two
        samples at each end.
    fmax : Optional[float], optional
        The strict maximum frequency to be passed in all filters, by default None
    f_3db : Optional[float], optional
        The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
    cut_pre : int
        The number of initial samples to be given zero weight, by default 0
    cut_post : int
        The number of samples at the end of a record to be given zero weight, by default 0

    Returns
    -------
    Filter
        A 5-lag optimal filter.

    Raises
    ------
    ValueError
        Under various conditions where arguments are inconsistent with the data
    """

    if self.sample_time_sec <= 0 and not (fmax is None and f_3db is None):
        raise ValueError("FilterMaker must have a sample_time_sec if it's to be smoothed with fmax or f_3db")
    if cut_pre < 0 or cut_post < 0:
        raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")

    if self.noise_autocorr is None and self.whitener is None:
        raise ValueError("FilterMaker must have noise_autocorr or whitener arguments to generate 5-lag filters")
    noise_autocorr = self._compute_autocorr(cut_pre, cut_post)
    avg_signal, peak, _ = self._normalize_signal(cut_pre, cut_post)

    shorten = 2  # for 5-lag convolution
    truncated_signal = avg_signal[shorten:-shorten]
    n = len(truncated_signal)
    assert len(noise_autocorr) >= n, "Noise autocorrelation vector is too short for signal size"
    pulse_model = np.vstack((truncated_signal, np.ones_like(truncated_signal)))
    if constraints is not None:
        pulse_model = np.vstack((pulse_model, constraints))
    assert pulse_model.shape[1] == n

    noise_corr = noise_autocorr[:n]
    TS = ToeplitzSolver(noise_corr, symmetric=True)
    Rinv_model = np.vstack([TS(r) for r in pulse_model])
    A = pulse_model.dot(Rinv_model.T)
    all_filters = np.linalg.solve(A, Rinv_model)
    filt_noconst = all_filters[0]

    band_limit(filt_noconst, self.sample_time_sec, fmax, f_3db)

    self._normalize_5lag_filter(filt_noconst, avg_signal)
    variance = bracketR(filt_noconst, noise_corr)

    # Set weights in the cut_pre and cut_post windows to 0
    if cut_pre > 0 or cut_post > 0:
        filt_noconst = np.hstack([np.zeros(cut_pre), filt_noconst, np.zeros(cut_post)])

    vdv = peak / (8 * np.log(2) * variance) ** 0.5
    return Filter5Lag(
        filt_noconst, peak, variance, vdv, None, None, avg_signal, None, 1 + 2 * shorten, fmax, f_3db, cut_pre, cut_post
    )
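
As an illustration, the sketch below builds a single decaying-exponential constraint by hand, in the same way the exponential-orthogonality method earlier on this page does, and passes it to compute_constrained_5lag. It assumes an existing FilterMaker instance maker built with a signal model, noise autocorrelation, and sample_time_sec; the 3 ms time constant and 10 kHz cutoff are arbitrary.

import numpy as np

tau = 3e-3                       # decay time constant in seconds (illustrative)
cut_pre, cut_post = 0, 0
# The 5-lag convolution removes 2 samples from each end, hence the extra 4.
n = len(maker.signal_model) - 4 - (cut_pre + cut_post)
constraint = np.exp(-np.arange(n) * maker.sample_time_sec / tau)

# Optimal filter orthogonal to the chosen exponential, band-limited at 10 kHz.
filt_exp = maker.compute_constrained_5lag(constraint, fmax=1e4)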

compute_fourier(fmax=None, f_3db=None, cut_pre=0, cut_post=0)

Compute a single Fourier-domain filter, with optional low-pass filtering, and with optional zero weights at the pre-trigger or post-trigger end of the filter. Fourier domain calculation implicitly assumes periodic boundary conditions.

Either or both of fmax and f_3db are allowed.

Parameters:
  • fmax (Optional[float], default: None ) –

    The strict maximum frequency to be passed in all filters, by default None

  • f_3db (Optional[float], default: None ) –

    The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None

  • cut_pre (int, default: 0 ) –

    The number of initial samples to be given zero weight, by default 0

  • cut_post (int, default: 0 ) –

    The number of samples at the end of a record to be given zero weight, by default 0

Returns:
  • Filter

    A 5-lag optimal filter, computed in the Fourier domain.

Raises:
  • ValueError

    Under various conditions where arguments are inconsistent with the data

Source code in mass2/core/optimal_filtering.py, lines 663-760
def compute_fourier(self, fmax: float | None = None, f_3db: float | None = None, cut_pre: int = 0, cut_post: int = 0) -> Filter:
    """Compute a single Fourier-domain filter, with optional low-pass filtering, and with optional
    zero weights at the pre-trigger or post-trigger end of the filter. Fourier domain calculation
    implicitly assumes periodic boundary conditions.

    Either or both of `fmax` and `f_3db` are allowed.

    Parameters
    ----------
    fmax : Optional[float], optional
        The strict maximum frequency to be passed in all filters, by default None
    f_3db : Optional[float], optional
        The 3 dB point for a one-pole low-pass filter to be applied to all filters, by default None
    cut_pre : int
        The number of initial samples to be given zero weight, by default 0
    cut_post : int
        The number of samples at the end of a record to be given zero weight, by default 0

    Returns
    -------
    Filter
        A 5-lag optimal filter, computed in the Fourier domain.

    Raises
    ------
    ValueError
        Under various conditions where arguments are inconsistent with the data
    """
    # Make sure we have either a noise PSD or an autocorrelation or a whitener
    if self.noise_psd is None:
        raise ValueError("FilterMaker must have noise_psd to generate a Fourier filter")
    if cut_pre < 0 or cut_post < 0:
        raise ValueError(f"(cut_pre,cut_post)=({cut_pre},{cut_post}), but neither can be negative")

    avg_signal, peak, _ = self._normalize_signal(cut_pre, cut_post)
    noise_psd = np.asarray(self.noise_psd)

    # Terminology: the `avg_signal` vector will be "shortened" by `shorten` on each end.
    # That's to permit 5-lag filtering (where we step the filter by ±2 lags either direction from 0 lag).
    # The `avg_signal` was already "reduced" in length by (cut_pre, cut_post), for a total
    # `reduction` of `2 * shorten + (cut_pre + cut_post)`.
    shorten = 2  # to use in 5-lag style
    reduction = 2 * shorten + (cut_pre + cut_post)

    truncated_avg_signal = avg_signal[shorten:-shorten]
    len_reduced_psd = len(noise_psd) - (reduction + 1) // 2
    window = 1.0
    sig_ft = np.fft.rfft(truncated_avg_signal * window)

    if len(sig_ft) != len_reduced_psd:
        raise ValueError(f"signal real DFT and noise PSD are not the same length ({len(sig_ft)} and {len_reduced_psd})")

    # Careful with PSD: "shorten" it by converting into a real space autocorrelation,
    # truncating the middle, and going back to Fourier space
    if reduction > 0:
        noise_autocorr = np.fft.irfft(noise_psd)
        noise_autocorr = np.hstack((noise_autocorr[: len_reduced_psd - 1], noise_autocorr[-len_reduced_psd:]))
        noise_psd = np.abs(np.fft.rfft(noise_autocorr))
    sig_ft_weighted = sig_ft / noise_psd

    # Band-limit
    if fmax is not None or f_3db is not None:
        f_nyquist = 0.5 / self.sample_time_sec
        freq = np.linspace(0, f_nyquist, len_reduced_psd, dtype=float)
        if fmax is not None:
            sig_ft_weighted[freq > fmax] = 0.0
        if f_3db is not None:
            sig_ft_weighted /= 1 + (freq * 1.0 / f_3db) ** 2

    sig_ft_weighted[0] = 0.0
    filt_fourier = np.fft.irfft(sig_ft_weighted) / window
    self._normalize_5lag_filter(filt_fourier, avg_signal)

    # How we compute the uncertainty depends on whether there's a noise autocorrelation result
    if self.noise_autocorr is None:
        noise_ft_squared = (len(noise_psd) - 1) / self.sample_time_sec * noise_psd
        kappa = (np.abs(sig_ft) ** 2 / noise_ft_squared)[1:].sum()
        variance_fourier = 1.0 / kappa
        print(kappa, noise_ft_squared)
    else:
        ac = np.array(self.noise_autocorr)[: len(filt_fourier)]
        variance_fourier = bracketR(filt_fourier, ac)
    vdv = peak / (8 * np.log(2) * variance_fourier) ** 0.5
    return Filter5Lag(
        filt_fourier,
        peak,
        variance_fourier,
        vdv,
        None,
        None,
        truncated_avg_signal,
        None,
        1 + 2 * shorten,
        fmax,
        f_3db,
        cut_pre,
        cut_post,
    )
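
A brief usage sketch (assuming maker is a FilterMaker built with a noise_psd; the roll-off frequency is arbitrary):

# Fourier-domain 5-lag filter with a 1-pole low-pass roll-off at 4 kHz.
filt_fourier = maker.compute_fourier(f_3db=4e3)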

ToeplitzWhitener dataclass

An object that can perform approximate noise whitening.

For an ARMA(p,q) noise model, multiply by (or solve) the matrix W (or its transpose), where W is the Toeplitz approximation to the whitening matrix V. A whitening matrix V means that if R is the ARMA noise covariance matrix, then VRV' = I. While W only approximately satisfies this, it has some handy properties that make it a useful replacement. (In particular, it has the time-transpose property that if you zero-pad the beginning of vector v and shift the remaining elements, then the same is done to Wv.)

The callable function object returns Wv or WM if called with vector v or matrix M. Other methods:

  • tw.whiten(v) returns Wv; it is equivalent to tw(v)
  • tw.solveWT(v) returns inv(W')*v
  • tw.applyWT(v) returns W'v
  • tw.solveW(v) returns inv(W)*v
Arguments

  • theta (np.ndarray) –

    The moving-average (MA) process coefficients

  • phi (np.ndarray) –

    The autoregressive (AR) process coefficients

Returns:
  • ToeplitzWhitener

    Object that can perform approximate, time-invariant noise whitening.

Raises:
  • ValueError

    If the operative methods are passed an array of dimension higher than 2.

Source code in mass2/core/optimal_filtering.py, lines 16-186
@dataclass(frozen=True)
class ToeplitzWhitener:
    """An object that can perform approximate noise whitening.

    For an ARMA(p,q) noise model, multiply by (or solve) the matrix W (or its
    transpose), where W is the Toeplitz approximation to the whitening matrix V.
    A whitening matrix V means that if R is the ARMA noise covariance matrix,
    then VRV' = I. While W only approximately satisfies this, it has some handy
    properties that make it a useful replacement. (In particular, it has the
    time-transpose property that if you zero-pad the beginning of vector v and
    shift the remaining elements, then the same is done to Wv.)

    The callable function object returns Wv or WM if called with
    vector v or matrix M. Other methods:

    * `tw.whiten(v)` returns Wv; it is equivalent to `tw(v)`
    * `tw.solveWT(v)` returns inv(W')*v
    * `tw.applyWT(v)` returns W'v
    * `tw.solveW(v)` returns inv(W)*v

    Arguments
    ---------
    theta : np.ndarray
        The moving-average (MA) process coefficients
    phi : np.ndarray
        The autoregressive (AR) process coefficients

    Returns
    -------
    ToeplitzWhitener
        Object that can perform approximate, time-invariant noise whitening.

    Raises
    ------
    ValueError
        If the operative methods are passed an array of dimension higher than 2.
    """

    theta: np.ndarray
    phi: np.ndarray

    @property
    def p(self):
        return len(self.phi) - 1

    @property
    def q(self):
        return len(self.theta) - 1

    def whiten(self, v: npt.ArrayLike) -> np.ndarray:
        "Return whitened vector (or matrix of column vectors) Wv"
        return self(v)

    def __call__(self, v: npt.ArrayLike) -> np.ndarray:
        "Return whitened vector (or matrix of column vectors) Wv"
        v = np.asarray(v)
        if v.ndim > 2:
            raise ValueError("v must be an array of dimension 1 or 2")
        elif v.ndim == 2:
            w = np.zeros_like(v)
            for i in range(v.shape[1]):
                w[:, i] = self(v[:, i])
            return w

        # Multiply by the Toeplitz AR matrix to make the MA*w vector.
        N = len(v)
        y = self.phi[0] * v
        for i in range(1, 1 + self.p):
            y[i:] += self.phi[i] * v[:-i]

        # Second, solve the MA matrix (a banded, lower-triangular Toeplitz matrix with
        # q non-zero subdiagonals.)
        y[0] /= self.theta[0]
        if N == 1:
            return y
        for i in range(1, min(self.q, N)):
            for j in range(i):
                y[i] -= y[j] * self.theta[i - j]
            y[i] /= self.theta[0]
        for i in range(self.q, N):
            for j in range(i - self.q, i):
                y[i] -= y[j] * self.theta[i - j]
            y[i] /= self.theta[0]
        return y

    def solveW(self, v: npt.ArrayLike) -> np.ndarray:
        "Return unwhitened vector (or matrix of column vectors) inv(W)*v"
        v = np.asarray(v)
        if v.ndim > 2:
            raise ValueError("v must be dimension 1 or 2")
        elif v.ndim == 2:
            r = np.zeros_like(v)
            for i in range(v.shape[1]):
                r[:, i] = self.solveW(v[:, i])
            return r

        # Multiply by the Toeplitz MA matrix to make the AR*w vector.
        N = len(v)
        y = self.theta[0] * v
        for i in range(1, 1 + self.q):
            y[i:] += self.theta[i] * v[:-i]

        # Second, solve the AR matrix (a banded, lower-triangular Toeplitz matrix with
        # p non-zero subdiagonals.)
        y[0] /= self.phi[0]
        if N == 1:
            return y
        for i in range(1, min(self.p, N)):
            for j in range(i):
                y[i] -= y[j] * self.phi[i - j]
            y[i] /= self.phi[0]
        for i in range(self.p, N):
            for j in range(i - self.p, i):
                y[i] -= y[j] * self.phi[i - j]
            y[i] /= self.phi[0]
        return y

    def solveWT(self, v: npt.ArrayLike) -> np.ndarray:
        "Return vector (or matrix of column vectors) inv(W')*v"
        v = np.asarray(v)
        if v.ndim > 2:
            raise ValueError("v must be dimension 1 or 2")
        elif v.ndim == 2:
            r = np.zeros_like(v)
            for i in range(v.shape[1]):
                r[:, i] = self.solveWT(v[:, i])
            return r

        N = len(v)
        y = np.array(v)
        y[N - 1] /= self.phi[0]
        for i in range(N - 2, -1, -1):
            f = min(self.p + 1, N - i)
            y[i] -= np.dot(y[i + 1 : i + f], self.phi[1:f])
            y[i] /= self.phi[0]
        return np.correlate(y, self.theta, "full")[self.q :]

    def applyWT(self, v: npt.ArrayLike) -> np.ndarray:
        """Return vector (or matrix of column vectors) W'v"""
        v = np.asarray(v)
        if v.ndim > 2:
            raise ValueError("v must be dimension 1 or 2")
        elif v.ndim == 2:
            r = np.zeros_like(v)
            for i in range(v.shape[1]):
                r[:, i] = self.applyWT(v[:, i])
            return r

        N = len(v)
        y = np.array(v)
        y[N - 1] /= self.theta[0]
        for i in range(N - 2, -1, -1):
            f = min(self.q + 1, N - i)
            y[i] -= np.dot(y[i + 1 : i + f], self.theta[1:f])
            y[i] /= self.theta[0]
        return np.correlate(y, self.phi, "full")[self.p :]

    def W(self, N: int) -> np.ndarray:
        """Return the full, approximate whitening matrix.

        Normally the full W is large and slow to use. But it's here so you can
        easily test that W(len(v))*v == whiten(v), and similar.
        """
        AR = np.zeros((N, N), dtype=float)
        MA = np.zeros((N, N), dtype=float)
        for i in range(N):
            for j in range(max(0, i - self.p), i + 1):
                AR[i, j] = self.phi[i - j]
            for j in range(max(0, i - self.q), i + 1):
                MA[i, j] = self.theta[i - j]
        return np.linalg.solve(MA, AR)

W(N)

Return the full, approximate whitening matrix.

Normally the full W is large and slow to use. But it's here so you can easily test that W(len(v))*v == whiten(v), and similar.

Source code in mass2/core/optimal_filtering.py, lines 173-186
def W(self, N: int) -> np.ndarray:
    """Return the full, approximate whitening matrix.

    Normally the full W is large and slow to use. But it's here so you can
    easily test that W(len(v))*v == whiten(v), and similar.
    """
    AR = np.zeros((N, N), dtype=float)
    MA = np.zeros((N, N), dtype=float)
    for i in range(N):
        for j in range(max(0, i - self.p), i + 1):
            AR[i, j] = self.phi[i - j]
        for j in range(max(0, i - self.q), i + 1):
            MA[i, j] = self.theta[i - j]
    return np.linalg.solve(MA, AR)
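
A quick numerical self-check of the claim above, using arbitrary ARMA(1,1) coefficients; only methods defined on this class are called.

import numpy as np

theta = np.array([1.0, 0.3])    # MA coefficients (illustrative)
phi = np.array([1.0, -0.6])     # AR coefficients (illustrative)
tw = ToeplitzWhitener(theta, phi)

v = np.random.default_rng(0).standard_normal(50)
W = tw.W(len(v))
assert np.allclose(W @ v, tw.whiten(v))      # dense W reproduces whiten / __call__
assert np.allclose(W.T @ v, tw.applyWT(v))   # and its transpose reproduces applyWT
assert np.allclose(tw.solveW(tw(v)), v)      # solveW inverts the whitening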

__call__(v)

Return whitened vector (or matrix of column vectors) Wv

Source code in mass2/core/optimal_filtering.py, lines 69-99
def __call__(self, v: npt.ArrayLike) -> np.ndarray:
    "Return whitened vector (or matrix of column vectors) Wv"
    v = np.asarray(v)
    if v.ndim > 2:
        raise ValueError("v must be an array of dimension 1 or 2")
    elif v.ndim == 2:
        w = np.zeros_like(v)
        for i in range(v.shape[1]):
            w[:, i] = self(v[:, i])
        return w

    # Multiply by the Toeplitz AR matrix to make the MA*w vector.
    N = len(v)
    y = self.phi[0] * v
    for i in range(1, 1 + self.p):
        y[i:] += self.phi[i] * v[:-i]

    # Second, solve the MA matrix (a banded, lower-triangular Toeplitz matrix with
    # q non-zero subdiagonals.)
    y[0] /= self.theta[0]
    if N == 1:
        return y
    for i in range(1, min(self.q, N)):
        for j in range(i):
            y[i] -= y[j] * self.theta[i - j]
        y[i] /= self.theta[0]
    for i in range(self.q, N):
        for j in range(i - self.q, i):
            y[i] -= y[j] * self.theta[i - j]
        y[i] /= self.theta[0]
    return y

applyWT(v)

Return vector (or matrix of column vectors) W'v

Source code in mass2/core/optimal_filtering.py, lines 153-171
def applyWT(self, v: npt.ArrayLike) -> np.ndarray:
    """Return vector (or matrix of column vectors) W'v"""
    v = np.asarray(v)
    if v.ndim > 2:
        raise ValueError("v must be dimension 1 or 2")
    elif v.ndim == 2:
        r = np.zeros_like(v)
        for i in range(v.shape[1]):
            r[:, i] = self.applyWT(v[:, i])
        return r

    N = len(v)
    y = np.array(v)
    y[N - 1] /= self.theta[0]
    for i in range(N - 2, -1, -1):
        f = min(self.q + 1, N - i)
        y[i] -= np.dot(y[i + 1 : i + f], self.theta[1:f])
        y[i] /= self.theta[0]
    return np.correlate(y, self.phi, "full")[self.p :]

solveW(v)

Return unwhitened vector (or matrix of column vectors) inv(W)*v

Source code in mass2/core/optimal_filtering.py, lines 101-131
def solveW(self, v: npt.ArrayLike) -> np.ndarray:
    "Return unwhitened vector (or matrix of column vectors) inv(W)*v"
    v = np.asarray(v)
    if v.ndim > 2:
        raise ValueError("v must be dimension 1 or 2")
    elif v.ndim == 2:
        r = np.zeros_like(v)
        for i in range(v.shape[1]):
            r[:, i] = self.solveW(v[:, i])
        return r

    # Multiply by the Toeplitz MA matrix to make the AR*w vector.
    N = len(v)
    y = self.theta[0] * v
    for i in range(1, 1 + self.q):
        y[i:] += self.theta[i] * v[:-i]

    # Second, solve the AR matrix (a banded, lower-triangular Toeplitz matrix with
    # p non-zero subdiagonals.)
    y[0] /= self.phi[0]
    if N == 1:
        return y
    for i in range(1, min(self.p, N)):
        for j in range(i):
            y[i] -= y[j] * self.phi[i - j]
        y[i] /= self.phi[0]
    for i in range(self.p, N):
        for j in range(i - self.p, i):
            y[i] -= y[j] * self.phi[i - j]
        y[i] /= self.phi[0]
    return y

solveWT(v)

Return vector (or matrix of column vectors) inv(W')*v

Source code in mass2/core/optimal_filtering.py, lines 133-151
def solveWT(self, v: npt.ArrayLike) -> np.ndarray:
    "Return vector (or matrix of column vectors) inv(W')*v"
    v = np.asarray(v)
    if v.ndim > 2:
        raise ValueError("v must be dimension 1 or 2")
    elif v.ndim == 2:
        r = np.zeros_like(v)
        for i in range(v.shape[1]):
            r[:, i] = self.solveWT(v[:, i])
        return r

    N = len(v)
    y = np.array(v)
    y[N - 1] /= self.phi[0]
    for i in range(N - 2, -1, -1):
        f = min(self.p + 1, N - i)
        y[i] -= np.dot(y[i + 1 : i + f], self.phi[1:f])
        y[i] /= self.phi[0]
    return np.correlate(y, self.theta, "full")[self.q :]

whiten(v)

Return whitened vector (or matrix of column vectors) Wv

Source code in mass2/core/optimal_filtering.py, lines 65-67
def whiten(self, v: npt.ArrayLike) -> np.ndarray:
    "Return whitened vector (or matrix of column vectors) Wv"
    return self(v)

band_limit(modelmatrix, sample_time_sec, fmax, f_3db)

Band-limit the column-vectors in a model matrix with a hard and/or 1-pole low-pass filter. Change the input modelmatrix in-place.

No effect if both fmax and f_3db are None.

Parameters:
  • modelmatrix (ndarray) –

    The 1D or 2D array to band-limit. (If a 2D array, columns are independently band-limited.)

  • sample_time_sec (float) –

    The sampling period, normally in seconds.

  • fmax (Optional[float]) –

    The hard maximum frequency (units are inverse of sample_time_sec units, or Hz)

  • f_3db (Optional[float]) –

    The 1-pole low-pass filter's 3 dB point (same units as fmax)

Source code in mass2/core/optimal_filtering.py, lines 189-227
def band_limit(modelmatrix: np.ndarray, sample_time_sec: float, fmax: float | None, f_3db: float | None):
    """Band-limit the column-vectors in a model matrix with a hard and/or
    1-pole low-pass filter. Change the input `modelmatrix` in-place.

    No effect if both `fmax` and `f_3db` are `None`.

    Parameters
    ----------
    modelmatrix : np.ndarray
        The 1D or 2D array to band-limit. (If a 2D array, columns are independently band-limited.)
    sample_time_sec : float
        The sampling period, normally in seconds.
    fmax : Optional[float]
        The hard maximum frequency (units are inverse of `sample_time_sec` units, or Hz)
    f_3db : Optional[float]
        The 1-pole low-pass filter's 3 dB point (same units as `fmax`)
    """
    if fmax is None and f_3db is None:
        return

    # Handle the 2D case by calling this function once per column.
    assert len(modelmatrix.shape) <= 2
    if len(modelmatrix.shape) == 2:
        for i in range(modelmatrix.shape[1]):
            band_limit(modelmatrix[:, i], sample_time_sec, fmax, f_3db)
        return

    vector = modelmatrix
    filt_length = len(vector)
    sig_ft = np.fft.rfft(vector)
    freq = np.fft.fftfreq(filt_length, d=sample_time_sec)
    freq = np.abs(freq[: len(sig_ft)])
    if fmax is not None:
        sig_ft[freq > fmax] = 0.0
    if f_3db is not None:
        sig_ft /= 1.0 + (1.0 * freq / f_3db) ** 2

    # n=filt_length is needed when filt_length is ODD
    vector[:] = np.fft.irfft(sig_ft, n=filt_length)
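
A short in-place example; the sampling interval, cutoff, and test signal are arbitrary.

import numpy as np

dt = 1e-5                                    # 10 microsecond sample time (illustrative)
x = np.zeros(512)
x[256:] = 1.0                                # a unit step
x += 0.05 * np.random.default_rng(1).standard_normal(512)

band_limit(x, dt, fmax=10e3, f_3db=None)     # x is modified in place
# Frequency content above 10 kHz has now been removed from x.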

bracketR(q, noise)

Return the dot product (q^T R q) for vector q and matrix R constructed from the vector noise by R_ij = noise_|i-j|. We don't want to construct the full matrix R, because for records as long as 10,000 samples the matrix would consist of 10^8 floats (800 MB of memory).

Source code in mass2/core/optimal_filtering.py, lines 956-972
def bracketR(q, noise):
    """Return the dot product (q^T R q) for vector <q> and matrix R constructed from
    the vector <noise> by R_ij = noise_|i-j|.  We don't want to construct the full matrix
    R because for records as long as 10,000 samples, the matrix will consist of 10^8 floats
    (800 MB of memory).
    """

    if len(noise) < len(q):
        raise ValueError(f"Vector q (length {len(q)}) cannot be longer than the noise (length {len(noise)})")
    n = len(q)
    r = np.zeros(2 * n - 1, dtype=float)
    r[n - 1 :] = noise[:n]
    r[n - 1 :: -1] = noise[:n]
    dot = 0.0
    for i in range(n):
        dot += q[i] * r[n - i - 1 : 2 * n - i - 1].dot(q)
    return dot
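
A small check of the identity described above, building the Toeplitz matrix R explicitly for a short record (toy values only):

import numpy as np

q = np.random.default_rng(2).standard_normal(8)
noise = np.exp(-np.arange(10) / 3.0)          # toy autocorrelation, lag 0 first

lags = np.abs(np.arange(len(q))[:, None] - np.arange(len(q))[None, :])
R = noise[lags]                               # R_ij = noise[|i - j|]
assert np.isclose(q @ R @ q, bracketR(q, noise))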

HCI lines