Skip to content

Typing

el_paso.typing.ConsistencyCheck dataclass

A utility class for checking the consistency of data dimensions.

This class helps verify that multiple variables saved to a file have the same length for shared dimensions (e.g., time, pitch angle, energy).

Attributes:

Name Type Description
len_time _SizeAttr | None

Stores the size of the time dimension from the first variable checked.

len_pitch_angle _SizeAttr | None

Stores the size of the pitch angle dimension from the first variable checked.

len_energy _SizeAttr | None

Stores the size of the energy dimension from the first variable checked.

Source code in el_paso/data_standard.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@dataclass
class ConsistencyCheck:
    """Check that variables sharing a dimension agree on that dimension's length.

    The first variable checked for a dimension name fixes the expected length of
    that dimension. Every later variable checked against the same dimension name
    must have the same length, otherwise a `ValueError` is raised.

    Attributes:
        lengths (dict[str | int, _SizeAttr]): For each dimension name seen so far,
            a `_SizeAttr` built from the name of the first variable checked for
            that dimension and the length that variable had.
    """

    # The parametrized alias is used as the factory; calling it yields a plain, empty dict.
    lengths: dict[str | int, _SizeAttr] = field(default_factory=dict[str | int, _SizeAttr])

    def check(self, data_shape: tuple[int, ...], dim_names_or_sizes: Sequence[str | int], var_name: str) -> None:
        """Check every axis of a variable against its expected dimension.

        Args:
            data_shape (tuple[int, ...]): Shape of the variable's data.
            dim_names_or_sizes (Sequence[str | int]): One entry per axis. A string names a
                shared dimension, an integer is a fixed required length.
            var_name (str): Name of the variable, used in error messages.

        Raises:
            ValueError: If the number of axes differs from the number of entries in
                `dim_names_or_sizes`, or if any axis fails `check_size`.
        """
        n_expected = len(dim_names_or_sizes)
        if len(data_shape) != n_expected:
            # The original message prefix is kept; the variable name and counts are appended.
            msg = (
                f"Encountered size missmatch! Variable {var_name} has {len(data_shape)} dimension(s), "
                f"but {n_expected} dimension(s) were expected."
            )
            raise ValueError(msg)

        for provided_len, dim_name_or_size in zip(data_shape, dim_names_or_sizes):
            self.check_size(provided_len, dim_name_or_size, var_name)

    def check_size(self, provided_len: int, dim_name_or_size: str | int, var_name: str) -> None:
        """Check a single axis length against a fixed size or a named dimension.

        Args:
            provided_len (int): Actual length of the axis.
            dim_name_or_size (str | int): Required length (int) or name of a shared dimension (str).
            var_name (str): Name of the variable, used in error messages and recorded
                when it is the first variable seen for a dimension.

        Raises:
            ValueError: If the length differs from the required size, or from the length
                recorded for the same dimension name by an earlier variable.
        """
        if isinstance(dim_name_or_size, int):
            if dim_name_or_size != provided_len:
                # No trailing comma here: the adjacent f-strings must form ONE string.
                # With a trailing comma the parentheses would build a 1-tuple instead.
                msg = (
                    f"Length mismatch! Variable {var_name} should have length {dim_name_or_size}, "
                    f"but encountered {provided_len}!"
                )
                raise ValueError(msg)
            return

        known = self.lengths.get(dim_name_or_size)
        if known is None:
            # First variable using this dimension: remember its length as the reference.
            self.lengths[dim_name_or_size] = _SizeAttr(var_name, provided_len)
            return

        if known.size != provided_len:
            msg = (
                f"Length mismatch! {dim_name_or_size} length of variable "
                f"{known.name}: {known.size} "
                f"and of variable {var_name}: {provided_len}"
            )
            raise ValueError(msg)

el_paso.typing.DataStandard

Bases: ABC, Generic[T_co]

Abstract base class for data standardization.

Source code in el_paso/data_standard.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class DataStandard(ABC, Generic[T_co]):
    """Abstract base class for data standardization.

    A data standard maps internal variable names to `VariableInfo` entries. The
    methods of this class read the `standard_name`, `unit`, `description` and
    `dependencies` fields of those entries.

    Attributes:
        variable_infos (dict[InternalName, VariableInfo[T_co]]): The variable
            definitions of this standard, keyed by internal name.
    """

    variable_infos: dict[InternalName, VariableInfo[T_co]]

    def __repr__(self) -> str:
        """Return `ClassName(param=value, ...)` for constructor parameters stored as attributes."""
        cls = type(self)
        param_names = inspect.signature(cls.__init__).parameters

        # Only constructor parameters that exist as same-named attributes can be shown.
        args = [
            f"{name}={getattr(self, name)!r}" for name in param_names if name != "self" and hasattr(self, name)
        ]

        return f"{cls.__name__}({', '.join(args)})"

    def __str__(self) -> str:
        """Return the same text as `__repr__`."""
        return self.__repr__()

    def get_internal_name(self, standard_name: StandardName) -> InternalName | None:
        """Return the first internal name whose standard name equals `standard_name`, or None."""
        return next(
            (
                internal_name
                for internal_name, var_info in self.variable_infos.items()
                if var_info.standard_name == standard_name
            ),
            None,
        )

    def get_standard_name(self, internal_name: InternalName) -> T_co:
        """Return the standard name registered for `internal_name`.

        Raises:
            ValueError: If `internal_name` is not part of this standard.
        """
        if internal_name not in self.variable_infos:
            msg = f"Internal name {internal_name} is not part of the {type(self)}!"
            raise ValueError(msg)

        return self.variable_infos[internal_name].standard_name

    def get_dependencies(self, internal_name: InternalName) -> list[InternalName | str]:
        """Return the dependencies registered for `internal_name`.

        Raises:
            KeyError: If `internal_name` is not part of this standard.
        """
        return self.variable_infos[internal_name].dependencies

    def standardize_variable(
        self, internal_name: InternalName, variable: Variable, *, reset_consistency_check: bool
    ) -> Variable:
        """Standardize a variable according to this data standard.

        This method is concrete. For a variable known to the standard it converts the
        variable to the standard's unit, fills in an empty description, checks the
        number of dimensions and checks the dimension lengths against the variables
        seen since the last reset. The variable is modified in place and returned.

        Args:
            internal_name (str): The internal name identifying which rules to apply.
            variable (Variable): The variable to be standardized.
            reset_consistency_check (bool): If True, a fresh `ConsistencyCheck` is started
                before this variable is checked.

        Returns:
            Variable: The standardized variable. Variables unknown to the standard are
                returned unchanged after a warning is logged.
        """
        # Also create the checker when none exists yet, so that a first call with
        # reset_consistency_check=False does not fail with an AttributeError.
        if reset_consistency_check or not hasattr(self, "consistency_check"):
            self.consistency_check = ConsistencyCheck()

        if internal_name not in self.variable_infos:
            logger.warning(f"Encountered custom variable which cannot be standardized: {internal_name}")
            return variable

        variable_info = self.variable_infos[internal_name]

        variable.convert_to_unit(variable_info.unit)
        # A description already set on the variable takes precedence over the standard's.
        if len(variable.metadata.description) == 0:
            variable.metadata.description = variable_info.description
        assert_n_dim(variable, len(variable_info.dependencies), internal_name)
        self.consistency_check.check(variable.get_data().shape, variable_info.dependencies, internal_name)

        return variable

    # NOTE: defining __eq__ without __hash__ makes instances unhashable (Python data model).
    def __eq__(self, other: object) -> bool:
        """Return True if both objects have the exact same type and equal `variable_infos`."""
        if not isinstance(other, DataStandard):
            return NotImplemented
        return type(self) is type(other) and self.variable_infos == other.variable_infos

el_paso.typing.DensityNetCDFStrategy

