Skip to content

Bin by time

el_paso.processing.bin_by_time

Classes

el_paso.processing.bin_by_time.TimeBinMethod

Bases: Enum

Enum for time binning methods.

Attributes:

Name Type Description
Mean str

Calculates the mean of the data.

NanMean str

Calculates the mean of the data, ignoring NaNs.

Median str

Calculates the median of the data.

NanMedian str

Calculates the median of the data, ignoring NaNs.

Merge str

Concatenates the data.

NanMax str

Calculates the maximum of the data, ignoring NaNs.

NanMin str

Calculates the minimum of the data, ignoring NaNs.

NoBinning str

Applies no binning.

Repeat str

Repeats the data.

Unique str

Returns unique values from the data.

Source code in el_paso/processing/bin_by_time.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class TimeBinMethod(Enum):
    """Enum for time binning methods.

    Attributes:
        Mean (str): Calculates the mean of the data.
        NanMean (str): Calculates the mean of the data, ignoring NaNs.
        Median (str): Calculates the median of the data.
        NanMedian (str): Calculates the median of the data, ignoring NaNs.
        Merge (str): Concatenates the data.
        NanMax (str): Calculates the maximum of the data, ignoring NaNs.
        NanMin (str): Calculates the minimum of the data, ignoring NaNs.
        NoBinning (str): Applies no binning.
        Repeat (str): Repeats the data.
        Unique (str): Returns unique values from the data.
    """

    Mean = "Mean"
    NanMean = "NanMean"
    Median = "Median"
    NanMedian = "NanMedian"
    Merge = "Merge"
    NanMax = "NanMax"
    NanMin = "NanMin"
    NoBinning = "NoBinning"
    Repeat = "Repeat"
    Unique = "Unique"

    def __call__(self, data: NDArray[np.generic], drop_percent: float = 0) -> NDArray[np.generic]:
        """Applies the binning method to the provided data.

        Args:
            data (NDArray[np.generic]): The input data array to be binned or aggregated.
            drop_percent (float, optional): The percentage of the lowest and highest
                values to drop before performing a statistical aggregation.
                Defaults to 0.

        Returns:
            NDArray[np.generic]: The resulting array after applying the selected
                binning or aggregation method.

        Raises:
            TypeError: If the selected binning method requires numeric types and the
                input data is not numeric.
        """
        binned_array: NDArray[np.generic]

        if self.value in ["Mean", "NanMean", "Median", "NanMedian", "NanMax", "NanMin"] and not np.issubdtype(
            data.dtype, np.number
        ):
            msg = f"{self.value} time bin method is only supported for numeric types!"
            raise TypeError(msg)

        num_to_remove = int(len(data) * drop_percent / 100)
        if num_to_remove > 0 and np.issubdtype(data.dtype, np.number):
            data = np.sort(data, axis=0)
            data = data[num_to_remove:-num_to_remove]

        match self.value:
            case "Mean":
                data = typing.cast("NDArray[np.floating]", data)
                binned_array = np.mean(data, axis=0)
            case "NanMean":
                data = typing.cast("NDArray[np.floating]", data)
                binned_array = np.nanmean(data, axis=0)
            case "Median":
                data = typing.cast("NDArray[np.floating]", data)
                binned_array = np.nanmedian(data, axis=0)
            case "NanMedian":
                data = typing.cast("NDArray[np.floating]", data)
                binned_array = np.nanmedian(data, axis=0)
            case "Merge":
                binned_array = np.concatenate(data, axis=0)
            case "NanMax":
                binned_array = np.nanmax(data, axis=0)
            case "NanMin":
                binned_array = np.nanmin(data, axis=0)
            case "NoBinning":
                binned_array = data
            case "Repeat":
                binned_array = data
            case "Unique":
                binned_array = np.unique(data, axis=0)

                if data.dtype.kind in {"U", "S"}:
                    binned_array = np.asarray(["".join(binned_array)])

        return binned_array
Methods:
__call__
__call__

Applies the binning method to the provided data.

Parameters:

Name Type Description Default
data NDArray[generic]

The input data array to be binned or aggregated.

required
drop_percent float

The percentage of the lowest and highest values to drop before performing a statistical aggregation. Defaults to 0.

0

Returns:

Type Description
NDArray[generic]

The resulting array after applying the selected binning or aggregation method.

Raises:

Type Description
TypeError

If the selected binning method requires numeric types and the input data is not numeric.

Source code in el_paso/processing/bin_by_time.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __call__(self, data: NDArray[np.generic], drop_percent: float = 0) -> NDArray[np.generic]:
    """Applies the binning method to the provided data.

    Args:
        data (NDArray[np.generic]): The input data array to be binned or aggregated.
        drop_percent (float, optional): The percentage of the lowest and highest
            values to drop before performing a statistical aggregation.
            Defaults to 0.

    Returns:
        NDArray[np.generic]: The resulting array after applying the selected
            binning or aggregation method.

    Raises:
        TypeError: If the selected binning method requires numeric types and the
            input data is not numeric.
    """
    binned_array: NDArray[np.generic]

    if self.value in ["Mean", "NanMean", "Median", "NanMedian", "NanMax", "NanMin"] and not np.issubdtype(
        data.dtype, np.number
    ):
        msg = f"{self.value} time bin method is only supported for numeric types!"
        raise TypeError(msg)

    num_to_remove = int(len(data) * drop_percent / 100)
    if num_to_remove > 0 and np.issubdtype(data.dtype, np.number):
        data = np.sort(data, axis=0)
        data = data[num_to_remove:-num_to_remove]

    match self.value:
        case "Mean":
            data = typing.cast("NDArray[np.floating]", data)
            binned_array = np.mean(data, axis=0)
        case "NanMean":
            data = typing.cast("NDArray[np.floating]", data)
            binned_array = np.nanmean(data, axis=0)
        case "Median":
            data = typing.cast("NDArray[np.floating]", data)
            binned_array = np.nanmedian(data, axis=0)
        case "NanMedian":
            data = typing.cast("NDArray[np.floating]", data)
            binned_array = np.nanmedian(data, axis=0)
        case "Merge":
            binned_array = np.concatenate(data, axis=0)
        case "NanMax":
            binned_array = np.nanmax(data, axis=0)
        case "NanMin":
            binned_array = np.nanmin(data, axis=0)
        case "NoBinning":
            binned_array = data
        case "Repeat":
            binned_array = data
        case "Unique":
            binned_array = np.unique(data, axis=0)

            if data.dtype.kind in {"U", "S"}:
                binned_array = np.asarray(["".join(binned_array)])

    return binned_array

Functions:

el_paso.processing.bin_by_time.bin_by_time

bin_by_time

Bins one or more variables by time according to specified methods and cadence.

This function takes a time variable and a dictionary of other variables, then bins these variables over time. Each variable can have its own binning method applied (e.g., mean, median, unique). The binning is performed over defined time intervals (cadence) with a specified alignment.

Parameters:

Name Type Description Default
time_variable Variable

The master time variable that defines the time basis for all other variables. Its data should be in a time unit (e.g., ep.units.posixtime or ep.units.datenum).

required
variables dict[str, Variable]

A dictionary where keys are variable names (str) and values are the ep.Variable objects to be binned.

required
time_bin_method_dict dict[str, TimeBinMethod]

A dictionary mapping variable names (str) to ep.TimeBinMethod enums, specifying how each variable should be binned within each time window. If a variable is not present in this dictionary, it will be skipped.

required
time_binning_cadence timedelta

A datetime.timedelta object specifying the duration of each time bin.

required
window_alignement Literal['center', 'left', 'right']

Determines how the time windows are aligned. Defaults to "center". * "center": The time bin represents the center of the window. * "left": The time bin represents the left (start) of the window. * "right": The time bin represents the right (end) of the window.

'center'
start_time datetime | None

Optional. A datetime.datetime object specifying the start time for binning. If None, the start time of time_variable is used.

None
end_time datetime | None

Optional. A datetime.datetime object specifying the end time for binning. If None, the end time of time_variable is used.

None
drop_percent float

Optional. The percentage of the lowest and highest values to drop from each time bin before calculating statistical aggregates like mean or median. Defaults to 0.

0

Returns:

Type Description
Variable

An ep.Variable object representing the new binned time axis. The variables dictionary passed as an argument is modified in place, with each variable's data updated to its binned values.

Raises:

Type Description
ValueError

If the first dimension size of any variable's data does not match the length of the time_variable data.