Bases: MonthlyRBStrategy

Saving strategy for writing plasma density and related data to monthly NetCDF files.

This strategy extends MonthlyRBStrategy but implements saving to the NetCDF format (.nc), primarily targeting the time-series of density, position, and coordinate variables (e.g., L-star, MLT).

The variables included and their dependencies are configured based on whether the data is associated with the "RBSP" satellites or "Other".

Attributes:

Name Type Description
output_files list[OutputFile]

List of file configurations to be produced.

file_path Path

Base path for output files (inherited).

dependency_dict dict[str, list[str]]

Defines the NetCDF dimension names (e.g., 'time', 'xGEO_components') that each variable depends on.

Source code in el_paso/saving_strategies/density_netcdf_strategy.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class DensityNetCDFStrategy(MonthlyRBStrategy):
    """Saving strategy for writing plasma density and related data to monthly NetCDF files.

    This strategy extends `MonthlyRBStrategy` but implements saving to the NetCDF
    format (`.nc`), primarily targeting the time-series of density, position, and
    coordinate variables (e.g., L-star, MLT).

    The variables included and their dependencies are configured based on whether
    the data is associated with the **"RBSP"** satellites or **"Other"**.

    Attributes:
        output_files (list[OutputFile]): List of file configurations to be produced.
        file_path (Path): Base path for output files (inherited).
        dependency_dict (dict[str, list[str]]): Defines the NetCDF dimension names
            (e.g., 'time', 'xGEO_components') that each variable depends on.
    """

    output_files: list[OutputFile]

    file_path: Path
    dependency_dict: dict[str, list[str]]

    def __init__(
        self,
        base_data_path: str | Path,
        mission: str,
        instrument: str,
        mag_field: MagneticFieldLiteral,
        satellite: Literal["RBSP", "Other"] = "Other",
        data_standard: Optional[DataStandard[StandardName]] = None,
    ) -> None:
        """Initializes the monthly NetCDF saving strategy.

        Parameters:
            base_data_path (str | Path): The base directory where the output NetCDF files will be saved.
            mission (str): The name of the mission; forwarded to `MonthlyRBStrategy`.
            instrument (str): The name of the instrument; forwarded to `MonthlyRBStrategy`.
            mag_field (MagneticFieldLiteral):
                A string specifying the magnetic field model used.
            satellite (Literal["RBSP", "Other"], optional):
                Selects which density variables are written. "Other" writes
                `density_local` and `density_eq`; "RBSP" writes the EMFISIS, EFW and
                hiss-derived local and equatorial densities. Defaults to "Other".
            data_standard (DataStandard | None, optional):
                An optional `DataStandard` instance to use for standardizing variables.
                If `None`, `ep.data_standards.PRBEMStandard` is used by default.

        Raises:
            ValueError: If `satellite` is neither "RBSP" nor "Other".
        """
        self.mag_field = mag_field
        if data_standard is None:
            data_standard = ep.data_standards.PRBEMStandard()
        self.data_standard = data_standard

        # Forward the resolved standard, never a bare None, so the base class and this
        # class agree on the standard that is actually in use.
        super().__init__(
            base_data_path=base_data_path,
            satellite=satellite,
            mission=mission,
            instrument=instrument,
            mag_field=self.mag_field,
            file_format="nc",
            data_standard=self.data_standard,
        )

        # Position and coordinate variables written for every satellite.
        output_file_entries = [
            "time",
            "xGEO",
            "MLT",
            "R_eq",
            "Lstar",
            "xGEO_eq",
        ]

        self.dependency_dict = {
            "time": ["time"],
            "xGEO": ["time", "xGEO_components"],
            "MLT": ["time"],
            "R_eq": ["time"],
            "xGEO_eq": ["time", "xGEO_components"],
            "Lstar": ["time"],
        }

        if satellite == "Other":
            density_entries = ["density_local", "density_eq"]
        elif satellite == "RBSP":
            density_entries = [
                "density_emfisis_local",
                "density_efw_local",
                "density_hiss_derived_local",
                "density_emfisis_eq",
                "density_efw_eq",
                "density_hiss_derived_eq",
            ]
        else:
            msg = "Enountered invalid satellite! Valid names are: 'RBSP', 'Other'."
            raise ValueError(msg)

        output_file_entries += density_entries
        # Every density variable depends on the time dimension only.
        self.dependency_dict |= {entry: ["time"] for entry in density_entries}

        self.output_files = [
            OutputFile("full", output_file_entries, save_incomplete=True),
        ]

    def _calculate_dimensions(self, data_dict: dict[str, np.ndarray]) -> dict[str, int]:
        """Calculate density NetCDF dimension sizes from the data dictionary.

        Parameters:
            data_dict (dict[str, np.ndarray]): Arrays keyed by variable name. Must contain "time".

        Returns:
            dict[str, int]: The "time" size, taken from the first axis of `data_dict["time"]`,
                plus "xGEO_components" = 3 when "xGEO" or "xGEO_eq" is present and non-empty.
        """
        dimensions = {"time": data_dict["time"].shape[0]}

        has_local_position = "xGEO" in data_dict and data_dict["xGEO"].size > 0
        has_equatorial_position = "xGEO_eq" in data_dict and data_dict["xGEO_eq"].size > 0
        if has_local_position or has_equatorial_position:
            dimensions["xGEO_components"] = 3

        return dimensions

    def standardize_variable(
        self, variable: ep.Variable, name_in_file: str, *, first_call_of_interval: bool
    ) -> ep.Variable:
        """Standardizes a variable based on the configured `DataStandard`.

        This method delegates the standardization process to `self.data_standard`.

        Parameters:
            variable (ep.Variable): The variable instance to be standardized.
            name_in_file (str): The name of the variable as it will appear in the file; it is
                passed to the data standard as the internal name.
            first_call_of_interval (bool): Flag to indicate if it is the first call of a time
                interval; passed on as `reset_consistency_check`.

        Returns:
            ep.Variable: The standardized variable.
        """
        return self.data_standard.standardize_variable(
            name_in_file, variable, reset_consistency_check=first_call_of_interval
        )

el_paso.typing.FileLoader module-attribute

FileLoader: TypeAlias = Callable[
    [Path], dict[StandardName, Any]
]

Callable that loads a data file into a dictionary keyed by standard variable names.

el_paso.typing.FileWriter

Bases: Protocol

Callable interface for writing standardized EL-PASO data to disk.

Source code in el_paso/typing.py
154
155
156
157
158
159
160
161
162
163
164
class FileWriter(Protocol):
    """Structural type for callables that write standardized EL-PASO data to disk.

    Any callable that accepts `(file_path, data_dict, data_standard)` and returns
    `None` satisfies this protocol.
    """

    def __call__(
        self,
        file_path: Path,
        data_dict: SavedDataDict,
        data_standard: DataStandard,
    ) -> None:
        """Write `data_dict` to `file_path` using `data_standard`.

        Args:
            file_path (Path): Destination of the file to write.
            data_dict (SavedDataDict): The data to write.
            data_standard (DataStandard): The data standard to use while writing.
        """

el_paso.typing.GFZVarNames module-attribute

GFZVarNames: TypeAlias = Literal[
    "time",
    "xGEO",
    "energy_channels",
    "Flux",
    "alpha_local",
    "alpha_eq_model",
    "PSD",
    "MLT",
    "MLat",
    "Lstar",
    "Lm",
    "B_eq",
    "B_sat",
    "B_total",
    "R0",
    "InvMu",
    "InvK",
    "alpha_local_range",
    "alpha_eq_range",
    "alpha_lc",
    "alpha_lc_eq",
    "geo_alt",
    "geo_lat",
    "geo_lon",
    "freq",
    "ellipticity",
    "planarity",
    "freq_bw",
    "BB",
    "wave_wna",
]

Variable names used by the GFZ output standard.

el_paso.typing.InternalName module-attribute

# Internal variable names used by EL-PASO: the literal names listed here plus
# every name contained in MagFieldVarTypes.
InternalName: TypeAlias = (
    Literal[
        "FEDU",
        "FEDO",
        "FEIU",
        "Energy_FEDU",
        "Energy_FEIU",
        "Epoch",
        "Alpha",
        "Alpha_range",
        "Alpha_Eq_range",
        "Position",
        "MLat",
        "PSD",
        "Position_geo_alt",
        "Position_geo_lat",
        "Position_geo_lon",
        "Number_density",
        "Wave_normal_angle",
        "Wave_ellipticity",
        "Wave_planarity",
        "Wave_frequency",
        "Magnetic_Power_Spectral_Density",
        "Wave_frequency_bandwidth",
        "B_total_obs",
    ]
    | MagFieldVarTypes
)

el_paso.typing.MFSFormats module-attribute

MFSFormats: TypeAlias = Literal[
    "nc", "cdf", "h5", "mat", ".nc", ".cdf", ".h5", ".mat"
]

File formats supported by MonthlyRBStrategy.

el_paso.typing.MagFieldVarTypes module-attribute

# Literal variable names that are also part of InternalName (via its union).
# NOTE(review): presumably the quantities derived from a magnetic-field model — confirm.
MagFieldVarTypes: TypeAlias = Literal[
    "Alpha_LC_Eq",
    "Alpha_LC",
    "B_Calc",
    "B_fofl",
    "B_Eq",
    "B_mirr",
    "xGEO_Eq",
    "MLT",
    "R_Eq",
    "MLT_Eq",
    "L_star",
    "L_m",
    "Alpha_Eq",
    "InvMu",
    "InvK",
    "I",
]

el_paso.typing.MagInputKeys module-attribute

# Closed set of allowed key names.
# NOTE(review): presumably the keys of the input parameters passed to the
# magnetic-field models — confirm against the code that consumes this alias.
MagInputKeys: TypeAlias = Literal[
    "Kp",
    "Dst",
    "dens",
    "velo",
    "Pdyn",
    "ByIMF",
    "BzIMF",
    "G1",
    "G2",
    "G3",
    "W1",
    "W2",
    "W3",
    "W4",
    "W5",
    "W6",
    "AL",
]

el_paso.typing.MagneticFieldLiteral module-attribute

MagneticFieldLiteral: TypeAlias = Literal[
    "T89",
    "T01",
    "T01s",
    "TS04",
    "TS05",
    "T04s",
    "T96",
    "OP77Q",
    "OP77",
]

Supported magnetic-field model identifiers.

el_paso.typing.OutputFile

Bases: NamedTuple

Represents an output file with its name and a list of variable names to save.

Attributes:

Name Type Description
name str

The name of the output file.

names_to_save list[str]

List of variable names to be saved in the output file.

save_incomplete bool

If True, allows saving even if some variables are missing.

Source code in el_paso/saving_strategy.py
35
36
37
38
39
40
41
42
43
44
45
46
class OutputFile(NamedTuple):
    """Describes one output file: its name and which variables go into it.

    Attributes:
        name (str): The name of the output file.
        names_to_save (list[InternalName]): Internal names of the variables to be saved
            in the output file.
        save_incomplete (bool): If True, the file may be saved even if some of the
            requested variables are missing. Defaults to False.
    """

    # Field order is part of the tuple layout and of the positional constructor.
    name: str
    names_to_save: list[InternalName]
    save_incomplete: bool = False

el_paso.typing.PRBEMName module-attribute

PRBEMName: TypeAlias = InternalName

PRBEM-standard variable names, which match EL-PASO internal names.

el_paso.typing.SavedDataDict module-attribute

SavedDataDict: TypeAlias = dict[
    InternalName | Literal["metadata"], Any
]

Dictionary passed to saving backends, keyed by internal variable name or metadata.

el_paso.typing.SavingStrategy

Bases: ABC

Abstract base class for defining strategies to save output files with specific time intervals and variables.

Attributes:

Name Type Description
output_files list[OutputFile]

List of output files to be managed by the saving strategy.

data_standard DataStandard[StandardName]

The data standard that defines the variable naming convention.

base_data_path Path

The base path where output files will be saved.

satellite str

The name of the satellite for which data is being saved.

mission str

The name of the mission for which data is being saved.

instrument str

The name of the instrument for which data is being saved.

mag_field MagneticFieldLiteral

The magnetic field model used for saving data, if applicable.

Methods:

Name Description
get_time_intervals_to_save

Abstract method to determine the time intervals for saving data between start_time and end_time.

get_file_path

Abstract method to generate the file path for a given time interval and output file.

standardize_variable

Abstract method to standardize a variable before saving, possibly renaming or formatting it.

get_target_variables

Selects and prepares variables to be saved in the output file, optionally truncating them to a time range.

save_single_file

Saves the provided dictionary to a file in the specified format (.mat, .h5, .nc, .cdf), optionally appending data.

append_data

Appends data to an existing file. Not every subclass needs this, so it is not declared in the base class; subclasses that support appending implement it themselves.