Source code in el_paso/processing/bin_by_time.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@timed_function()
def bin_by_time(
    time_variable: ep.Variable,
    variables: dict[str, ep.Variable],
    time_bin_method_dict: dict[str, TimeBinMethod],
    time_binning_cadence: timedelta,
    window_alignement: Literal["center", "left", "right"] = "center",
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    drop_percent: float = 0,
) -> ep.Variable:
    """Bin a set of variables onto a regular time axis.

    A new time axis and its bin edges are built from the start time, end time,
    cadence and window alignment. Every variable that has an entry in
    `time_bin_method_dict` is then reduced bin by bin with its configured
    method and its data is replaced by the binned result.

    Args:
        time_variable (ep.Variable): Time variable shared by all entries of
            `variables`. It is read in `ep.units.datenum` and
            `ep.units.posixtime`.
        variables (dict[str, ep.Variable]): Variables to bin, keyed by name.
            Their data is overwritten with the binned values.
        time_bin_method_dict (dict[str, ep.TimeBinMethod]): Binning method per
            variable name. Variables without an entry are left untouched.
        time_binning_cadence (timedelta): Duration of one time bin.
        window_alignement (Literal["center", "left", "right"]): Alignment of the
            time windows, forwarded to the bin construction. Defaults to
            "center".
        start_time (datetime | None): Start of the binned axis. When not given,
            the first value of `time_variable` is used.
        end_time (datetime | None): End of the binned axis. When not given, the
            last value of `time_variable` is used.
        drop_percent (float): Percentage forwarded to each binning method as
            `drop_percent`. Defaults to 0.

    Returns:
        ep.Variable: A new variable holding the binned time axis, created with
        `ep.units.posixtime` as its original unit.

    Raises:
        ValueError: If the first dimension of a variable's data does not have
            the same length as the data of `time_variable`.
    """
    log = logging.getLogger(__name__)
    log.info("Binning by time...")

    # Fall back to the extent of the time variable when no limits are given.
    if not start_time:
        start_time = datenum_to_datetime(time_variable.get_data(ep.units.datenum)[0])
    if not end_time:
        end_time = datenum_to_datetime(time_variable.get_data(ep.units.datenum)[-1])

    posix_steps = np.diff(time_variable.get_data(ep.units.posixtime))
    original_cadence = float(np.nanmedian(posix_steps))

    binned_time, time_bins = _create_binned_time_and_bins(start_time, end_time, time_binning_cadence, window_alignement)

    # The bin lookup depends only on the time variable, so it is computed
    # lazily once and shared by all variables.
    bin_lookup = None

    for name, variable in variables.items():
        if name not in time_bin_method_dict:
            continue
        method = time_bin_method_dict[name]

        # Variables without time dependency are simply tiled along the new axis.
        if method == ep.TimeBinMethod.Repeat:
            tiled = np.repeat(variable.get_data()[np.newaxis, ...], len(binned_time), axis=0)
            variable.set_data(tiled, "same")
            variable.metadata.original_cadence_seconds = 0
            continue

        # The variable must be sampled on the time variable's axis.
        if variable.get_data().shape[0] != len(time_variable.get_data()):
            msg = f"Variable {name}: size of dimension 0 does not match length of time variable!"
            raise ValueError(msg)

        if not bin_lookup:
            posix_stamps = typing.cast("NDArray[np.floating]", time_variable.get_data(ep.units.posixtime))
            bin_lookup = _calculate_index_iterables(posix_stamps, time_bins)

        target_bins, slice_edges = bin_lookup

        # Output buffer: empty strings for string/object data, NaN otherwise.
        # Its first dimension follows the binned time axis.
        if variable.get_data().dtype.kind in {"U", "S", "O"}:
            aggregated = np.full((len(binned_time),), "", dtype=variable.get_data().dtype)
        else:
            aggregated = np.full((len(binned_time), *variable.get_data().shape[1:]), np.nan)

        for pos, target_bin in enumerate(target_bins):
            samples = variable.get_data()[slice_edges[pos] : slice_edges[pos + 1]]
            # Leave the fill value in place for empty bins and for int/float
            # bins that contain no finite value.
            if len(samples) == 0 or (samples.dtype.kind in {"i", "f"} and not np.any(np.isfinite(samples))):
                continue
            aggregated[target_bin, ...] = method(samples, drop_percent=drop_percent)

        # String results are stored as an object array, everything else as is.
        array_kwargs = {"dtype": object} if isinstance(aggregated[0], str) else {}
        variable.set_data(np.array(aggregated, **array_kwargs), "same")

        variable.metadata.original_cadence_seconds = original_cadence
        variable.metadata.add_processing_note(
            f"Time binned with method {method.value}"
            f" and cadence of {time_binning_cadence.total_seconds() / 60} minutes"
        )

    new_time_var = ep.Variable(data=binned_time, original_unit=ep.units.posixtime)
    new_time_var.metadata.add_processing_note("Created while time binning.")

    return new_time_var