Source code in el_paso/saving_strategy.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
class SavingStrategy(ABC):
    """Abstract base class for defining strategies to save output files with specific time intervals and variables.

    Attributes:
        output_files (list[OutputFile]): List of output files to be managed by the saving strategy.
        data_standard (DataStandard[StandardName]): The data standard that defines the variable naming convention.
        base_data_path (Path): The base path where output files will be saved.
        satellite (str): The name of the satellite for which data is being saved.
        mission (str): The name of the mission for which data is being saved.
        instrument (str): The name of the instrument for which data is being saved.
        mag_field (MagneticFieldLiteral): The magnetic field model used for saving data, if applicable.

    Methods:
        get_time_intervals_to_save:
            Abstract method to determine the time intervals for saving data between start_time and end_time.

        get_file_path:
            Abstract method to generate the file path for a given time interval and output file.

        standardize_variable:
            Abstract method to standardize a variable before saving, possibly renaming or formatting it.

        save_single_file:
            Abstract method to save the provided dictionary to a file in the specified format
            (.mat, .h5, .nc, .cdf), optionally appending data.

        get_file_path_stem, get_file_name_stem:
            Abstract methods returning the path stem and the file name stem of the output files.

        get_target_variables:
            Selects and prepares variables to be saved in the output file, optionally truncating them to a time range.

        get_output_file:
            Finds the output file that contains a given variable.

        get_all_standard_names:
            Collects the standard names of all variables listed in the output files.

    Note:
        Appending data to an existing file is not declared in this base class, because
        not every subclass needs it.
    """

    output_files: list[OutputFile]
    data_standard: DataStandard[StandardName]
    base_data_path: Path
    satellite: str
    mission: str
    instrument: str
    mag_field: MagneticFieldLiteral

    def __repr__(self) -> str:
        """Return `ClassName(param=value, ...)` for constructor parameters stored as attributes."""
        cls = type(self)
        param_names = inspect.signature(cls.__init__).parameters

        # Only constructor parameters that exist as same-named attributes can be shown.
        args = [
            f"{name}={getattr(self, name)!r}" for name in param_names if name != "self" and hasattr(self, name)
        ]

        return f"{cls.__name__}({', '.join(args)})"

    def __str__(self) -> str:
        """Return the same text as `__repr__`."""
        return self.__repr__()

    @abstractmethod
    def get_time_intervals_to_save(self, start_time: datetime, end_time: datetime) -> list[TimeInterval]:
        """Generates a list of time intervals to save between the specified start and end times.

        Args:
            start_time (datetime): The starting datetime for the intervals.
            end_time (datetime): The ending datetime for the intervals.

        Returns:
            list[TimeInterval]: The time intervals to be saved.
        """

    @abstractmethod
    def get_file_path(self, interval_start: datetime, interval_end: datetime, output_file: OutputFile) -> Path:
        """Generates a file path for saving variables based on the provided interval and output file information.

        Args:
            interval_start (datetime): The start of the interval for which the file is being generated.
            interval_end (datetime): The end of the interval for which the file is being generated.
            output_file (OutputFile): An OutputFile containing the name of the output file,
                                      and which variables should be saved in this file.

        Returns:
            Path: The generated file path where the output data should be saved.
        """

    @abstractmethod
    def standardize_variable(
        self, variable: Variable, internal_name: InternalName, *, first_call_of_interval: bool
    ) -> Variable:
        """Standardizes the given variable according to its internal name.

        Standardization may include checking of units, dimensions, and size consistency.

        Args:
            variable (Variable): The variable instance to be standardized.
            internal_name (str): The internal name of the variable, used for standardization rules.
            first_call_of_interval (bool): Flag to indicate if it is the first call of a time interval

        Returns:
            Variable: The standardized variable instance.
        """

    @abstractmethod
    def save_single_file(self, file_path: Path, dict_to_save: SavedDataDict, *, append: bool = False) -> None:
        """Saves the provided dictionary to a single file in one of the supported formats.

        Parameters:
            file_path (Path): The path where the file should be saved.
            dict_to_save (dict[str, Any]): The dictionary containing variable data and metadata to be saved.
            append (bool, optional): If True, data will be appended to existing files rather than overwriting them.
                    Defaults to False.
        """

    @abstractmethod
    def get_file_path_stem(self) -> Path:
        """Return the path stem of the output files; its composition is defined by subclasses."""

    @abstractmethod
    def get_file_name_stem(self) -> str:
        """Return the file name stem of the output files; its composition is defined by subclasses."""

    def get_target_variables(
        self,
        output_file: OutputFile,
        variables_dict: dict[InternalName, Variable],
        time_var: Variable | None,
        start_time: datetime | None,
        end_time: datetime | None,
    ) -> dict[InternalName, Variable] | None:
        """Retrieves and processes target variables for saving based on the specified output file.

        Parameters:
            output_file (OutputFile): The output file configuration containing variable names to save.
            variables_dict (dict[str, Variable]): Dictionary mapping variable names to Variable objects.
            time_var (Variable | None): The time variable used for truncation, if applicable.
            start_time (datetime | None): The start time for truncating variables, if specified.
            end_time (datetime | None): The end time for truncating variables, if specified.

        Returns:
            dict[str, Variable] | None:
                - A dictionary of processed Variable objects keyed by their names,
                    or None if a requested variable is missing and `output_file.save_incomplete`
                    is False.

        Notes:
            - If no variable names are specified in output_file, all variables in variables_dict are processed.
            - Variables are deep-copied before processing.
            - Truncation only happens when `time_var`, `start_time` and `end_time` are all given.
            - Each variable is standardized using the `standardize_variable` method; only the first
              standardized variable is flagged as the first call of the interval.
            - If a requested variable name is not found, a warning is issued. With `save_incomplete`
              an empty, dimensionless placeholder variable is stored instead; otherwise None is returned.
        """
        target_variables: dict[InternalName, Variable] = {}
        first_call_of_interval = True

        # if no variables have been specified, we save all of them
        if len(output_file.names_to_save) == 0:
            names_to_process = list(variables_dict)
        else:
            names_to_process = output_file.names_to_save

        should_truncate = start_time is not None and end_time is not None and time_var is not None

        for name_to_save in names_to_process:
            if name_to_save not in variables_dict:
                msg = f"Could not find target variable {name_to_save}!"
                logger.warning(msg, stacklevel=2)
                if not output_file.save_incomplete:
                    return None
                # Placeholders are stored as-is; they are not standardized.
                target_variables[name_to_save] = Variable(original_unit=u.dimensionless_unscaled, data=np.array([]))
                continue

            # Work on a copy so truncation and standardization never touch the caller's variable.
            var_to_save = deepcopy(variables_dict[name_to_save])

            if should_truncate:
                var_to_save.truncate(time_var, start_time.timestamp(), end_time.timestamp())

            var_to_save = self.standardize_variable(
                var_to_save, name_to_save, first_call_of_interval=first_call_of_interval
            )
            first_call_of_interval = False

            target_variables[name_to_save] = var_to_save

        return target_variables

    def get_output_file(
        self, *, standard_name: StandardName | None = None, internal_name: InternalName | None = None
    ) -> OutputFile | None:
        """Return the first output file that lists the given variable.

        Args:
            standard_name (StandardName | None): Standard name of the variable. Only used when
                `internal_name` is not given; it is then translated through `self.data_standard`.
            internal_name (InternalName | None): Internal name of the variable.

        Returns:
            OutputFile | None: The first matching output file, or None if the standard name is
                unknown to the data standard or no output file lists the variable.

        Raises:
            ValueError: If neither `standard_name` nor `internal_name` is provided.
        """
        if internal_name is None:
            if standard_name is None:
                msg = "Either standard_name or internal_name must be provided!"
                raise ValueError(msg)
            internal_name = self.data_standard.get_internal_name(standard_name)

        if internal_name is None:
            return None

        return next(
            (output_file for output_file in self.output_files if internal_name in output_file.names_to_save),
            None,
        )

    def get_all_standard_names(self) -> list[StandardName]:
        """Return the distinct standard names of all variables listed in the output files.

        Names are returned in order of first appearance, so the result is the same on every run.

        Returns:
            list[StandardName]: The standard names, without duplicates.
        """
        all_standard_names: list[StandardName] = []

        for output_file in self.output_files:
            all_standard_names.extend(
                self.data_standard.get_standard_name(internal_name) for internal_name in output_file.names_to_save
            )

        # dict.fromkeys removes duplicates like set() does, but keeps insertion order.
        return list(dict.fromkeys(all_standard_names))

el_paso.typing.SingleFileStrategy

Bases: SavingStrategy

A concrete saving strategy that saves all data to a single file.

This strategy implements the SavingStrategy abstract methods to manage saving all variables for the entire time range into a single output file. It is a simple, non-partitioning approach. Supports multiple file formats including MATLAB (.mat), HDF5 (.h5), NetCDF4 (.nc), and CDF (.cdf). Users can also register custom format writers for additional file formats.

Attributes:

Name Type Description
file_path Path

The path to the single output file where all data will be saved.

output_files list[OutputFile]

List of output files to be managed.

Methods:

Name Description
__init__

Initializes the strategy with file path and optional custom writers.

get_time_intervals_to_save

Returns the entire time range as a single interval.

get_file_path

Always returns the pre-defined single file path.

standardize_variable

Passes the variable through without any standardization.

save_single_file

Saves data to a file in the specified format using the dispatch table.

register_writer

Registers a custom format writer for a file extension.

Supported Formats
  • .mat: MATLAB format using scipy.io.savemat
  • .h5: HDF5 format using h5py with optional gzip compression
  • .nc: NetCDF4 format using netCDF4 with optional compression
  • .cdf: CDF (Common Data Format) using cdflib with gzip compression
  • Custom: Any user-defined format via register_writer() or format_writers parameter
Example
def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
    # Custom writer implementation
    pass
strategy = SingleFileStrategy("output.myformat",format_writers={".myformat": write_custom})
ep.save(variables, saving_strategy=strategy, ...)
Source code in el_paso/saving_strategies/single_file_strategy.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class SingleFileStrategy(SavingStrategy):
    """A concrete saving strategy that saves all data to a single file.

    This strategy implements the `SavingStrategy` abstract methods to manage saving all variables
    for the entire time range into a single output file. It is a simple, non-partitioning approach.
    Supports multiple file formats including MATLAB (.mat), HDF5 (.h5), NetCDF4 (.nc), and CDF (.cdf).
    Users can also register custom format writers for additional file formats.

    Attributes:
        file_path (Path): The path to the single output file where all data will be saved.
        output_files (list[OutputFile]): List of output files to be managed.

    Methods:
        __init__(file_path, format_writers): Initializes the strategy with file path and optional custom writers.
        get_time_intervals_to_save: Returns the entire time range as a single interval.
        get_file_path: Always returns the pre-defined single file path.
        standardize_variable: Passes the variable through without any standardization.
        save_single_file: Saves data to a file in the specified format using the dispatch table.
        register_writer: Registers a custom format writer for a file extension.

    Supported Formats:
        - .mat: MATLAB format using scipy.io.savemat
        - .h5: HDF5 format using h5py with optional gzip compression
        - .nc: NetCDF4 format using netCDF4 with optional compression
        - .cdf: CDF (Common Data Format) using cdflib with gzip compression
        - Custom: Any user-defined format via register_writer() or format_writers parameter

    Example:
        ```python
        def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
            # Custom writer implementation
            pass
        strategy = SingleFileStrategy("output.myformat",format_writers={".myformat": write_custom})
        ep.save(variables, saving_strategy=strategy, ...)
        ```
    """

    output_files: list[OutputFile]
    file_path: Path
    _writers: dict[str, SingleFileFormatWriter]

    def __init__(
        self,
        file_path: str | Path,
        format_writers: dict[str, SingleFileFormatWriter] | None = None,
    ) -> None:
        """Initializes the SingleFileStrategy with the specified file path and optional custom format writers.

        Parameters:
            file_path (str | Path): The full path to the output file. The file extension determines
                the format unless a custom writer is registered.
            format_writers (dict[str, SingleFileFormatWriter] | None): Optional dictionary mapping file extensions
                (e.g., ".myformat") to custom writer functions. Extensions are matched case-insensitively and
                a missing leading dot is added. Custom writers override built-in writers for the same
                extension. Defaults to None.

        Example:
            ```python
            def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
                # Custom writer implementation
                pass
            strategy = SingleFileStrategy("output.myformat", format_writers={".myformat": write_custom})
            ep.save(variables, saving_strategy=strategy, ...)
            ```
        """
        self.file_path = Path(file_path)
        self.output_files = [OutputFile(self.file_path.name, [])]

        # Build the dispatch table with built-in writers
        self._writers: dict[str, SingleFileFormatWriter] = {
            ".mat": self._write_mat_file,
            ".h5": self._write_h5_file,
            ".nc": self._write_netcdf_file,
            ".cdf": self._write_cdf_file,
        }

        # Register custom writers (these override built-in writers if same extension).
        # save_single_file() looks writers up by `file_path.suffix.lower()`, so the keys
        # are normalized here; otherwise ".MyFormat" or "bin" could never be matched.
        for extension, writer in (format_writers or {}).items():
            normalized = extension.lower()
            # An empty key is kept as-is: it is what an extension-less path looks up.
            if normalized and not normalized.startswith("."):
                normalized = "." + normalized
            self._writers[normalized] = writer

    def get_file_path_stem(self) -> None:  # ty:ignore[invalid-method-override]
        """Do nothing and return None.

        NOTE(review): presumably a path stem is not needed here because the complete
        output path is fixed by `file_path` -- confirm against `SavingStrategy`.
        """

    def get_file_name_stem(self) -> None:  # ty:ignore[invalid-method-override]
        """Do nothing and return None.

        NOTE(review): presumably a file-name stem is not needed here because the complete
        output path is fixed by `file_path` -- confirm against `SavingStrategy`.
        """

    def get_time_intervals_to_save(self, start_time: datetime, end_time: datetime) -> list[TimeInterval]:
        """Return the requested time range as one single interval.

        No partitioning in time takes place: the whole range is saved in one go.

        Parameters:
            start_time (datetime): The start time of the data range.
            end_time (datetime): The end time of the data range.

        Returns:
            list[TimeInterval]: A one-element list holding the `(start_time, end_time)` tuple.
        """
        whole_range: TimeInterval = (start_time, end_time)
        return [whole_range]

    def get_file_path(
        self,
        interval_start: datetime,  # noqa: ARG002
        interval_end: datetime,  # noqa: ARG002
        output_file: OutputFile,  # noqa: ARG002
    ) -> Path:
        """Return the configured output path, whatever interval or output file is requested.

        All three arguments are ignored, so every call resolves to the same file.

        Parameters:
            interval_start (datetime): The start of the time interval (ignored).
            interval_end (datetime): The end of the time interval (ignored).
            output_file (OutputFile): The output file configuration (ignored).

        Returns:
            Path: The `file_path` of this strategy instance.
        """
        single_file_path = self.file_path
        return single_file_path

    def standardize_variable(
        self,
        variable: Variable,
        internal_name: InternalName,  # noqa: ARG002
        *,
        first_call_of_interval: bool,  # noqa: ARG002
    ) -> Variable:
        """Does not modify the variable.

        This strategy does not perform any specific standardization on the variables before saving.

        Parameters:
            variable (Variable): The variable instance to be standardized.
            internal_name (InternalName): The internal name of the variable (ignored).
            first_call_of_interval (bool): Flag to indicate if it is the first call of a time interval (ignored).

        Returns:
            Variable: The original variable instance, unchanged.
        """
        return variable

    def register_writer(self, extension: str, writer: SingleFileFormatWriter) -> None:
        """Add (or replace) the writer used for a given file extension.

        The extension is normalized before it is stored: a leading dot is prepended when
        missing and the result is lower-cased. An already registered writer for the same
        extension -- built-in or custom -- is replaced.

        Parameters:
            extension (str): The file extension, with or without the leading dot,
                e.g., ".myformat" or "bin".
            writer (SingleFileFormatWriter): A callable with signature `(Path, dict[str, Any]) -> None` that
                handles writing the data dictionary to the specified file path.

        Example:
            ```python
            def write_binary(path: Path, data: dict[str, Any]) -> None:
                with open(path, 'wb') as f:
                    for key, value in data.items():
                        if key != "metadata":
                            f.write(value.tobytes())
            strategy = SingleFileStrategy("output.dat")
            strategy.register_writer(".dat", write_binary)
            ```
        """
        dotted = extension if extension.startswith(".") else "." + extension
        self._writers[dotted.lower()] = writer

    def _write_metadata_to_netcdf_variable(self, data_set: nC.Variable[Any], metadata: dict[str, Any]) -> None:
        """Attach metadata values that can be represented as NetCDF attributes."""
        for key, value in metadata.items():
            if isinstance(value, list):
                value = ", ".join(str(item) for item in value)

            if getattr(value, "size", None) == 0:
                continue

            setattr(data_set, key, value)

    def _write_netcdf_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write the data dictionary to a NetCDF4 (.nc) file.

        Each key is read as a "/"-separated path: all parts but the last name (nested)
        groups, the last part names the variable. One dimension per array axis is created
        in the target group, named `<variable>_dim_<axis>`. Variables are created with
        zlib compression (level 5) and the shuffle filter. Entries with `size == 0` are
        not written. Metadata registered under the same path is attached as variable
        attributes.

        Parameters:
            file_path (Path): Path to save the .nc file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are path strings (e.g., "var_name" or "group/subgroup/var_name").
                The "metadata" key is skipped; metadata is stored as variable attributes.
        """
        metadata_by_path = data_dict.get("metadata", {})

        with nC.Dataset(file_path, "w", format="NETCDF4") as file:
            for path, value in data_dict.items():
                # The order of these two checks matters: the metadata entry has no `size`.
                if path == "metadata" or value.size == 0:
                    continue

                *group_names, dataset_name = path.split("/")

                # Walk down the group hierarchy, creating groups that do not exist yet.
                parent: nC.Group | nC.Dataset = file
                for group_name in group_names:
                    if group_name in parent.groups:
                        parent = parent.groups[group_name]
                    else:
                        parent = parent.createGroup(group_name)

                dimensions = []
                for axis, size in enumerate(value.shape):
                    dimension_name = f"{dataset_name}_dim_{axis}"
                    if dimension_name not in parent.dimensions:
                        parent.createDimension(dimension_name, size)
                    dimensions.append(dimension_name)

                nc_variable = parent.createVariable(
                    dataset_name, value.dtype, dimensions, zlib=True, complevel=5, shuffle=True
                )
                data_set = typing.cast("nC.Variable[Any]", nc_variable)

                data_set[...] = value

                if path in metadata_by_path:
                    self._write_metadata_to_netcdf_variable(data_set, metadata_by_path[path])

    def save_single_file(self, file_path: Path, dict_to_save: dict[str, Any], *, append: bool = False) -> None:  # ty:ignore[invalid-method-override]
        """Saves variable data to a single file in one of the supported formats.

        The file format is determined by the (lower-cased) file extension. Built-in formats include
        .mat, .h5, .nc, and .cdf. Custom format writers can be registered via the format_writers
        parameter during initialization or via the register_writer() method. Missing parent
        directories of `file_path` are created.

        It is primarily designed to be used with the `el_paso.save()` function, which handles the logic of determining
        what data to save and when.

        Parameters:
            file_path (Path): The path to the file where the dictionary will be saved.
                              The file extension determines the format.
            dict_to_save (dict[str, Any]): The dictionary containing variable data to save.
                Keys are variable names (strings), values are NumPy arrays or other serializable data.
                Should include a "metadata" key with metadata dictionary.
            append (bool, optional): Appending is currently not implemented: if True, a
                NotImplementedError is raised for every format. Defaults to False.

        Raises:
            NotImplementedError: If no writer is registered for the file extension,
                or if append is True.
            Any exception raised by the format writer function.

        Supported Built-in Formats:
            - .mat: MATLAB format using scipy.io.savemat
            - .h5: HDF5 format using h5py with gzip compression
            - .nc: NetCDF4 format using netCDF4 with compression
            - .cdf: CDF (Common Data Format) using cdflib with gzip compression
        """
        logger.info(f"Saving file {file_path.name}...")

        file_path.parent.mkdir(parents=True, exist_ok=True)
        format_name = file_path.suffix.lower()

        # Look up the writer in the dispatch table
        writer = self._writers.get(format_name)

        if writer is None:
            msg = f"The '{format_name}' format is not implemented. Registered formats: {list(self._writers.keys())}"
            logger.error(msg)
            raise NotImplementedError(msg)

        # No writer supports appending: the request is rejected regardless of the format.
        if append:
            msg = f"Appending to existing files is not supported for '{format_name}' format."
            logger.error(msg)
            raise NotImplementedError(msg)
        writer(file_path, dict_to_save)

    def _write_mat_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Store the data dictionary as a MATLAB .mat file via `savemat`.

        The dictionary is handed over unchanged, including its "metadata" entry if present.

        Parameters:
            file_path (Path): Path to save the .mat file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
        """
        mat_file_name = str(file_path)
        savemat(mat_file_name, data_dict)

    def _write_h5_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write data dictionary to HDF5 (.h5) format.

        Creates hierarchical groups based on paths (e.g., "group1/group2/dataset" becomes nested groups).
        Applies gzip compression and shuffling to all datasets. Writes metadata as dataset attributes.

        Parameters:
            file_path (Path): Path to save the .h5 file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are path strings (e.g., "var_name" or "group/subgroup/var_name").
                The "metadata" key is skipped as a dataset; if present, it maps dataset paths to
                `{attribute_name: value}` dictionaries that are stored as dataset attributes.
                A missing (or None) "metadata" entry simply means that no attributes are written.
        """
        # Tolerate a missing metadata entry, as the NetCDF writer does, rather than
        # failing with a KeyError after the first dataset has already been written.
        metadata_by_path = data_dict.get("metadata") or {}

        with h5py.File(file_path, "w") as file:
            for path, value in data_dict.items():
                if path == "metadata":
                    continue

                *group_names, dataset_name = path.split("/")

                # Walk down the group hierarchy, creating groups that do not exist yet.
                curr_hierarchy = file
                for group_name in group_names:
                    if group_name in curr_hierarchy:
                        curr_hierarchy = typing.cast("h5py.Group", curr_hierarchy[group_name])
                    else:
                        curr_hierarchy = curr_hierarchy.create_group(group_name)

                data_set = curr_hierarchy.create_dataset(dataset_name, data=value, compression="gzip", shuffle=True)

                if path in metadata_by_path:
                    for key, metadata in metadata_by_path[path].items():
                        data_set.attrs[key] = metadata

    def _write_cdf_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write data dictionary to CDF (Common Data Format) format.

        Converts the values to NumPy arrays, maps their dtype to a CDF data type and writes
        them with `write_var`. All entries of the "metadata" dictionary are written as
        *global* attributes; the only per-variable attribute passed to `write_var` is
        `{"Compress": 6}`.

        Parameters:
            file_path (Path): Path to save the .cdf file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are variable names. The optional "metadata" key holds a dictionary
                that is converted to global attributes as follows:
                - dict whose keys are all integers / digit strings: used as entry-number -> value mapping
                - any other dict: flattened into one attribute "<name>_<sub_key>" per item
                  (empty list/tuple items are skipped)
                - list / tuple: one entry per element (empty sequences are skipped)
                - anything else: a single entry 0

        Raises:
            RuntimeError: If anything fails while creating or writing the file; the original
                exception is chained as the cause.
        """
        try:
            cdf_file = cdflib.cdfwrite.CDF(str(file_path), delete=True)

            try:
                metadata = data_dict.get("metadata")

                if isinstance(metadata, dict):
                    # Maps attribute name -> {entry number: value}, the layout handed to write_globalattrs
                    global_attrs: dict[str, dict[int, Any]] = {}

                    for attr_name, attr_value in metadata.items():
                        attr_name_str = str(attr_name)

                        if isinstance(attr_value, dict):
                            keys = list(attr_value.keys())
                            # Integer-like keys are taken as entry numbers of one attribute
                            if all(isinstance(k, (int, np.integer)) or str(k).isdigit() for k in keys):
                                global_attrs[attr_name_str] = {int(k): v for k, v in attr_value.items()}
                            else:
                                # Otherwise every item becomes its own flattened attribute
                                for sub_key, sub_val in attr_value.items():
                                    if isinstance(sub_val, (list, tuple)) and len(sub_val) == 0:
                                        logger.warning(f"Skipping empty global attribute {attr_name_str}_{sub_key}")
                                        continue
                                    flat_name = f"{attr_name_str}_{sub_key}"
                                    global_attrs[flat_name] = {0: sub_val}

                        elif isinstance(attr_value, (list, tuple)):
                            if len(attr_value) == 0:
                                logger.warning(f"Skipping empty global attribute {attr_name_str}")
                                continue
                            global_attrs[attr_name_str] = dict(enumerate(attr_value))

                        else:
                            global_attrs[attr_name_str] = {0: attr_value}

                    if global_attrs:
                        cdf_file.write_globalattrs(global_attrs)

                for var_name, var_data in data_dict.items():
                    if var_name == "metadata":
                        continue

                    # NOTE(review): values without a `size` attribute (e.g. plain lists or scalars)
                    # fall back to 0 here and are skipped as "empty" as well -- confirm this is intended.
                    if getattr(var_data, "size", 0) == 0:
                        logger.warning(f"Skipping empty variable {var_name}")
                        continue

                    var_data_array = np.asarray(var_data)
                    # Pick the CDF data type matching the NumPy dtype
                    if np.issubdtype(var_data_array.dtype, np.integer):
                        if var_data_array.dtype == np.int8:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT1
                        elif var_data_array.dtype == np.int16:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT2
                        elif var_data_array.dtype == np.int32:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT4
                        else:
                            # All remaining integer dtypes (including unsigned ones) end up here
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT8

                    elif np.issubdtype(var_data_array.dtype, np.floating):
                        if var_data_array.dtype == np.float32:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_FLOAT
                        else:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_DOUBLE

                    else:
                        # Any other dtype is converted to float64; astype raises if that is not possible
                        var_data_array = var_data_array.astype(np.float64)
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_DOUBLE

                    var_spec: dict[str, Any] = {
                        "Variable": var_name,
                        "Data_Type": cdf_dtype,
                        "Num_Elements": 1,
                        "Rec_Vary": True,
                        # The first axis is taken as the record axis; the remaining axes are the dimensions
                        "Dim_Sizes": (list(var_data_array.shape[1:]) if var_data_array.ndim > 1 else []),
                    }

                    # NOTE(review): "Compress" is passed as a variable attribute here; verify against the
                    # cdflib documentation whether compression has to be requested via var_spec instead.
                    var_attrs: dict[str, Any] = {
                        "Compress": 6,
                    }

                    cdf_file.write_var(
                        var_spec,
                        var_attrs=var_attrs,
                        var_data=var_data_array,
                    )

            finally:
                # Close the file even when writing failed part-way through
                cdf_file.close()

        except Exception as e:
            msg = f"Failed to write CDF file {file_path}: {e}"
            logger.exception(msg)
            raise RuntimeError(msg) from e

el_paso.typing.StandardName module-attribute

StandardName: TypeAlias = (
    PRBEMName | GFZVarNames | Literal["metadata"]
)

Any standard-facing variable name accepted by EL-PASO data standards.

el_paso.typing.TimeInterval module-attribute

TimeInterval: TypeAlias = tuple[datetime, datetime]

Inclusive start and end datetimes for a processing or saving interval.

el_paso.typing.Variable

Variable class holding data and metadata.

Attributes:

Name Type Description
_data NDArray[generic]

The numerical data of the variable.

metadata VariableMetadata

An instance of VariableMetadata holding information about the variable.

Source code in el_paso/variable.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
class Variable:
    """Variable class holding data and metadata.

    Attributes:
        _data (NDArray[np.generic]): The numerical data of the variable.
        metadata (VariableMetadata): An instance of `VariableMetadata` holding
            information about the variable.
    """

    __slots__ = "_data", "metadata"

    _data: NDArray[np.generic]
    metadata: VariableMetadata

    def __init__(
        self,
        original_unit: u.UnitBase,
        data: NDArray[np.generic] | None = None,
        description: str = "",
        processing_notes: str = "",
        standard_name: StandardName = "",  # ty:ignore[invalid-parameter-default]
    ) -> None:
        """Creates a variable from optional data and its describing metadata.

        Args:
            original_unit (u.UnitBase): The original unit of the data.
            data (NDArray[np.generic] | None): The numerical data. When None, an empty
                numpy array is stored instead. The array is stored as given (no copy).
            description (str): A description of the variable. Defaults to "".
            processing_notes (str): Notes on how the data was processed. Defaults to "".
            standard_name (StandardName): The standard name of the variable. Defaults to "".
        """
        if data is None:
            data = np.array([])
        self._data = data

        metadata_fields = {
            "unit": original_unit,
            "description": description,
            "processing_notes": processing_notes,
            "standard_name": standard_name,
        }
        self.metadata = VariableMetadata(**metadata_fields)

    def __repr__(self) -> str:
        """Returns a string representation of the Variable object."""
        return f"Variable holding {self._data.shape} data points with metadata: {self.metadata}"

    def convert_to_unit(self, target_unit: u.UnitBase | str) -> None:
        """Converts the data to a given unit.

        Args:
            target_unit (u.UnitBase | str): The unit the data should be converted to.
        """
        if isinstance(target_unit, str):
            target_unit = u.Unit(target_unit)

        if self.metadata.unit != target_unit:
            data_with_unit = u.Quantity(self._data, self.metadata.unit)
            self._data = typing.cast("NDArray[np.generic]", data_with_unit.to_value(target_unit))

            self.metadata.unit = target_unit

    @overload
    def get_data(self, target_unit: u.UnitBase | str) -> NDArray[np.floating | np.integer]: ...

    @overload
    def get_data(self, target_unit: None = None) -> NDArray[np.generic]: ...

    def get_data(self, target_unit: u.UnitBase | str | None = None) -> NDArray[np.generic]:
        """Gets the data of the variable.

        Args:
            target_unit (u.UnitBase | str | None): The unit to convert the data to
                before returning. If None, the data is returned in its current unit.
                Defaults to None.

        Returns:
            NDArray[np.generic]: The data of the variable.

        Raises:
            TypeError: If `target_unit` is provided and the data is not numeric.
        """
        if target_unit is None:
            return self._data

        if isinstance(target_unit, str):
            target_unit = u.Unit(target_unit)

        if not np.issubdtype(self._data.dtype, np.number):
            msg = f"Unit conversion is only supported for numeric types! Encountered for variable {self}."
            raise TypeError(msg)

        return typing.cast("NDArray[np.generic]", u.Quantity(self._data, self.metadata.unit).to_value(target_unit))

    def set_data(self, data: NDArray[np.generic], unit: Literal["same"] | str | u.UnitBase) -> None:  # noqa: PYI051
        """Sets the data and optionally updates the unit of the variable.

        Args:
            data (NDArray[np.generic]): The new data array.
            unit (Literal["same"] | str | u.UnitBase): The unit of the new data.
                If "same", the existing unit is kept. Can be a string representation
                of a unit or an `astropy.units.UnitBase` object.

        Raises:
            TypeError: If `unit` is not "same", a string, or an `astropy.units.UnitBase` object.
        """
        self._data = data

        if isinstance(unit, str):
            if unit != "same":
                self.metadata.unit = u.Unit(unit)
        elif isinstance(unit, u.UnitBase):
            self.metadata.unit = unit
        else:
            msg = "unit must be either a str or a astropy unit!"
            raise TypeError(msg)

    def transpose_data(self, seq: list[int] | tuple[int, ...]) -> None:
        """Transposes the internal data array.

        Args:
            seq (list[int] | tuple[int, ...]): The axes to transpose to. See
                `numpy.transpose` for details.
        """
        self._data = np.transpose(self._data, axes=seq)

    def apply_mask(self, mask: NDArray[np.bool_]) -> None:
        """Applies a boolean mask to the data.

        Elements where the mask is False are invalidated by setting them to NaN.

        Args:
            mask (NDArray[np.bool_]): Boolean array of the same shape as the data.
                False indicates values to be masked.

        Raises:
            TypeError: If the data is not a floating-point numeric type.
        """
        if not np.issubdtype(self._data.dtype, np.floating):
            msg = f"Masking is only supported for floating-point types! Encountered for variable {self}."
            raise TypeError(msg)

        self._data[~mask] = np.nan

    def apply_thresholds_on_data(self, lower_threshold: float = -np.inf, upper_threshold: float = np.inf) -> None:
        """Applies lower and upper thresholds to the data.

        Values outside the thresholds (exclusive) are set to NaN.

        Args:
            lower_threshold (float): The lower bound for the data. Defaults to
                negative infinity.
            upper_threshold (float): The upper bound for the data. Defaults to
                positive infinity.

        Raises:
            TypeError: If the data is not a floating-point numeric type.
        """
        if not np.issubdtype(self._data.dtype, np.floating):
            msg = f"Thresholds are only supported for floating-point types! Encountered for variable {self}."
            raise TypeError(msg)
        self._data = typing.cast("NDArray[np.floating]", self._data)

        self._data = np.where((self._data > lower_threshold) & (self._data < upper_threshold), self._data, np.nan)

    def truncate(self, time_variable: Variable, start_time: float | datetime, end_time: float | datetime) -> None:
        """Truncates the variable's data based on a time variable and a time range.

        Args:
            time_variable (Variable): A `Variable` object containing the time data.
            start_time (float | datetime): The start time for truncation. Can be a
                Unix timestamp (float) or a `datetime` object.
            end_time (float | datetime): The end time for truncation. Can be a
                Unix timestamp (float) or a `datetime` object.

        Raises:
            ValueError: If the length of the variable's data does not match the
                length of the `time_variable`'s data.
        """
        if isinstance(start_time, datetime):
            start_time = enforce_utc_timezone(start_time).timestamp()
        if isinstance(end_time, datetime):
            end_time = enforce_utc_timezone(end_time).timestamp()

        if self._data.shape[0] != time_variable.get_data().shape[0]:
            msg = f"Encountered length missmatch between variable and time variable! Variable: {self}"
            raise ValueError(msg)

        time_var_data = time_variable.get_data(ep.units.posixtime)

        self._data = self._data[(time_var_data >= start_time) & (time_var_data <= end_time)]

    def __hash__(self) -> int:
        """Computes a hash value for the variable based on its holding data.

        Returns:
            int: The integer hash value.
        """
        return hash(self._data.tobytes())

el_paso.typing.VariableInfo

Bases: NamedTuple, Generic[T_co]

A named tuple to store information about a variable in a data standard.

Source code in el_paso/data_standard.py
29
30
31
32
33
34
35
class VariableInfo(NamedTuple, Generic[T_co]):
    """A named tuple to store information about a variable in a data standard.

    The tuple is generic in the type of ``standard_name`` (``T_co``); all other
    fields have fixed types. As with every named tuple, the fields can be
    accessed by name or by position in the order listed below.

    Attributes:
        standard_name (T_co): The name of the variable within the data standard.
        description (str): A description of the variable.
        unit (u.UnitBase): The unit associated with the variable.
        dependencies (list[InternalName | str]): The names this variable depends on,
            given as `InternalName` values or plain strings.
            NOTE(review): presumably these are the dimensions/coordinates of the
            variable -- confirm against the data standard definitions.
    """

    standard_name: T_co
    description: str
    unit: u.UnitBase
    dependencies: list[InternalName | str]

el_paso.typing.VariableMetadata dataclass

A class holding the metadata of a variable.

Attributes:

Name Type Description
unit UnitBase

The unit of the variable. Defaults to u.dimensionless_unscaled.

original_cadence_seconds float

The original cadence of the data in seconds. Defaults to 0.

source_files list[str]

The list of source files that the variable contains data from. Defaults to an empty list.

description str

The description of the variable explaining what kind of data this variable contains. Defaults to "".

processing_notes str

The processing notes of the variable explaining all steps done to achieve the final result. Defaults to "".

standard_name str

The name of the standard variable this variable complies with. Defaults to "".

Source code in el_paso/variable.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass
class VariableMetadata:
    """Container for the metadata that accompanies a variable.

    Attributes:
        unit (u.UnitBase): The unit of the variable. Defaults to
            `u.dimensionless_unscaled`.
        original_cadence_seconds (float): The original cadence of the data in seconds.
            Defaults to 0.
        source_files (list[str]): The source files that the variable contains data
            from. Defaults to an empty list.
        description (str): Explains what kind of data the variable contains.
            Defaults to "".
        processing_notes (str): The accumulated notes describing the steps taken to
            arrive at the final result. Defaults to "".
        standard_name (str): The name of the standard variable this variable complies
            with. Defaults to "".
        processing_steps_counter (int): The number used to prefix the next note added
            through `add_processing_note`. It is not an init argument; it is set to 1
            in `__post_init__`.
    """

    unit: u.UnitBase = u.dimensionless_unscaled
    original_cadence_seconds: float = 0
    source_files: list[str] = field(default_factory=list[str])
    description: str = ""
    processing_notes: str = ""
    standard_name: str = ""

    def __post_init__(self) -> None:
        """Finishes the initialization after the generated `__init__` has run.

        Sets `processing_steps_counter` to 1. If `ep.is_in_release_mode()` is true,
        the message returned by `ep.get_release_msg()` followed by a newline is
        appended to `processing_notes`.
        """
        self.processing_steps_counter = 1

        if not ep.is_in_release_mode():
            return

        self.processing_notes += ep.get_release_msg() + "\n"

    def add_processing_note(self, processing_note: str) -> None:
        """Appends a numbered processing note to `processing_notes`.

        The stored line has the form ``"<counter>) <processing_note>"`` and ends
        with a newline. Afterwards the counter is increased by one.

        Args:
            processing_note (str): The text of the note to record.
        """
        step_number = self.processing_steps_counter

        self.processing_notes += f"{step_number}) {processing_note}\n"
        self.processing_steps_counter = step_number + 1

el_paso.typing.VariableRequest module-attribute

Type alias for a request to compute magnetic field variables, consisting of a sequence of tuples where each tuple specifies the variable type and the magnetic field model to use for its computation.

el_paso.typing.ExtractionInfo dataclass

Store metadata required to extract a variable from a source file.

Attributes:

Name Type Description
name_or_column str | int

Name of the variable or column to extract from the source file.

unit UnitBase

Physical unit associated with the extracted variable.

is_time_dependent bool

Whether the variable is time-dependent.

If True, data from multiple files will be concatenated along the time axis.

If False, data from multiple files will be used to fill missing (np.nan) values instead of being concatenated.

result_key str | None

Key to use for the extracted variable in the resulting variables dictionary.

If None, name_or_column is used as the key.

dependent_variables list[str] | None

Names of variables that the extracted variable depends on.

This is mainly used for JSON extraction to determine how extracted data should be reshaped.

np_dtype DTypeLike | None

Optional NumPy dtype used to cast the extracted data.

If None, the dtype is inferred from the source data.

Source code in el_paso/extract_variables_from_files.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# frozen=True: fields cannot be reassigned after construction.
# slots=True: instances use __slots__ instead of a per-instance __dict__.
# eq=False: no __eq__ is generated, so instances compare (and hash) by identity.
@dataclass(frozen=True, slots=True, eq=False)
class ExtractionInfo:
    """Store metadata required to extract a variable from a source file.

    Instances are immutable: assigning to a field after construction raises
    an error. Note that this only prevents rebinding the fields; a list passed
    as ``dependent_variables`` can still be mutated in place.

    Attributes:
        name_or_column:
            Name of the variable or column to extract from the source file.

        unit:
            Physical unit associated with the extracted variable.

        is_time_dependent:
            Whether the variable is time-dependent. Defaults to ``True``.

            If ``True``, data from multiple files will be concatenated
            along the time axis.

            If ``False``, data from multiple files will be used to fill
            missing (`np.nan`) values instead of being concatenated.

        result_key:
            Key to use for the extracted variable in the resulting
            variables dictionary. Defaults to ``None``.

            If ``None``, ``name_or_column`` is used as the key.

        dependent_variables:
            Names of variables that the extracted variable depends on.
            Defaults to ``None``.

            This is mainly used for JSON extraction to determine how
            extracted data should be reshaped.

        np_dtype:
            Optional NumPy dtype used to cast the extracted data.
            Defaults to ``None``.

            If ``None``, the dtype is inferred from the source data.
    """

    name_or_column: str | int
    unit: u.UnitBase
    is_time_dependent: bool = True
    result_key: str | None = None
    dependent_variables: list[str] | None = None
    np_dtype: DTypeLike | None = None