Skip to content

Typing

el_paso.typing.ConsistencyCheck dataclass

A utility class for checking the consistency of data dimensions.

This class helps verify that multiple variables saved to a file have the same length for shared dimensions (e.g., time, pitch angle, energy).

Attributes:

Name Type Description
lengths dict[str | int, _SizeAttr]

Maps each named dimension (e.g. "time", "pitch_angle", "energy") to the variable name and size that were first observed for that dimension.

Source code in el_paso/data_standard.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@dataclass
class ConsistencyCheck:
    """A utility class for checking the consistency of data dimensions.

    This class helps verify that multiple variables saved to a file have
    the same length for shared dimensions (e.g., time, pitch angle, energy).

    Attributes:
        lengths (dict[str | int, _SizeAttr]): Maps each named dimension (e.g. "time",
            "pitch_angle", "energy") to the variable name and size that were first
            observed for that dimension.
    """

    lengths: dict[str | int, _SizeAttr] = field(default_factory=dict[str | int, _SizeAttr])

    def check(self, data_shape: tuple[int, ...], dim_names_or_sizes: Sequence[str | int], var_name: str) -> None:
        if len(data_shape) != len(dim_names_or_sizes):
            msg = "Encountered size missmatch!"
            raise ValueError(msg)

        for i, dim_name_or_size in enumerate(dim_names_or_sizes):
            self.check_size(data_shape[i], dim_name_or_size, var_name)

    def check_size(self, provided_len: int, dim_name_or_size: str | int, var_name: str) -> None:
        if isinstance(dim_name_or_size, int):
            if dim_name_or_size != provided_len:
                msg = (
                    f"Length mismatch! Variable {var_name} should have length {dim_name_or_size}, "
                    f"but encountered {provided_len}!",
                )
                raise ValueError(msg)
            return

        if dim_name_or_size in self.lengths:
            if self.lengths[dim_name_or_size].size != provided_len:
                msg = (
                    f"Length mismatch! {dim_name_or_size} length of variable "
                    f"{self.lengths[dim_name_or_size].name}: {self.lengths[dim_name_or_size].size} "
                    f"and of variable {var_name}: {provided_len}"
                )
                raise ValueError(msg)
        else:
            self.lengths[dim_name_or_size] = _SizeAttr(var_name, provided_len)

el_paso.typing.DailyLEORBStrategy

Bases: MonthlyRBStrategy

Save PRBEM-standard LEO radiation-belt data into one NetCDF file per day.

This strategy extends MonthlyRBStrategy but splits the output into daily files instead of monthly ones, and fixes the output variable list and file format (NetCDF) for low-Earth-orbit radiation-belt missions.

Source code in el_paso/saving_strategies/daily_leo_rb_strategy.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class DailyLEORBStrategy(ep.typing.MonthlyRBStrategy):
    """Save PRBEM-standard LEO radiation-belt data into one NetCDF file per day.

    This strategy extends `MonthlyRBStrategy` but splits the output into daily
    files instead of monthly ones, and fixes the output variable list and file
    format (NetCDF) for low-Earth-orbit radiation-belt missions.
    """

    def _get_output_file_entries(self) -> list[ep.typing.InternalName]:
        """Return the fixed list of variable names written to every daily file."""
        return [
            "FEDU",
            "FEIU",
            "FEDO",
            "FPDU",
            "Epoch",
            "Alpha_Eq",
            "Alpha_Eq_range",
            "Energy_FEDU",
            "Energy_FEIU",
            "Energy_FEDO",
            "Energy_FPDU",
            "Alpha",
            "Alpha_range",
            "B_Calc",
            "B_Eq",
            "InvK",
            "InvMu",
            "Position",
            "PSD",
            "R_Eq",
            "MLT",
            "L_m",
            "L_star",
            "Alpha_LC",
            "Alpha_LC_Eq",
            "Position_geo_alt",
            "Position_geo_lat",
            "Position_geo_lon",
        ]

    def get_file_path(
        self, interval_start: datetime, interval_end: datetime, output_file: ep.typing.OutputFile,  # noqa: ARG002
    ) -> Path:
        """Build the path of the daily NetCDF file for one interval.

        The file name is ``<name stem>_<YYYYMMDD>_<mag_field>.nc``, where the date
        comes from ``interval_start``.

        Args:
            interval_start (datetime): Start of the interval; its date goes into the file name.
            interval_end (datetime): Unused; kept for interface compatibility.
            output_file (ep.typing.OutputFile): Unused; kept for interface compatibility.

        Returns:
            Path: ``get_file_path_stem()`` joined with the generated file name.
        """
        file_name = f"{self.get_file_name_stem()}_{interval_start.strftime('%Y%m%d')}_{self.mag_field}.nc"

        return self.get_file_path_stem() / file_name

    def get_time_intervals_to_save(
        self, start_time: datetime | None, end_time: datetime | None
    ) -> list[ep.typing.TimeInterval]:
        """Split the requested time range into full daily intervals.

        Each interval runs from midnight to one microsecond before the next
        midnight. The first interval starts at midnight of ``start_time``'s day,
        and intervals are generated for every day whose midnight is not later
        than ``end_time``.

        Args:
            start_time (datetime | None): Start of the requested range.
            end_time (datetime | None): End of the requested range.

        Returns:
            list[ep.typing.TimeInterval]: ``(interval_start, interval_end)`` tuples, one per day.

        Raises:
            ValueError: If ``start_time`` or ``end_time`` is ``None``.
        """
        time_intervals: list[ep.typing.TimeInterval] = []

        if start_time is None or end_time is None:
            # Name this class (the message previously pointed at DailyWaveStrategy).
            msg = "start_time and end_time must be provided for DailyLEORBStrategy!"
            raise ValueError(msg)

        # Snap to midnight so every interval covers a whole calendar day.
        current_time = start_time.replace(hour=0, minute=0, second=0, microsecond=0)
        while current_time <= end_time:
            interval_start = current_time
            # Stop one microsecond short of the next midnight to keep intervals disjoint.
            interval_end = current_time + timedelta(days=1, microseconds=-1)

            time_intervals.append((interval_start, interval_end))
            current_time += timedelta(days=1)

        return time_intervals

el_paso.typing.DailyWaveStrategy

Bases: SavingStrategy

Save wave and density data into one NetCDF (.nc) file per day.

Appending to existing files is not yet implemented for this strategy.

Source code in el_paso/saving_strategies/daily_wave_strategy.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class DailyWaveStrategy(SavingStrategy):
    """Saving strategy that writes wave and density data to one NetCDF (.nc) file per day.

    Appending to already existing files is not supported yet; requesting it
    raises ``NotImplementedError``.
    """

    output_files: list[OutputFile]
    # NOTE(review): declared here but not assigned anywhere in this class -- confirm where it is set.
    dependency_dict: dict[InternalName, list[str]]

    def __init__(
        self,
        base_data_path: str | Path,
        mission: str,
        satellite: str,
        instrument: str,
        data_standard: DataStandard[StandardName],
    ) -> None:
        """Set up a daily file saving strategy.

        Args:
            base_data_path (str | Path): Root directory below which the daily files are written.
            mission (str): Mission name; becomes part of the output directory.
            satellite (str): Satellite name; becomes part of the output directory and file name.
            instrument (str): Instrument name; becomes part of the file name.
            data_standard (DataStandard): Data standard used to standardize and write variables.

        Attributes:
            output_files: A single "full" output file whose variable names come from
                ``_get_output_file_entries()``.
        """
        self.base_data_path = Path(base_data_path)
        self.mission = mission
        self.satellite = satellite
        self.instrument = instrument
        self.data_standard = data_standard

        full_file = OutputFile("full", self._get_output_file_entries(), save_incomplete=True)
        self.output_files = [full_file]

    def _get_output_file_entries(self) -> list[InternalName]:
        """Return the fixed list of variable names stored in each daily file."""
        return [
            "Epoch",
            "Wave_frequency",
            "Number_density",
            "Wave_ellipticity",
            "Wave_normal_angle",
            "Wave_planarity",
            "Magnetic_Power_Spectral_Density",
            "Wave_frequency_bandwidth",
            "B_total_obs",
            "MLat",
            "R_Eq",
            "MLT",
        ]

    def _sanitize_dimension_name(self, variable_name: str) -> str:
        """Turn a variable path into a name made of alphanumerics and underscores.

        Every non-alphanumeric character becomes ``_``, leading and trailing
        underscores are dropped, and ``"custom"`` is returned if nothing is left.
        """
        replaced = (symbol if symbol.isalnum() else "_" for symbol in variable_name)
        trimmed = "".join(replaced).strip("_")
        if trimmed:
            return trimmed
        return "custom"

    def get_time_intervals_to_save(self, start_time: datetime | None, end_time: datetime | None) -> list[TimeInterval]:
        """Cut the requested time range into whole-day intervals.

        Intervals start at midnight and end one microsecond before the following
        midnight. The first one begins at midnight of ``start_time``'s day.

        Raises:
            ValueError: If ``start_time`` or ``end_time`` is ``None``.
        """
        if start_time is None or end_time is None:
            msg = "start_time and end_time must be provided for DailyWaveStrategy!"
            raise ValueError(msg)

        one_day = timedelta(days=1)
        day_span = timedelta(days=1, microseconds=-1)

        daily_intervals: list[TimeInterval] = []
        day_start = start_time.replace(hour=0, minute=0, second=0, microsecond=0)
        while day_start <= end_time:
            daily_intervals.append((day_start, day_start + day_span))
            day_start += one_day

        return daily_intervals

    def get_file_path_stem(self) -> Path:
        """Return the output directory: ``<base>/<MISSION>/<satellite>``."""
        mission_dir = self.mission.upper()
        satellite_dir = self.satellite.lower()
        return self.base_data_path / mission_dir / satellite_dir

    def get_file_name_stem(self) -> str:
        """Return the file name prefix: ``<satellite>_<instrument>`` in lower case."""
        return "_".join((self.satellite.lower(), self.instrument.lower()))

    def get_file_path(self, interval_start: datetime, interval_end: datetime, output_file: OutputFile) -> Path:  # noqa: ARG002
        """Build the full path of the daily file for the day of ``interval_start``.

        ``interval_end`` and ``output_file`` are accepted but not used.
        """
        day_stamp = interval_start.strftime("%Y%m%d")
        return self.get_file_path_stem() / f"{self.get_file_name_stem()}_{day_stamp}.nc"

    def standardize_variable(
        self,
        variable: Variable,
        internal_name: InternalName,
        *,
        first_call_of_interval: bool,
    ) -> Variable:
        """Delegate standardization of ``variable`` to the configured data standard.

        The consistency check of the data standard is reset when
        ``first_call_of_interval`` is true.
        """
        standardized = self.data_standard.standardize_variable(
            internal_name,
            variable,
            reset_consistency_check=first_call_of_interval,
        )
        return standardized

    def save_single_file(self, file_path: Path, dict_to_save: SavedDataDict, *, append: bool = False) -> None:
        """Write one daily NetCDF file, creating parent directories as needed.

        Raises:
            NotImplementedError: If ``append`` is true.
        """
        if append:
            msg = "Appending is not implemented yet for DailyWaveStrategy!"
            raise NotImplementedError(msg)

        target_dir = file_path.parent
        target_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Saving file: {file_path.resolve()}")

        write_netcdf_file(file_path, dict_to_save, self.data_standard)

el_paso.typing.DataStandard

Bases: ABC, Generic[T_co]

Abstract base class for data standardization.

Source code in el_paso/data_standard.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class DataStandard(ABC, Generic[T_co]):
    """Abstract base class for data standardization.

    Subclasses provide ``variable_infos``, which maps every internal variable
    name to its standard name, description, unit and dimension dependencies.
    """

    variable_infos: dict[InternalName, VariableInfo[T_co]]

    def __repr__(self) -> str:
        """Return ``ClassName(arg=value, ...)`` built from the constructor signature.

        Only constructor parameters that also exist as instance attributes of
        the same name are included.
        """
        cls = type(self)

        constructor_params = inspect.signature(cls.__init__).parameters
        args = [
            f"{name}={getattr(self, name)!r}"
            for name in constructor_params
            if name != "self" and hasattr(self, name)
        ]

        return f"{cls.__name__}({', '.join(args)})"

    def __str__(self) -> str:
        """Return the same text as ``__repr__``."""
        return self.__repr__()

    def get_internal_name(self, standard_name: StandardName) -> InternalName | None:
        """Return the first internal name whose standard name matches, or ``None``."""
        for internal_name, var_info in self.variable_infos.items():
            if var_info.standard_name == standard_name:
                return internal_name

        return None

    def get_standard_name(self, internal_name: InternalName) -> T_co:
        """Return the standard name registered for ``internal_name``.

        Raises:
            ValueError: If ``internal_name`` is not part of this data standard.
        """
        if internal_name not in self.variable_infos:
            msg = f"Internal name {internal_name} is not part of the {type(self)}!"
            raise ValueError(msg)

        return self.variable_infos[internal_name].standard_name

    def get_dependencies(self, internal_name: InternalName) -> list[InternalName | FixedDimensionName]:
        """Return the dimension dependencies registered for ``internal_name``.

        Raises:
            KeyError: If ``internal_name`` is not part of this data standard.
        """
        return self.variable_infos[internal_name].dependencies

    def standardize_variable(
        self, internal_name: InternalName, variable: Variable, *, reset_consistency_check: bool
    ) -> Variable:
        """Standardizes a variable according to the data standard's rules.

        The variable is converted to the standard unit, receives the standard
        description if it has none, and its shape is checked against the declared
        dependencies. The same `el_paso.Variable` object that was passed in is
        returned. Variables whose internal name is unknown to the standard are
        returned untouched after logging a warning.

        Args:
            internal_name (str): The internal name of the variable to be standardized.
            variable (Variable): The variable to be standardized.
            reset_consistency_check (bool): If true, the consistency check is reset
                before this variable is checked.

        Returns:
            Variable: The standardized variable.
        """
        # Also create the check on first use, so subclasses that never assigned
        # ``consistency_check`` do not fail with AttributeError further down.
        if reset_consistency_check or not hasattr(self, "consistency_check"):
            self.consistency_check = ConsistencyCheck()

        if internal_name not in self.variable_infos:
            logger.warning(f"Encountered custom variable which cannot be standardized: {internal_name}")
            return variable

        variable_info = self.variable_infos[internal_name]

        variable.convert_to_unit(variable_info.unit)
        # Keep a description the user already supplied; only fill in an empty one.
        if len(variable.metadata.description) == 0:
            variable.metadata.description = variable_info.description
        assert_n_dim(variable, len(variable_info.dependencies), internal_name)
        self.consistency_check.check(variable.get_data().shape, variable_info.dependencies, internal_name)

        return variable

    def __eq__(self, other: object) -> bool:
        """Compare by exact type and by ``variable_infos``.

        Note that defining ``__eq__`` without ``__hash__`` makes instances unhashable.
        """
        if not isinstance(other, DataStandard):
            return NotImplemented
        return type(self) is type(other) and self.variable_infos == other.variable_infos

el_paso.typing.DensityNetCDFStrategy

Bases: MonthlyRBStrategy

Saving strategy for writing plasma density and related data to monthly NetCDF files.

This strategy extends MonthlyRBStrategy but implements saving to the NetCDF format (.nc), primarily targeting the time-series of density, position, and coordinate variables (e.g., L-star, MLT).

The variables included and their dependencies are configured based on whether the data is associated with the "RBSP" satellites or "Other".

Attributes:

Name Type Description
output_files list[OutputFile]

List of file configurations to be produced.

file_path Path

Base path for output files (inherited).

dependency_dict dict[str, list[str]]

Defines the NetCDF dimension names (e.g., 'time', 'xGEO_components') that each variable depends on.

Source code in el_paso/saving_strategies/density_netcdf_strategy.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class DensityNetCDFStrategy(MonthlyRBStrategy):
    """Saving strategy for writing plasma density and related data to monthly NetCDF files.

    This strategy extends `MonthlyRBStrategy` but implements saving to the NetCDF
    format (`.nc`), primarily targeting the time-series of density, position, and
    coordinate variables (e.g., L-star, MLT).

    The variables included and their dependencies are configured based on whether
    the data is associated with the **"RBSP"** satellites or **"Other"**.

    Attributes:
        output_files (list[OutputFile]): List of file configurations to be produced.
        file_path (Path): Base path for output files (inherited).
        dependency_dict (dict[str, list[str]]): Defines the NetCDF dimension names
            (e.g., 'time', 'xGEO_components') that each variable depends on.
    """

    output_files: list[OutputFile]

    file_path: Path
    dependency_dict: dict[str, list[str]]

    def __init__(
        self,
        base_data_path: str | Path,
        mission: str,
        instrument: str,
        mag_field: MagneticFieldLiteral,
        satellite: Literal["RBSP", "Other"] = "Other",
        data_standard: Optional[DataStandard[StandardName]] = None,
    ) -> None:
        """Initializes the monthly NetCDF saving strategy.

        Args:
            base_data_path (str | Path): The base directory where the output NetCDF files will be saved.
            mission (str): The mission name, used in file path and name generation.
            instrument (str): The instrument name, used in file path and name generation.
            mag_field (MagneticFieldLiteral):
                A string specifying the magnetic field model used.
            satellite (Literal["RBSP", "Other"], optional):
                Specifies the satellite associated with the data. This determines which set of
                density-related variables are included in the output file. Defaults to "Other".
            data_standard (DataStandard | None, optional):
                An optional `DataStandard` instance to use for standardizing variables.
                If `None`, `ep.data_standards.PRBEMStandard` is used by default.

        Raises:
            ValueError: If `satellite` is neither "RBSP" nor "Other".
        """
        self.mag_field = mag_field
        if data_standard is None:
            data_standard = ep.data_standards.PRBEMStandard()
        self.data_standard = data_standard

        # Hand the resolved standard to the parent as well, so the parent cannot
        # replace the documented PRBEM default with a default of its own (or None).
        super().__init__(
            base_data_path=base_data_path,
            satellite=satellite,
            mission=mission,
            instrument=instrument,
            mag_field=self.mag_field,
            file_format="nc",
            data_standard=self.data_standard,
        )

        # Variables written for every satellite.
        output_file_entries = [
            "time",
            "xGEO",
            "MLT",
            "R_eq",
            "Lstar",
            "xGEO_eq",
        ]

        self.dependency_dict = {
            "time": ["time"],
            "xGEO": ["time", "xGEO_components"],
            "MLT": ["time"],
            "R_eq": ["time"],
            "xGEO_eq": ["time", "xGEO_components"],
            "Lstar": ["time"],
        }

        # Density variables depend on the satellite family.
        if satellite == "Other":
            output_file_entries += ["density_local", "density_eq"]
            self.dependency_dict |= {"density_local": ["time"], "density_eq": ["time"]}

        elif satellite == "RBSP":
            output_file_entries += [
                "density_emfisis_local",
                "density_efw_local",
                "density_hiss_derived_local",
                "density_emfisis_eq",
                "density_efw_eq",
                "density_hiss_derived_eq",
            ]

            self.dependency_dict |= {
                "density_emfisis_local": ["time"],
                "density_efw_local": ["time"],
                "density_hiss_derived_local": ["time"],
                "density_emfisis_eq": ["time"],
                "density_efw_eq": ["time"],
                "density_hiss_derived_eq": ["time"],
            }

        else:
            msg = "Enountered invalid satellite! Valid names are: 'RBSP', 'Other'."
            raise ValueError(msg)

        self.output_files = [
            OutputFile("full", output_file_entries, save_incomplete=True),
        ]

    def _calculate_dimensions(self, data_dict: dict[str, np.ndarray]) -> dict[str, int]:
        """Calculate density NetCDF dimension sizes from the data dictionary.

        Args:
            data_dict (dict[str, np.ndarray]): Arrays keyed by variable name; must contain "time".

        Returns:
            dict[str, int]: The "time" size, plus "xGEO_components" (always 3) when a
            non-empty "xGEO" or "xGEO_eq" array is present.
        """
        dimensions = {"time": data_dict["time"].shape[0]}

        has_local_position = "xGEO" in data_dict and data_dict["xGEO"].size > 0
        has_equatorial_position = "xGEO_eq" in data_dict and data_dict["xGEO_eq"].size > 0
        if has_local_position or has_equatorial_position:
            dimensions["xGEO_components"] = 3

        return dimensions

    def standardize_variable(
        self, variable: ep.Variable, name_in_file: str, *, first_call_of_interval: bool
    ) -> ep.Variable:
        """Standardizes a variable based on the configured `DataStandard`.

        This method delegates the standardization process to the `DataStandard`
        instance stored on this strategy.

        Args:
            variable (ep.Variable): The variable instance to be standardized.
            name_in_file (str): The name of the variable as it will appear in the file.
            first_call_of_interval (bool): Flag to indicate if it is the first call of a
                time interval; forwarded as `reset_consistency_check`.

        Returns:
            ep.Variable: The standardized variable.
        """
        return self.data_standard.standardize_variable(
            name_in_file, variable, reset_consistency_check=first_call_of_interval
        )

el_paso.typing.FileWriter

Bases: Protocol

Callable interface for writing standardized EL-PASO data to disk.

Source code in el_paso/typing.py
168
169
170
171
172
173
174
175
176
177
178
class FileWriter(Protocol):
    """Structural type for callables that write standardized EL-PASO data to disk.

    Any callable accepting ``(file_path, data_dict, data_standard)`` and
    returning ``None`` satisfies this protocol.
    """

    def __call__(self, file_path: Path, data_dict: SavedDataDict, data_standard: DataStandard) -> None:
        """Write `data_dict` to `file_path` using `data_standard`."""

el_paso.typing.GFZMetaData

Bases: DatasetMetadata

Metadata container for GFZStandard.

Attributes:

Name Type Description
datetime VariableMetadata

VariableMetadata

time VariableMetadata

VariableMetadata

energy_channels VariableMetadata

VariableMetadata

alpha_local VariableMetadata

VariableMetadata

alpha_eq_model VariableMetadata

VariableMetadata

alpha_eq_real VariableMetadata

VariableMetadata

InvMu VariableMetadata

VariableMetadata

InvMu_real VariableMetadata

VariableMetadata

InvK VariableMetadata

VariableMetadata

InvV VariableMetadata

VariableMetadata

Lstar VariableMetadata

VariableMetadata

Lm VariableMetadata

VariableMetadata

Flux VariableMetadata

VariableMetadata

PSD VariableMetadata

VariableMetadata

MLT VariableMetadata

VariableMetadata

B_eq VariableMetadata

VariableMetadata

B_total VariableMetadata

VariableMetadata

xGEO VariableMetadata

VariableMetadata

P VariableMetadata

VariableMetadata

R0 VariableMetadata

VariableMetadata

density VariableMetadata

VariableMetadata

Source code in el_paso/dataset/metadata.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class GFZMetaData(DatasetMetadata):
    """Metadata container for GFZStandard.

    Declares one `VariableMetadata` field per variable listed below.
    NOTE(review): the field names presumably match the GFZ variable names used
    by `GFZStandard`; the meaning of the individual entries is not documented
    in this class -- confirm against the standard before relying on it.

    Attributes:
        datetime: VariableMetadata
        time: VariableMetadata
        energy_channels: VariableMetadata
        alpha_local: VariableMetadata
        alpha_eq_model: VariableMetadata
        alpha_eq_real: VariableMetadata
        InvMu: VariableMetadata
        InvMu_real: VariableMetadata
        InvK: VariableMetadata
        InvV: VariableMetadata
        Lstar: VariableMetadata
        Lm: VariableMetadata
        Flux: VariableMetadata
        PSD: VariableMetadata
        MLT: VariableMetadata
        B_eq: VariableMetadata
        B_total: VariableMetadata
        xGEO: VariableMetadata
        P: VariableMetadata
        R0: VariableMetadata
        density: VariableMetadata
    """

    # Field order is kept as declared; it may be significant to the base class.
    datetime: VariableMetadata
    time: VariableMetadata
    energy_channels: VariableMetadata
    alpha_local: VariableMetadata
    alpha_eq_model: VariableMetadata
    alpha_eq_real: VariableMetadata
    InvMu: VariableMetadata
    InvMu_real: VariableMetadata
    InvK: VariableMetadata
    InvV: VariableMetadata
    Lstar: VariableMetadata
    Lm: VariableMetadata
    Flux: VariableMetadata
    PSD: VariableMetadata
    MLT: VariableMetadata
    B_eq: VariableMetadata
    B_total: VariableMetadata
    # The mixedCase name is intentional, hence the N815 lint suppression.
    xGEO: VariableMetadata  # noqa: N815
    P: VariableMetadata
    R0: VariableMetadata
    density: VariableMetadata

el_paso.typing.GFZStandard

Bases: DataStandard[GFZVarNames]

A data standard used historically at the GFZ German Research Centre for Geosciences.

This standard defines rules for a set of canonical variable names by converting them to correct units and checking their array dimensions for consistency. It is tailored for compatibility with historical GFZ datasets and internal workflows.

Source code in el_paso/data_standards/gfz_standard.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class GFZStandard(DataStandard[GFZVarNames]):
    """A data standard used historically at the GFZ German Research Centre for Geosciences.

    This standard defines rules for a set of canonical variable names by converting them
    to correct units and checking their array dimensions for consistency. It is tailored
    for compatibility with historical GFZ datasets and internal workflows.

    Attributes:
        consistency_check (ConsistencyCheck): Fresh dimension-consistency tracker created per instance.
        variable_infos (dict[InternalName, VariableInfo[GFZVarNames]]): Table mapping every supported
            internal variable name to its GFZ description.
    """

    def __init__(self) -> None:
        """Initializes the GFZStandard with a ConsistencyCheck object and the GFZ variable table."""
        self.consistency_check = ConsistencyCheck()

        # Each entry is built from four positional arguments. Judging from the values they appear to be
        # (GFZ variable name, human-readable description, unit, dimension names)
        # -- NOTE(review): confirm against the VariableInfo definition.
        self.variable_infos: dict[InternalName, VariableInfo[GFZVarNames]] = {
            # --- time and position ---
            "Epoch": VariableInfo[GFZVarNames]("time", "Time in MATLAB datenum format.", ep.units.datenum, ["Epoch"]),
            "Position": VariableInfo[GFZVarNames](
                "xGEO", "Position in geographic cartesian coordinates.", ep.units.RE, ["Epoch", "Position_components"]
            ),
            "Position_geo_alt": VariableInfo[GFZVarNames](
                "geo_alt", "Altitude in geographic coordinates.", u.km, ["Epoch"]
            ),
            "Position_geo_lat": VariableInfo[GFZVarNames](
                "geo_lat", "Latitude in geographic coordinates.", u.deg, ["Epoch"]
            ),
            "Position_geo_lon": VariableInfo[GFZVarNames](
                "geo_lon", "Longitude in geographic coordinates.", u.deg, ["Epoch"]
            ),
            # --- energy channels ---
            "Energy_FEDU": VariableInfo[GFZVarNames](
                "energy_channels", "Central energy of measured differential flux.", u.MeV, ["Epoch", "Energy_FEDU"]
            ),
            "Energy_FEIU": VariableInfo[GFZVarNames](
                "energy_FEIU", "Central energy of measured integral flux.", u.MeV, ["Epoch", "Energy_FEIU"]
            ),
            "Energy_FEDO": VariableInfo[GFZVarNames](
                "energy_FEDO", "Central energy of measured omnidirectional flux.", u.MeV, ["Epoch", "Energy_FEDO"]
            ),
            "Energy_FPDU": VariableInfo[GFZVarNames](
                "energy_FPDU", "Central energy of measured proton differential flux.", u.MeV, ["Epoch", "Energy_FPDU"]
            ),
            # --- particle fluxes ---
            "FEDU": VariableInfo[GFZVarNames](
                "Flux",
                "Electron differential unidirectional flux.",
                (u.cm**2 * u.s * u.sr * u.keV) ** (-1),
                ["Epoch", "Energy_FEDU", "Alpha"],
            ),
            "FEIU": VariableInfo[GFZVarNames](
                "FEIU",
                "Electron integral unidirectional flux.",
                (u.cm**2 * u.s * u.sr) ** (-1),
                ["Epoch", "Energy_FEIU", "Alpha"],
            ),
            "FEDO": VariableInfo[GFZVarNames](
                "FEDO",
                "Electron differential omnidirectional flux.",
                (u.cm**2 * u.s * u.keV) ** (-1),
                ["Epoch", "Energy_FEDO", "Alpha_range"],
            ),
            "FPDU": VariableInfo[GFZVarNames](
                "FPDU",
                "Proton differential unidirectional flux.",
                (u.cm**2 * u.s * u.sr * u.keV) ** (-1),
                ["Epoch", "Energy_FPDU", "Alpha"],
            ),
            # --- pitch angles and loss cone ---
            "Alpha": VariableInfo[GFZVarNames](
                "alpha_local", "Local pitch angles of the particles.", u.radian, ["Epoch", "Alpha"]
            ),
            "Alpha_range": VariableInfo[GFZVarNames](
                "alpha_local_range",
                "Local pitch angle ranges of the particles.",
                u.radian,
                ["Epoch", "Alpha_range", "min_max"],
            ),
            "Alpha_Eq": VariableInfo[GFZVarNames](
                "alpha_eq_model", "Calculated equatorial pitch angles of the particles.", u.radian, ["Epoch", "Alpha"]
            ),
            "Alpha_Eq_range": VariableInfo[GFZVarNames](
                "alpha_eq_range",
                "Equatorial pitch angle ranges of the particles.",
                u.radian,
                ["Epoch", "Alpha_range", "min_max"],
            ),
            "Alpha_LC": VariableInfo[GFZVarNames](
                "alpha_lc", "Local loss cone size at the satellite location.", u.radian, ["Epoch"]
            ),
            "Alpha_LC_Eq": VariableInfo[GFZVarNames](
                "alpha_lc_eq",
                "Local loss cone size at the satellite location mapped to the equator.",
                u.radian,
                ["Epoch"],
            ),
            # --- derived quantities ---
            "PSD": VariableInfo[GFZVarNames](
                "PSD",
                "Calculated phase space density of particles.",
                (u.m * u.kg * u.m / u.s) ** (-3),
                ["Epoch", "Energy_FEDU", "Alpha"],
            ),
            "MLT": VariableInfo[GFZVarNames](
                "MLT", "Magnetic local time at the satellite location.", u.hour, ["Epoch"]
            ),
            "L_star": VariableInfo[GFZVarNames](
                "Lstar", "Calculated Lstar of the particles.", u.dimensionless_unscaled, ["Epoch", "Alpha"]
            ),
            "L_m": VariableInfo[GFZVarNames](
                "Lm", "Calculated Lm of the particles.", u.dimensionless_unscaled, ["Epoch", "Alpha"]
            ),
            "B_Eq": VariableInfo[GFZVarNames]("B_eq", "Calculated magnetic field at the equator.", u.nT, ["Epoch"]),
            "B_Calc": VariableInfo[GFZVarNames](
                "B_total", "Calculated magnetic field at the satellite location.", u.nT, ["Epoch"]
            ),
            "B_total_obs": VariableInfo[GFZVarNames](
                "B_sat", "Observed magnetic field at the satellite location.", u.nT, ["Epoch"]
            ),
            "R_Eq": VariableInfo[GFZVarNames](
                "R0", "Radial distance of the satellite location mapped to the equator.", ep.units.RE, ["Epoch"]
            ),
            "InvMu": VariableInfo[GFZVarNames](
                "InvMu", "Calculated first adiabatic invariant.", u.MeV / u.G, ["Epoch", "Energy_FEDU", "Alpha"]
            ),
            "InvK": VariableInfo[GFZVarNames](
                "InvK", "Calculated modified second adiabatic invariant.", ep.units.RE * u.G**0.5, ["Epoch", "Alpha"]
            ),
            # --- wave products ---
            # The descriptions in this group were previously all copies of the "Wave_frequency" text;
            # each one now describes its own quantity.
            "Wave_frequency": VariableInfo[GFZVarNames](
                "freq", "Frequency of the power spectral density.", u.Hz, ["Wave_frequency"]
            ),
            "Wave_ellipticity": VariableInfo[GFZVarNames](
                "ellipticity",
                "Ellipticity of the wave polarization.",
                u.dimensionless_unscaled,
                ["Epoch", "Wave_frequency"],
            ),
            "Wave_planarity": VariableInfo[GFZVarNames](
                "planarity",
                "Planarity of the wave polarization.",
                u.dimensionless_unscaled,
                ["Epoch", "Wave_frequency"],
            ),
            "Wave_frequency_bandwidth": VariableInfo[GFZVarNames](
                "freq_bw",
                "Bandwidth of the frequency channels of the power spectral density.",
                u.Hz,
                ["Wave_frequency"],
            ),
            "Wave_normal_angle": VariableInfo[GFZVarNames](
                "wave_wna", "Wave normal angle.", u.degree, ["Epoch", "Wave_frequency"]
            ),
            "MLat": VariableInfo[GFZVarNames](
                "MLat", "Magnetic latitude at the satellite location.", u.degree, ["Epoch"]
            ),
            # NOTE(review): a power spectral density is stored as dimensionless here -- confirm the unit.
            "Magnetic_Power_Spectral_Density": VariableInfo[GFZVarNames](
                "BB",
                "Magnetic power spectral density of the waves.",
                u.dimensionless_unscaled,
                ["Epoch", "Wave_frequency"],
            ),
        }

el_paso.typing.GFZStrategy

Bases: SavingStrategy

A concrete saving strategy for saving data based on the satellite mission into separate monthly files.

This strategy implements the data standard used at GFZ in the past. It organizes the output files into a specific directory structure (e.g., base_path/MISSION/SATELLITE/Processed_Mat_Files/) and standardizes variables to specific units and dimensions before saving. The data is saved in .mat format.

Attributes:

Name Type Description
output_files list[OutputFile]

Pre-defined list of files to be saved, each with a specific set of variables.

base_data_path Path

The root directory for all saved data.

mission str

The name of the space mission (e.g., "MMS").

satellite str

The name of the satellite (e.g., "MMS1").

instrument str

The name of the instrument.

mag_field MagneticFieldLiteral

The magnetic field model identifier, with "TS04" being mapped to "T04s" for backward compatibility.

Methods:

Name Description
__init__

Initializes the strategy with file paths and metadata.

standardize_variable

Standardizes variables to specific units and dimensions based on their name.

get_time_intervals_to_save

Splits the given time range into a list of monthly intervals.

get_file_path

Generates a complete file path based on the mission, satellite, and date.

append_data

Appends new data to an existing file by concatenating NumPy arrays based on time.

Source code in el_paso/saving_strategies/gfz_strategy.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class GFZStrategy(SavingStrategy):
    """Saving strategy that writes monthly ``.mat`` files in the directory layout used historically at GFZ.

    Files are placed under `base_path/MISSION/satellite/Processed_Mat_Files/` and are
    split into several thematic files per month (flux, pitch angles and energies,
    MLT, L*, ...), each holding a fixed set of variables.

    Attributes:
        output_files (list[OutputFile]): Fixed list of files written per month, each
            with the internal variable names it contains.
        base_data_path (Path): Root directory below which all files are written.
        mission (str): Mission name (e.g., "MMS"); upper-cased in the directory path.
        satellite (str): Satellite name (e.g., "MMS1"); lower-cased in paths and file names.
        instrument (str): Instrument name; lower-cased in file names.
        mag_field (str): Magnetic field model identifier. "TS04" is stored as "T04s"
            for backward compatibility.
        data_standard (DataStandard): Standard used for the variables; a `GFZStandard`
            unless another instance is passed in.

    Methods:
        __init__: Stores the path components and builds the list of output files.
        get_time_intervals_to_save: Splits the given time range into a list of monthly intervals.
        get_file_path_stem: Returns the directory all files of this strategy are written to.
        get_file_name_stem: Returns the `satellite_instrument` prefix shared by all file names.
        get_file_path: Generates the complete file path for one month and one output file.
    """

    output_files: list[OutputFile]

    file_path: Path

    def __init__(
        self,
        base_data_path: str | Path,
        mission: str,
        satellite: str,
        instrument: str,
        mag_field: ep.typing.MagneticFieldLiteral,
        data_standard: Optional[DataStandard[StandardName]] = None,
    ) -> None:
        """Set up path components, the data standard and the per-month file list.

        Args:
            base_data_path (str | Path): The base directory for saving all data.
            mission (str): The mission name.
            satellite (str): The satellite name.
            instrument (str): The instrument name.
            mag_field (MagneticFieldLiteral): The magnetic field model. "TS04" is remapped to "T04s".
            data_standard (DataStandard | None, optional): `DataStandard` instance used for
                standardizing variables. A new `GFZStandard` is created when none is given.
        """
        self.base_data_path = Path(base_data_path)
        self.mission = mission
        self.satellite = satellite
        self.instrument = instrument
        self.data_standard = data_standard if data_standard else GFZStandard()

        # Historic GFZ files label the TS04 model as "T04s"; keep that label for backwards compatibility.
        self.mag_field = "T04s" if mag_field == "TS04" else mag_field

        # (file label, variables stored in that file); every file carries its own copy of "Epoch".
        file_layout = (
            ("flux", ["Epoch", "FEDU"]),
            ("alpha_and_energy", ["Epoch", "Alpha", "Alpha_Eq", "Energy_FEDU"]),
            ("mlt", ["Epoch", "MLT"]),
            ("lstar", ["Epoch", "L_star"]),
            ("lm", ["Epoch", "L_m"]),
            ("psd", ["Epoch", "PSD"]),
            ("xGEO", ["Epoch", "Position"]),
            ("invmu_and_invk", ["Epoch", "InvMu", "InvK"]),
            ("bfield", ["Epoch", "B_Eq", "B_Calc"]),
            ("R0", ["Epoch", "R_Eq"]),
        )
        self.output_files = [OutputFile(label, variables) for label, variables in file_layout]

        self._loader = ep.utils.load_mat_data

    def get_time_intervals_to_save(self, start_time: datetime | None, end_time: datetime | None) -> list[TimeInterval]:
        """Return the monthly intervals covering the requested time range.

        The work is delegated to `ep.utils.get_monthly_datetime_intervals`; any
        validation of missing bounds happens there.

        Args:
            start_time (datetime | None): The start of the time range.
            end_time (datetime | None): The end of the time range.

        Returns:
            list[TimeInterval]: The intervals produced by the helper, one per month.
        """
        monthly: list[TimeInterval] = ep.utils.get_monthly_datetime_intervals(start_time, end_time)
        return monthly

    def get_file_path_stem(self) -> Path:
        """Return the output directory: `base_path/MISSION/satellite/Processed_Mat_Files`."""
        satellite_dir = self.base_data_path / self.mission.upper() / self.satellite.lower()
        return satellite_dir / "Processed_Mat_Files"

    def get_file_name_stem(self) -> str:
        """Return the lower-cased `satellite_instrument` prefix of every file name."""
        return f"{self.satellite.lower()}_{self.instrument.lower()}"

    def get_file_path(self, interval_start: datetime, interval_end: datetime, output_file: OutputFile) -> Path:
        """Build the path of one output file for the month containing the given interval.

        The path follows the pattern
        `base_path/MISSION/satellite/Processed_Mat_Files/satellite_instrument_YYYYMMDDtoYYYYMMDD_filename[_n4_4_MAGFIELD]_ver4.mat`,
        where the bracketed part is only present for files that depend on the magnetic field model.

        Args:
            interval_start (datetime): The start of the time interval.
            interval_end (datetime): The end of the time interval.
            output_file (OutputFile): The output file configuration.

        Returns:
            Path: The generated file path.
        """
        # Only the first monthly interval returned by the helper is used for the date stamps.
        month_interval = ep.utils.get_monthly_datetime_intervals(interval_start, interval_end)[0]
        first_stamp = month_interval[0].strftime("%Y%m%d")
        last_stamp = month_interval[1].strftime("%Y%m%d")

        name_parts = [self.get_file_name_stem(), f"_{first_stamp}to{last_stamp}_{output_file.name}"]

        # These files carry the magnetic field model in their name.
        field_dependent = ("alpha_and_energy", "lstar", "lm", "invmu_and_invk", "mlt", "bfield", "R0")
        if output_file.name in field_dependent:
            name_parts.append(f"_n4_4_{self.mag_field}")

        name_parts.append("_ver4.mat")

        return self.get_file_path_stem() / "".join(name_parts)

el_paso.typing.MonthlyRBStrategy

Bases: SavingStrategy

Save PRBEM-standard data into one monthly file per interval.

The strategy supports NetCDF, CDF, HDF5, and MATLAB output through a format dispatch table. Existing files can be appended by loading the current file, replacing overlapping timestamps with the new data block, and atomically rewriting the merged data.

Source code in el_paso/saving_strategies/monthly_rb_strategy.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class MonthlyRBStrategy(SavingStrategy):
    """Save PRBEM-standard data into one monthly file per interval.

    The strategy supports NetCDF, CDF, HDF5, and MATLAB output through a format
    dispatch table. Existing files can be appended by loading the current file,
    replacing overlapping timestamps with the new data block, and atomically
    rewriting the merged data.
    """

    output_files: list[OutputFile]
    dependency_dict: dict[InternalName, list[str]]

    def __init__(
        self,
        base_data_path: str | Path,
        mission: str,
        satellite: str,
        instrument: str,
        mag_field: MagneticFieldLiteral,
        data_standard: DataStandard[StandardName],
        file_format: MFSFormats = "nc",
    ) -> None:
        """Initialize a monthly file saving strategy.

        Args:
            base_data_path (str | Path): Directory where monthly files are written.
            mission (str): Mission name, used in file path and name generation.
            satellite (str): Satellite name, used in file path and name generation.
            instrument (str): Instrument name, used in file path and name generation.
            mag_field (MagneticFieldLiteral): Magnetic field model name. Monthly files use one model.
            data_standard (DataStandard): Instance of the data standard implementation.
            file_format (MFSFormats): One of ``"nc"``, ``"cdf"``, ``"h5"``, or ``"mat"``.
                A leading dot is also accepted. The value is passed through
                ``ep.utils.normalize_file_format`` before it is stored.

        Attributes:
            output_files: A single ``"full"`` output file whose variable names come
                from ``_get_output_file_entries()``; it is saved even when incomplete.
            dependency_dict: Declared on the class as the dimension dependencies of the
                variables in ``output_files``; it is not assigned by this constructor.
        """
        self.base_data_path = Path(base_data_path)
        self.mission = mission
        self.satellite = satellite
        self.instrument = instrument
        self.mag_field = mag_field
        self.data_standard = data_standard
        self.file_format = ep.utils.normalize_file_format(file_format)

        self.output_files = [
            OutputFile("full", self._get_output_file_entries(), save_incomplete=True),
        ]

    def _get_output_file_entries(self) -> list[InternalName]:
        """Return the fixed list of internal variable names written to the monthly file."""
        return [
            "FEDU",
            "Epoch",
            "Alpha_Eq",
            "Energy_FEDU",
            "Alpha",
            "B_Calc",
            "B_Eq",
            "InvK",
            "InvMu",
            "Position",
            "PSD",
            "R_Eq",
            "MLT",
            "L_m",
            "L_star",
        ]

    def _sanitize_dimension_name(self, variable_name: str) -> str:
        """Return a NetCDF-safe root dimension name derived from a variable path.

        Every non-alphanumeric character is replaced by ``"_"`` and leading/trailing
        underscores are stripped; ``"custom"`` is returned when nothing is left.
        """
        return "".join(char if char.isalnum() else "_" for char in variable_name).strip("_") or "custom"

    def get_time_intervals_to_save(self, start_time: datetime | None, end_time: datetime | None) -> list[TimeInterval]:
        """Split the requested time range into full monthly intervals.

        Each interval runs from the first day of a month at 00:00:00 UTC to the last
        day of that month at 23:59:59 UTC. Naive datetimes are interpreted as UTC;
        timezone-aware datetimes are used as given.

        Args:
            start_time (datetime | None): The start of the time range.
            end_time (datetime | None): The end of the time range.

        Returns:
            list[TimeInterval]: One ``(month_start, month_end)`` tuple per calendar month.

        Raises:
            ValueError: If either ``start_time`` or ``end_time`` is not provided.
        """
        if start_time is None or end_time is None:
            msg = "start_time and end_time must be provided for MonthlyRBStrategy!"
            raise ValueError(msg)

        # The loop cursor becomes UTC-aware after the first month, so a naive bound would
        # raise TypeError on the next comparison. Attach UTC to naive bounds up front.
        range_start = start_time if start_time.utcoffset() is not None else start_time.replace(tzinfo=timezone.utc)
        range_end = end_time if end_time.utcoffset() is not None else end_time.replace(tzinfo=timezone.utc)

        time_intervals: list[TimeInterval] = []
        current_time = range_start.replace(day=1)
        while current_time <= range_end:
            year = current_time.year
            month = current_time.month
            eom_day = calendar.monthrange(year, month)[1]

            month_start = datetime(year, month, 1, 0, 0, 0, tzinfo=timezone.utc)
            month_end = datetime(year, month, eom_day, 23, 59, 59, tzinfo=timezone.utc)
            time_intervals.append((month_start, month_end))

            # Advance to the first day of the next month; December rolls over into January.
            current_time = datetime(year + month // 12, month % 12 + 1, 1, tzinfo=timezone.utc)

        return time_intervals

    def get_file_path_stem(self) -> Path:
        """Return the output directory: ``base_path/MISSION/satellite``."""
        return self.base_data_path / self.mission.upper() / self.satellite.lower()

    def get_file_name_stem(self) -> str:
        """Return the lower-cased ``satellite_instrument`` prefix of the file name."""
        return self.satellite.lower() + "_" + self.instrument.lower()

    def get_file_path(self, interval_start: datetime, interval_end: datetime, output_file: OutputFile) -> Path:  # noqa: ARG002
        """Generate the monthly file path for the configured format.

        The file name is ``satellite_instrument_YYYYMMDDtoYYYYMMDD_MAGFIELD`` directly
        followed by ``self.file_format``.
        NOTE(review): no "." is inserted here, so the normalized format presumably
        already carries its leading dot -- confirm against ``normalize_file_format``.

        Args:
            interval_start (datetime): The start of the time interval.
            interval_end (datetime): The end of the time interval.
            output_file (OutputFile): Unused; all variables go into one file per month.

        Returns:
            Path: The generated file path.
        """
        start_year_month_day = interval_start.strftime("%Y%m%d")
        end_year_month_day = interval_end.strftime("%Y%m%d")
        file_name = (
            f"{self.get_file_name_stem()}_{start_year_month_day}to{end_year_month_day}_"
            f"{self.mag_field}{self.file_format}"
        )

        return self.get_file_path_stem() / file_name

el_paso.typing.OutputFile

Bases: NamedTuple

Represents an output file with its name and a list of variable names to save.

Attributes:

Name Type Description
name str

The name of the output file.

names_to_save list[str]

List of variable names to be saved in the output file.

save_incomplete bool

If True, allows saving even if some variables are missing.

Source code in el_paso/saving_strategy.py
38
39
40
41
42
43
44
45
46
47
48
49
class OutputFile(NamedTuple):
    """Represents an output file with its name and a list of variable names to save.

    Attributes:
        name (str): The name of the output file.
        names_to_save (list[InternalName]): Internal names of the variables to be saved in the output file.
        save_incomplete (bool): If True, allows saving even if some variables are missing.
            Defaults to False.
    """

    name: str
    names_to_save: list[InternalName]
    save_incomplete: bool = False

el_paso.typing.PRBEMMetaData

Bases: DatasetMetadata

Metadata container for PRBEMStandard.

Attributes:

Name Type Description
datetime VariableMetadata

VariableMetadata

Epoch VariableMetadata

VariableMetadata

FEDU VariableMetadata

VariableMetadata

FEDO VariableMetadata

VariableMetadata

FEIU VariableMetadata

VariableMetadata

Energy_FEDU VariableMetadata

VariableMetadata

Alpha VariableMetadata

VariableMetadata

Alpha_Eq VariableMetadata

VariableMetadata

Position VariableMetadata

VariableMetadata

B_Calc VariableMetadata

VariableMetadata

B_Eq VariableMetadata

VariableMetadata

L_star VariableMetadata

VariableMetadata

I VariableMetadata

VariableMetadata

MLT VariableMetadata

VariableMetadata

L_m VariableMetadata

VariableMetadata

PSD VariableMetadata

VariableMetadata

R_Eq VariableMetadata

VariableMetadata

InvMu VariableMetadata

VariableMetadata

InvK VariableMetadata

VariableMetadata

Source code in el_paso/dataset/metadata.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
class PRBEMMetaData(DatasetMetadata):
    """Metadata container for PRBEMStandard.

    The class declares one field per variable name; every field is annotated as
    ``VariableMetadata``. No defaults or methods are defined here.
    NOTE(review): the field names look like PRBEM standard variable names -- confirm
    against the data standard before relying on that.

    Attributes:
        datetime: VariableMetadata
        Epoch: VariableMetadata
        FEDU: VariableMetadata
        FEDO: VariableMetadata
        FEIU: VariableMetadata
        Energy_FEDU: VariableMetadata
        Alpha: VariableMetadata
        Alpha_Eq: VariableMetadata
        Position: VariableMetadata
        B_Calc: VariableMetadata
        B_Eq: VariableMetadata
        L_star: VariableMetadata
        I: VariableMetadata
        MLT: VariableMetadata
        L_m: VariableMetadata
        PSD: VariableMetadata
        R_Eq: VariableMetadata
        InvMu: VariableMetadata
        InvK: VariableMetadata
    """

    datetime: VariableMetadata
    Epoch: VariableMetadata
    FEDU: VariableMetadata
    FEDO: VariableMetadata
    FEIU: VariableMetadata
    Energy_FEDU: VariableMetadata
    Alpha: VariableMetadata
    Alpha_Eq: VariableMetadata
    Position: VariableMetadata
    B_Calc: VariableMetadata
    B_Eq: VariableMetadata
    L_star: VariableMetadata
    # The single-letter name trips the "ambiguous variable name" lint rule (E741), hence the suppression.
    I: VariableMetadata  # noqa: E741
    MLT: VariableMetadata
    L_m: VariableMetadata
    PSD: VariableMetadata
    R_Eq: VariableMetadata
    InvMu: VariableMetadata
    InvK: VariableMetadata

el_paso.typing.PRBEMStandard

Bases: DataStandard[PRBEMName]

A data standard of the Panel for Radiation Belt Environment Modeling (PRBEM).

This class defines and applies a specific set of data standards for variables defined by the PRBEM. It standardizes variables by converting them to canonical units and performing consistency checks on their dimensions and shapes, ensuring they conform to the expected format for each standard name.

Source code in el_paso/data_standards/prbem_standard.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class PRBEMStandard(DataStandard[PRBEMName]):
    """Data standard following the Panel for Radiation Belt Environment Modeling (PRBEM) file format.

    The variable table built in ``__init__`` lists, for every supported variable of the
    [PRBEM](https://prbem.github.io/documents/Standard_File_Format.pdf) standard, its
    standard name, a description, its canonical unit and the dimensions it depends on.
    Together with the ``ConsistencyCheck`` instance this is what the standard uses to
    bring variables into the expected units and shapes.
    """

    def __init__(self) -> None:
        """Create a fresh ConsistencyCheck and populate the PRBEM variable table."""
        self.consistency_check = ConsistencyCheck()

        # Local shorthand for the parametrised constructor used by every entry below.
        make_info = VariableInfo[PRBEMName]

        # Entries are inserted in a fixed order; the dict preserves it.
        table: dict[str, VariableInfo[PRBEMName]] = {}

        # --- time axis ---
        table["Epoch"] = make_info("Epoch", "Posix Time", ep.units.posixtime, dependencies=["Epoch"])

        # --- particle fluxes ---
        table["FEDU"] = make_info(
            "FEDU",
            "Processed unidirectional differential electron flux",
            (u.cm**2 * u.s * u.sr * u.keV) ** (-1),
            dependencies=["Epoch", "Energy_FEDU", "Alpha"],
        )
        table["FPDU"] = make_info(
            "FPDU",
            "Processed unidirectional differential proton flux",
            (u.cm**2 * u.s * u.sr * u.keV) ** (-1),
            dependencies=["Epoch", "Energy_FPDU", "Alpha"],
        )

        # --- pitch angles ---
        table["Alpha"] = make_info(
            "Alpha", "Local pitch angle the instrument is looking at", u.deg, dependencies=["Epoch", "Alpha"]
        )
        table["Alpha_Eq"] = make_info(
            "Alpha_Eq",
            "Computed equatorial pitch angle the instrument is looking from Alpha, B_Calc and B_Eq",
            u.deg,
            dependencies=["Epoch", "Alpha"],
        )

        # --- energy channels ---
        table["Energy_FEDU"] = make_info(
            "Energy_FEDU",
            "Central energy of unidirectional differential electron flux",
            u.MeV,
            dependencies=["Epoch", "Energy_FEDU"],
        )
        table["Energy_FPDU"] = make_info(
            "Energy_FPDU",
            "Central energy of unidirectional differential proton flux",
            u.MeV,
            dependencies=["Epoch", "Energy_FPDU"],
        )

        # --- position and magnetic field ---
        table["Position"] = make_info(
            "Position",
            "Spacecraft position in geographic cartesian coordinates",
            u.km,
            dependencies=["Epoch", "Position_components"],
        )
        table["B_Calc"] = make_info(
            "B_Calc",
            "Calculated magnetic field strength at the spacecraft position",
            u.nT,
            dependencies=["Epoch"],
        )
        table["B_Eq"] = make_info(
            "B_Eq",
            "Calculated magnetic field strength at magnetic equator",
            u.nT,
            dependencies=["Epoch"],
        )

        # --- L parameters ---
        # NOTE(review): the description repeats "L" ("Calculated L McIlwain's L parameter");
        # left unchanged here because it is written out as metadata.
        table["L_m"] = make_info(
            "L_m",
            "Calculated L McIlwain's L parameter",
            u.dimensionless_unscaled,
            dependencies=["Epoch", "Alpha"],
        )
        table["L_star"] = make_info(
            "L_star",
            "Calculated Roederer's L* parameter",
            u.dimensionless_unscaled,
            dependencies=["Epoch", "Alpha"],
        )

        # --- adiabatic invariants and derived quantities ---
        table["InvMu"] = make_info(
            "InvMu", "Calculated first adiabatic invariant.", u.MeV / u.G, ["Epoch", "Energy_FEDU", "Alpha"]
        )
        table["InvK"] = make_info(
            "InvK", "Calculated modified second adiabatic invariant.", ep.units.RE * u.G**0.5, ["Epoch", "Alpha"]
        )
        table["R_Eq"] = make_info(
            "R_Eq", "Radial distance of the satellite location mapped to the equator.", ep.units.RE, ["Epoch"]
        )
        table["PSD"] = make_info(
            "PSD",
            "Calculated phase space density of particles.",
            (u.m * u.kg * u.m / u.s) ** (-3),
            ["Epoch", "Energy_FEDU", "Alpha"],
        )
        table["MLT"] = make_info("MLT", "Magnetic local time at the satellite location.", u.hour, ["Epoch"])

        self.variable_infos: dict[str, VariableInfo[PRBEMName]] = table

el_paso.typing.SavingStrategy

Bases: ABC

Abstract base class for defining strategies to save output files with specific time intervals and variables.

Attributes:

Name Type Description
output_files list[OutputFile]

List of output files to be managed by the saving strategy.

data_standard DataStandard[StandardName]

The data standard that defines the variable naming convention.

base_data_path Path

The base path where output files will be saved.

satellite str

The name of the satellite for which data is being saved.

mission str

The name of the mission for which data is being saved.

instrument str

The name of the instrument for which data is being saved.

mag_field MagneticFieldLiteral

The magnetic field model used for saving data, if applicable.

Methods:

Name Description
get_time_intervals_to_save

Abstract method to determine the time intervals for saving data between start_time and end_time.

get_file_path

Abstract method to generate the file path for a given time interval and output file.

standardize_variable

Abstract method to standardize a variable before saving, possibly renaming or formatting it.

get_target_variables

Selects and prepares variables to be saved in the output file, optionally truncating them to a time range.

save_single_file

Saves the provided dictionary to a file in the specified format (.mat, .h5, .nc, .cdf), optionally appending data.

append_data

Appends data to an existing output file by merging it with newly computed data and rewriting the file. Supported for any format with a registered loader/writer.

Source code in el_paso/saving_strategy.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
class SavingStrategy(ABC):
    """Abstract base class for defining strategies to save output files with specific time intervals and variables.

    Attributes:
        output_files (list[OutputFile]): List of output files to be managed by the saving strategy.
        data_standard (DataStandard[StandardName]): The data standard that defines the variable naming convention.
        base_data_path (Path): The base path where output files will be saved.
        satellite (str): The name of the satellite for which data is being saved.
        mission (str): The name of the mission for which data is being saved.
        instrument (str): The name of the instrument for which data is being saved.
        mag_field (MagneticFieldLiteral): The magnetic field model used for saving data, if applicable.

    Methods:
        get_time_intervals_to_save:
            Abstract method to determine the time intervals for saving data between start_time and end_time.

        get_file_path:
            Abstract method to generate the file path for a given time interval and output file.

        standardize_variable:
            Abstract method to standardize a variable before saving, possibly renaming or formatting it.

        get_target_variables:
            Selects and prepares variables to be saved in the output file, optionally truncating them to a time range.

        save_single_file:
            Saves the provided dictionary to a file in the specified format (.mat, .h5, .nc, .cdf),
            optionally appending data.

        append_data:
            Appends data to an existing output file by merging it with newly computed data
            and rewriting the file. Supported for any format with a registered loader/writer.
    """

    output_files: list[OutputFile]
    data_standard: DataStandard[StandardName]
    base_data_path: Path
    satellite: str
    mission: str
    instrument: str
    mag_field: MagneticFieldLiteral

    def __repr__(self) -> str:
        cls = type(self)

        constructor_params = inspect.signature(cls.__init__).parameters

        args = []

        for name in constructor_params:
            if name == "self":
                continue

            if hasattr(self, name):
                value = getattr(self, name)
                args.append(f"{name}={value!r}")

        return f"{cls.__name__}({', '.join(args)})"

    def __str__(self) -> str:
        """Return the same text as ``__repr__``."""
        return self.__repr__()

    @abstractmethod
    def get_time_intervals_to_save(self, start_time: datetime, end_time: datetime) -> list[TimeInterval]:
        """Generates a list of time intervals to save between the specified start and end times.

        Abstract: every concrete strategy must provide its own implementation.

        Args:
            start_time (datetime): The starting datetime for the intervals.
            end_time (datetime): The ending datetime for the intervals.

        Returns:
            list[TimeInterval]: A list of tuples, each representing a time interval
                (start, end) to be saved.
        """

    @abstractmethod
    def get_file_path(self, interval_start: datetime, interval_end: datetime, output_file: OutputFile) -> Path:
        """Generates a file path for saving variables based on the provided interval and output file information.

        Abstract: every concrete strategy must provide its own implementation.

        Args:
            interval_start (datetime): The start of the interval for which the file is being generated.
            interval_end (datetime): The end of the interval for which the file is being generated.
            output_file (OutputFile): An OutputFile containing the name of the output file,
                and which variables should be saved in this file.

        Returns:
            Path: The generated file path where the output data should be saved.
        """

    def standardize_variable(
        self,
        variable: ep.Variable,
        internal_name: InternalName,
        *,
        first_call_of_interval: bool,
    ) -> ep.Variable:
        """Standardize a variable through the configured data standard."""
        return self.data_standard.standardize_variable(
            internal_name, variable, reset_consistency_check=first_call_of_interval
        )

    def save_single_file(self, file_path: Path, dict_to_save: SavedDataDict, *, append: bool = False) -> None:
        """Write ``dict_to_save`` to ``file_path``, optionally appending to an existing file.

        The parent directory is created when missing. The writer is looked up in the
        module-level ``_writers`` table using the normalized file suffix.

        Args:
            file_path (Path): Destination file; its suffix selects the writer.
            dict_to_save (SavedDataDict): The data to store.
            append (bool): If True and ``file_path`` already exists, the data is merged
                into the existing file via ``append_data`` instead of overwriting it.

        Raises:
            NotImplementedError: If no writer is registered for the file format.
        """
        file_path.parent.mkdir(parents=True, exist_ok=True)

        format_name = ep.utils.normalize_file_format(file_path.suffix)
        writer = _writers.get(format_name)
        if writer is None:
            msg = f"The '{format_name}' format is not implemented."
            logger.error(msg)
            raise NotImplementedError(msg)

        if file_path.exists() and append:
            logger.info(f"Appending and saving to existing file: {file_path.resolve()}")
            self.append_data(file_path, dict_to_save)
        else:
            logger.info(f"Saving file: {file_path.resolve()}")
            writer(file_path, dict_to_save, self.data_standard)

    def append_data(self, file_path: Path, data_dict_to_save: SavedDataDict) -> SavedDataDict:
        """Append data to any supported monthly file format.

        Existing data is loaded with the loader for ``file_path.suffix``, merged
        by timestamp with the new dictionary, and written to a temporary file
        before replacing the original file.
        """
        if not file_path.exists():
            msg = f"Cannot append: file does not exist: {file_path}"
            raise FileNotFoundError(msg)

        new_time = np.asarray(data_dict_to_save["Epoch"])
        if int(new_time.shape[0]) == 0:
            logger.info(f"No new time data to insert for {file_path.name}")
            return data_dict_to_save

        format_name = ep.utils.normalize_file_format(file_path.suffix)
        loader = _loaders.get(format_name)
        writer = _writers.get(format_name)
        if loader is None or writer is None:
            msg = f"Appending to '{format_name}' files is not supported by MonthlyRBStrategy."
            logger.error(msg)
            raise NotImplementedError(msg)

        if format_name == ".nc":
            self._validate_netcdf_appendable(file_path)

        logger.info(f"Loading existing data from {file_path.name}")
        existing_data = loader(file_path)

        logger.info(f"Merging and sorting data for {file_path.name}")
        merged_data = self._merge_and_sort_data(existing_data, data_dict_to_save)

        with tempfile.NamedTemporaryFile(suffix=format_name, delete=False, dir=file_path.parent) as tmp_file:
            tmp_path = Path(tmp_file.name)

        try:
            logger.info(f"Writing merged data to temporary file {tmp_path.name}")
            writer(tmp_path, merged_data, self.data_standard)

            logger.info(f"Replacing original file with merged data for {file_path.name}")
            shutil.move(str(tmp_path), str(file_path))
            logger.info(f"Successfully inserted data into {file_path.resolve()}")

            return merged_data  # noqa: TRY300
        except Exception:
            if tmp_path.exists():
                tmp_path.unlink()
            logger.exception("Failed to write merged data to temporary file")
            raise

    def _merge_and_sort_data(
        self,
        existing_data: dict[StandardName | Literal["metadata"], Any],
        new_data: SavedDataDict,
    ) -> SavedDataDict:
        """Merge two dictionaries along the time axis, replacing duplicate times."""

        def _normalize_1d(arr: np.ndarray) -> np.ndarray:
            arr = np.asarray(arr)
            if arr.ndim == 2 and arr.shape[1] == 1:
                return arr.reshape(-1)
            return arr

        existing_data_internal: SavedDataDict = {}
        for name, value in existing_data.items():
            if name == "metadata":
                existing_data_internal["metadata"] = value
            else:
                internal_name = self.data_standard.get_internal_name(name)
                if internal_name is None:
                    msg = f"Could not find necessary internal name for variable: {name}"
                    raise ValueError(msg)
                existing_data_internal[internal_name] = value

        existing_time = _normalize_1d(existing_data_internal["Epoch"])
        new_time = _normalize_1d(new_data["Epoch"])
        mask_keep_existing = ~np.isin(existing_time, new_time)
        insert_idx = int(np.searchsorted(existing_time, new_time[0]))

        merged: SavedDataDict = {}
        existing_metadata = existing_data_internal.get("metadata", {})
        new_metadata = new_data.get("metadata", {})
        if isinstance(existing_metadata, dict) and isinstance(new_metadata, dict):
            merged["metadata"] = {**existing_metadata, **new_metadata}
        elif "metadata" in new_data:
            merged["metadata"] = new_metadata
        elif "metadata" in existing_data_internal:
            merged["metadata"] = existing_metadata

        all_keys = set(existing_data_internal.keys()) | set(new_data.keys())
        for key in all_keys:
            if key == "metadata" or key.startswith("__"):
                continue

            if key not in existing_data_internal:
                merged[key] = new_data[key]
                continue

            if key not in new_data:
                merged[key] = existing_data_internal[key]
                continue

            v1 = _normalize_1d(np.asarray(existing_data_internal[key]))
            v2 = _normalize_1d(np.asarray(new_data[key]))

            if v1.ndim != v2.ndim:
                msg = f"{key}: ndim mismatch {v1.shape} vs {v2.shape}"
                logger.error(msg)
                raise ValueError(msg)

            if v1.ndim > 1 and v1.shape[1:] != v2.shape[1:]:
                msg = f"{key}: shape mismatch {v1.shape} vs {v2.shape}"
                logger.error(msg)
                raise ValueError(msg)

            v1_trunc = v1[mask_keep_existing]
            merged_val = v2 if v1_trunc.size == 0 else np.insert(v1_trunc, insert_idx, v2, axis=0)

            if key == "Epoch":
                t = np.asarray(merged_val)
                if len(np.unique(t)) != len(t):
                    msg = "Time values are not unique after merge for key 'time'"
                    logger.error(msg)
                    raise ValueError(msg)

            merged[key] = merged_val

        return merged

    def _validate_netcdf_appendable(self, file_path: Path) -> None:
        """Ensure the NetCDF file at ``file_path`` has an unlimited ``Epoch`` dimension.

        Args:
            file_path (Path): The existing NetCDF file to inspect (opened read-only).

        Raises:
            ValueError: If the ``Epoch`` dimension is missing or not unlimited.
        """
        with nC.Dataset(file_path, "r", format="NETCDF4") as dataset:
            epoch_dimension = dataset.dimensions.get("Epoch")
            appendable = epoch_dimension is not None and epoch_dimension.isunlimited()

        if appendable:
            return

        msg = (
            "Cannot append: the existing NetCDF file does not have an "
            "unlimited 'Epoch' dimension. Recreate the file with 'Epoch' "
            "created as unlimited (None)."
        )
        raise ValueError(msg)

    @abstractmethod
    def get_file_path_stem(self) -> Path:
        """Abstract hook that concrete strategies must implement.

        NOTE(review): judging by the name and return annotation this presumably yields
        the path prefix of the output files; confirm against the concrete strategies.
        """
        pass

    @abstractmethod
    def get_file_name_stem(self) -> str:
        """Abstract hook that concrete strategies must implement.

        NOTE(review): judging by the name and return annotation this presumably yields
        the common file-name prefix of the output files; confirm against the concrete strategies.
        """
        pass

    def get_target_variables(
        self,
        output_file: OutputFile,
        variables_dict: dict[InternalName, ep.Variable],
        time_var: ep.Variable | None,
        start_time: datetime | None,
        end_time: datetime | None,
    ) -> dict[InternalName, ep.Variable] | None:
        """Retrieves and processes target variables for saving based on the specified output file.

        Args:
            output_file (OutputFile): The output file configuration containing variable names to save.
            variables_dict (dict[str, Variable]): Dictionary mapping variable names to Variable objects.
            time_var (Variable | None): The time variable used for truncation, if applicable.
            start_time (datetime | None): The start time for truncating variables, if specified.
            end_time (datetime | None): The end time for truncating variables, if specified.

        Returns:
            dict[str, Variable] | None:
                - A dictionary of processed Variable objects keyed by their names,
                    or None if any specified variable name is not found in variables_dict.

        Notes:
            - If no variable names are specified in output_file, all variables in variables_dict are processed.
            - Variables are deep-copied before processing.
            - Each variable is standardized using the `standardize_variable` method.
            - If a requested variable name is not found, a warning is issued and None is returned.
        """
        target_variables: dict[InternalName, ep.Variable] = {}
        first_call_of_interval = True

        # if no variables have been specified, we save all of them
        if len(output_file.names_to_save) == 0:
            for key, var in variables_dict.items():
                var_to_save = deepcopy(var)

                if start_time is not None and end_time is not None and time_var is not None:
                    var_to_save.truncate(time_var, start_time.timestamp(), end_time.timestamp())
                var_to_save = self.standardize_variable(var_to_save, key, first_call_of_interval=first_call_of_interval)
                first_call_of_interval = False

                target_variables[key] = var_to_save

            return target_variables

        missing_names = []

        for name_to_save in output_file.names_to_save:
            if name_to_save in variables_dict:
                var_to_save = deepcopy(variables_dict[name_to_save])

                if start_time is not None and end_time is not None and time_var is not None:
                    var_to_save.truncate(time_var, start_time.timestamp(), end_time.timestamp())

                var_to_save = self.standardize_variable(
                    var_to_save, name_to_save, first_call_of_interval=first_call_of_interval
                )
                first_call_of_interval = False

                target_variables[name_to_save] = var_to_save
            else:
                missing_names.append(name_to_save)
                if output_file.save_incomplete:
                    target_variables[name_to_save] = ep.Variable(
                        original_unit=u.dimensionless_unscaled, data=np.array([])
                    )
                else:
                    return None

        if len(missing_names) > 0:
            msg = f"Could not find target variable(s) {', '.join(sorted(missing_names))}!"
            logger.warning(msg, stacklevel=2)

        return target_variables

    def get_output_file(
        self, *, standard_name: StandardName | None = None, internal_name: InternalName | None = None
    ) -> OutputFile | None:
        if internal_name is None:
            if standard_name is None:
                msg = "Either standard_name or internal_name must be provided!"
                raise ValueError(msg)
            internal_name = self.data_standard.get_internal_name(standard_name)

        if internal_name is None:
            return None

        for output_file in self.output_files:
            if internal_name in output_file.names_to_save:
                return output_file

        return None

    def get_all_standard_names(self) -> list[StandardName]:
        all_standard_names: list[StandardName] = []

        for output_file in self.output_files:
            all_standard_names.extend(
                [self.data_standard.get_standard_name(internal_name) for internal_name in output_file.names_to_save]
            )

        return list(set(all_standard_names))

el_paso.typing.SingleFileStrategy

Bases: SavingStrategy

A concrete saving strategy that saves all data to a single file.

This strategy implements the SavingStrategy abstract methods to manage saving all variables for the entire time range into a single output file. It is a simple, non-partitioning approach. Supports multiple file formats including MATLAB (.mat), HDF5 (.h5), NetCDF4 (.nc), and CDF (.cdf). Users can also register custom format writers for additional file formats.

Attributes:

Name Type Description
file_path Path

The path to the single output file where all data will be saved.

output_files list[OutputFile]

List of output files to be managed.

Methods:

Name Description
__init__

Initializes the strategy with file path and optional custom writers.

get_time_intervals_to_save

Returns the entire time range as a single interval.

get_file_path

Always returns the pre-defined single file path.

standardize_variable

Passes the variable through without any standardization.

save_single_file

Saves data to a file in the specified format using the dispatch table.

register_writer

Registers a custom format writer for a file extension.

Supported Formats
  • .mat: MATLAB format using scipy.io.savemat
  • .h5: HDF5 format using h5py with optional gzip compression
  • .nc: NetCDF4 format using netCDF4 with optional compression
  • .cdf: CDF (Common Data Format) using cdflib with gzip compression
  • Custom: Any user-defined format via register_writer() or format_writers parameter
Example
def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
    # Custom writer implementation
    pass
strategy = SingleFileStrategy("output.myformat", format_writers={".myformat": write_custom})
ep.save(variables, saving_strategy=strategy, ...)
Source code in el_paso/saving_strategies/single_file_strategy.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class SingleFileStrategy(SavingStrategy):
    """A concrete saving strategy that saves all data to a single file.

    This strategy implements the `SavingStrategy` abstract methods to manage saving all variables
    for the entire time range into a single output file. It is a simple, non-partitioning approach.
    Supports multiple file formats including MATLAB (.mat), HDF5 (.h5), NetCDF4 (.nc), and CDF (.cdf).
    Users can also register custom format writers for additional file formats.

    Attributes:
        file_path (Path): The path to the single output file where all data will be saved.
        output_files (list[OutputFile]): List of output files to be managed.

    Methods:
        __init__(file_path, format_writers): Initializes the strategy with file path and optional custom writers.
        get_time_intervals_to_save: Returns the entire time range as a single interval.
        get_file_path: Always returns the pre-defined single file path.
        standardize_variable: Passes the variable through without any standardization.
        save_single_file: Saves data to a file in the specified format using the dispatch table.
        register_writer: Registers a custom format writer for a file extension.

    Supported Formats:
        - .mat: MATLAB format using scipy.io.savemat
        - .h5: HDF5 format using h5py with optional gzip compression
        - .nc: NetCDF4 format using netCDF4 with optional compression
        - .cdf: CDF (Common Data Format) using cdflib with gzip compression
        - Custom: Any user-defined format via register_writer() or format_writers parameter

    Example:
        ```python
        def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
            # Custom writer implementation
            pass
        strategy = SingleFileStrategy("output.myformat",format_writers={".myformat": write_custom})
        ep.save(variables, saving_strategy=strategy, ...)
        ```
    """

    output_files: list[OutputFile]
    file_path: Path
    _writers: dict[str, SingleFileFormatWriter]

    def __init__(
        self,
        file_path: str | Path,
        format_writers: dict[str, SingleFileFormatWriter] | None = None,
    ) -> None:
        """Initializes the SingleFileStrategy with the specified file path and optional custom format writers.

        Args:
            file_path (str | Path): The full path to the output file. The file extension determines
                the format unless a custom writer is registered.
            format_writers (dict[str, SingleFileFormatWriter] | None): Optional dictionary mapping file extensions
                (e.g., ".myformat") to custom writer functions. Extensions are normalized the same way as in
                `register_writer` (leading dot added if missing, lower-cased). Custom writers override
                built-in writers for the same extension. Defaults to None.

        Example:
            ```python
            def write_custom(file_path: Path, data_dict: dict[str, Any]) -> None:
                # Custom writer implementation
                pass
            strategy = SingleFileStrategy("output.myformat", format_writers={".myformat": write_custom})
            ep.save(variables, saving_strategy=strategy, ...)
            ```
        """
        self.file_path = Path(file_path)
        self.output_files = [OutputFile(self.file_path.name, [])]

        # Build the dispatch table with built-in writers
        self._writers: dict[str, SingleFileFormatWriter] = {
            ".mat": self._write_mat_file,
            ".h5": self._write_h5_file,
            ".nc": self._write_netcdf_file,
            ".cdf": self._write_cdf_file,
        }

        # Register custom writers through register_writer so their extensions are
        # normalized; the lookup in save_single_file uses the lower-cased suffix and
        # would otherwise never match keys such as ".MyFormat" or "myformat".
        if format_writers:
            for extension, writer in format_writers.items():
                self.register_writer(extension, writer)

    def get_file_path_stem(self) -> None:  # ty:ignore[invalid-method-override]
        """Placeholder for the abstract base-class hook; does nothing and returns None."""
        pass

    def get_file_name_stem(self) -> None:  # ty:ignore[invalid-method-override]
        """Placeholder for the abstract base-class hook; does nothing and returns None."""
        pass

    def get_time_intervals_to_save(self, start_time: datetime, end_time: datetime) -> list[TimeInterval]:
        """Returns the entire time range as a single interval.

        This strategy does not split data by time; it saves everything in one go.

        Args:
            start_time (datetime): The start time of the data range.
            end_time (datetime): The end time of the data range.

        Returns:
            list[TimeInterval]: A list containing a single tuple with the start and end times.
        """
        return [(start_time, end_time)]

    def get_file_path(
        self,
        interval_start: datetime,  # noqa: ARG002
        interval_end: datetime,  # noqa: ARG002
        output_file: OutputFile,  # noqa: ARG002
    ) -> Path:
        """Returns the pre-defined single file path, ignoring all arguments.

        This method ensures all data is saved to the same file, regardless of the time interval.

        Args:
            interval_start (datetime): The start of the time interval (ignored).
            interval_end (datetime): The end of the time interval (ignored).
            output_file (OutputFile): The output file configuration (ignored).

        Returns:
            Path: The `file_path` of this strategy instance.
        """
        return self.file_path

    def standardize_variable(
        self,
        variable: Variable,
        internal_name: InternalName,  # noqa: ARG002
        *,
        first_call_of_interval: bool,  # noqa: ARG002
    ) -> Variable:
        """Does not modify the variable.

        This strategy does not perform any specific standardization on the variables before saving.

        Args:
            variable (Variable): The variable instance to be returned as-is.
            internal_name (InternalName): The internal name of the variable (ignored).
            first_call_of_interval (bool): Flag to indicate if it is the first call of a
                time interval (ignored).

        Returns:
            Variable: The original variable instance, unchanged.
        """
        return variable

    def register_writer(self, extension: str, writer: SingleFileFormatWriter) -> None:
        """Register a custom format writer for a file extension.

        This method allows you to register custom writers for file formats not natively supported,
        or to override built-in writers. Custom writers are called when a file with the matching
        extension is saved.

        Args:
            extension (str): The file extension (including the dot), e.g., ".myformat" or ".bin".
            writer (SingleFileFormatWriter): A callable with signature `(Path, dict[str, Any]) -> None` that
                handles writing the data dictionary to the specified file path.

        Example:
            ```python
            def write_binary(path: Path, data: dict[str, Any]) -> None:
                import struct
                with open(path, 'wb') as f:
                    for key, value in data.items():
                        if key != "metadata":
                            f.write(value.tobytes())
            strategy = SingleFileStrategy("output.dat")
            strategy.register_writer(".dat", write_binary)
            ```
        """
        if not extension.startswith("."):
            extension = "." + extension
        self._writers[extension.lower()] = writer

    def _write_metadata_to_netcdf_variable(self, data_set: nC.Variable[Any], metadata: dict[str, Any]) -> None:
        """Attach metadata values that can be represented as NetCDF attributes."""
        for key, value in metadata.items():
            if isinstance(value, list):
                value = ", ".join(str(item) for item in value)

            if getattr(value, "size", None) == 0:
                continue

            setattr(data_set, key, value)

    def _write_netcdf_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write a data dictionary to a NetCDF4 (.nc) file.

        Keys are treated as "/"-separated paths: all but the last component become
        nested groups, the last one is the variable name. One dimension per axis is
        created in the variable's group, named ``<variable>_dim_<axis>``. Variables are
        written with zlib compression (level 5) and the shuffle filter. Entries whose
        value has ``size == 0`` are skipped.

        Args:
            file_path (Path): Path to save the .nc file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are path strings (e.g., "var_name" or "group/subgroup/var_name").
                The "metadata" key is not written as a variable; its per-path entries
                are stored as variable attributes.
        """
        all_metadata = data_dict.get("metadata", {})

        with nC.Dataset(file_path, "w", format="NETCDF4") as root:
            for path, value in data_dict.items():
                if path == "metadata" or value.size == 0:
                    continue

                *group_names, dataset_name = path.split("/")

                # Walk down the group hierarchy, creating groups that do not exist yet.
                parent: nC.Group | nC.Dataset = root
                for group_name in group_names:
                    parent = (
                        parent.groups[group_name] if group_name in parent.groups else parent.createGroup(group_name)
                    )

                dimension_names = []
                for axis, size in enumerate(value.shape):
                    dimension_name = f"{dataset_name}_dim_{axis}"
                    if dimension_name not in parent.dimensions:
                        parent.createDimension(dimension_name, size)
                    dimension_names.append(dimension_name)

                nc_variable = typing.cast(
                    "nC.Variable[Any]",
                    parent.createVariable(
                        dataset_name, value.dtype, dimension_names, zlib=True, complevel=5, shuffle=True
                    ),
                )
                nc_variable[...] = value

                if path in all_metadata:
                    self._write_metadata_to_netcdf_variable(nc_variable, all_metadata[path])

    def save_single_file(self, file_path: Path, dict_to_save: dict[str, Any], *, append: bool = False) -> None:  # ty:ignore[invalid-method-override]
        """Saves variable data to a single file in one of the supported formats.

        The file format is determined by the file extension. Built-in formats include .mat, .h5, .nc, and .cdf.
        Custom format writers can be registered via the format_writers parameter during initialization or
        via the register_writer() method.

        It is primarily designed to be used with the `el_paso.save()` function, which handles the logic of determining
        what data to save and when.

        Args:
            file_path (Path): The path to the file where the dictionary will be saved.
                              The file extension determines the format.
            dict_to_save (dict[str, Any]): The dictionary containing variable data to save.
                Keys are variable names (strings), values are NumPy arrays or other serializable data.
                Should include a "metadata" key with metadata dictionary.
            append (bool, optional): If True, attempts to append to an existing file.
                Only supported for CDF format. For other formats, raises NotImplementedError.
                Defaults to False.

        Raises:
            NotImplementedError: If the file format is not registered or supported,
                or if append is requested for formats that don't support it.
            Any exception raised by the format writer function.

        Supported Built-in Formats:
            - .mat: MATLAB format using scipy.io.savemat
            - .h5: HDF5 format using h5py with gzip compression
            - .nc: NetCDF4 format using netCDF4 with compression
            - .cdf: CDF (Common Data Format) using cdflib with gzip compression
        """
        logger.info(f"Saving file {file_path.name}...")

        file_path.parent.mkdir(parents=True, exist_ok=True)
        format_name = file_path.suffix.lower()

        # Look up the writer in the dispatch table
        writer = self._writers.get(format_name)

        if writer is None:
            msg = f"The '{format_name}' format is not implemented. Registered formats: {list(self._writers.keys())}"
            logger.error(msg)
            raise NotImplementedError(msg)

        if append:
            msg = f"Appending to existing files is not supported for '{format_name}' format."
            logger.error(msg)
            raise NotImplementedError(msg)
        writer(file_path, dict_to_save)

    def _write_mat_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write data dictionary to MATLAB .mat format.

        The dictionary is handed to ``savemat`` unchanged, including any "metadata" entry.

        Args:
            file_path (Path): Path to save the .mat file.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
        """
        savemat(str(file_path), data_dict)

    def _write_h5_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write data dictionary to HDF5 (.h5) format.

        Each key of ``data_dict`` is interpreted as a "/"-separated path: all but the
        last component name (possibly nested) groups, the last component names the
        dataset. Groups are created on demand and reused when they already exist.
        Every dataset is written with gzip compression and the shuffle filter.
        If the metadata dictionary has an entry for the full path of a dataset, its
        items are stored as attributes of that dataset.

        Args:
            file_path (Path): Path to save the .h5 file. An existing file is overwritten.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are path strings (e.g., "var_name" or "group/subgroup/var_name").
                The "metadata" key is not written as a dataset; it is expected to map
                dataset paths to ``{attribute_name: attribute_value}`` dictionaries.
                A missing or empty "metadata" entry means that no attributes are written.
        """
        # Look the metadata up once and tolerate its absence, so that a dictionary
        # without a "metadata" key does not fail with a KeyError after the first
        # dataset has already been written.
        metadata = data_dict.get("metadata") or {}

        with h5py.File(file_path, "w") as file:
            for path, value in data_dict.items():
                if path == "metadata":
                    continue

                *group_names, dataset_name = path.split("/")

                # Walk down the hierarchy, creating groups that do not exist yet.
                parent: h5py.Group = file
                for group_name in group_names:
                    parent = parent.require_group(group_name)

                data_set = parent.create_dataset(dataset_name, data=value, compression="gzip", shuffle=True)

                if path in metadata:
                    for key, attr_value in metadata[path].items():
                        data_set.attrs[key] = attr_value

    def _write_cdf_file(self, file_path: Path, data_dict: dict[str, Any]) -> None:
        """Write data dictionary to CDF (Common Data Format) format.

        Every entry of ``data_dict`` except "metadata" is written as a variable with
        record variance enabled; the dimension sizes are taken from all array axes
        after the first one. Variables are written with gzip compression level 6.

        All entries of the "metadata" dictionary are written as *global* attributes:

        - A dict whose keys are all integers (or digit strings) is used directly as
          the ``{entry_number: value}`` mapping of a single global attribute.
        - Any other dict is flattened into one single-entry global attribute per item,
          named ``"<name>_<sub_key>"``. Empty lists/tuples are skipped with a warning.
        - A non-empty list or tuple becomes one attribute with one entry per element;
          an empty one is skipped with a warning.
        - Any other value becomes a single-entry attribute.

        Data types are mapped as follows: int8/int16/int32 become CDF_INT1/CDF_INT2/
        CDF_INT4 and every other integer dtype (including unsigned ones) becomes
        CDF_INT8; float32 becomes CDF_FLOAT and every other floating dtype CDF_DOUBLE.
        Data of any other dtype is cast to float64 and stored as CDF_DOUBLE.

        Values that have no ``size`` attribute, or whose ``size`` is 0, are skipped
        with a warning.

        Args:
            file_path (Path): Path to save the .cdf file. An existing file is replaced.
            data_dict (dict[str, Any]): Dictionary with variable data and metadata.
                Keys are variable names. The optional "metadata" key holds the
                dictionary that is converted to global attributes as described above.

        Raises:
            RuntimeError: If creating or writing the file fails for any reason. The
                original exception is logged and chained as the cause.
        """
        try:
            cdf_file = cdflib.cdfwrite.CDF(str(file_path), delete=True)

            # The inner try/finally guarantees that the file handle is closed even
            # if writing fails half-way through.
            try:
                metadata = data_dict.get("metadata")

                if isinstance(metadata, dict):
                    # Maps attribute name -> {entry number: value}.
                    global_attrs: dict[str, dict[int, Any]] = {}

                    for attr_name, attr_value in metadata.items():
                        attr_name_str = str(attr_name)

                        if isinstance(attr_value, dict):
                            keys = list(attr_value.keys())
                            if all(isinstance(k, (int, np.integer)) or str(k).isdigit() for k in keys):
                                # Already in entry-number form: only normalise the keys to int.
                                global_attrs[attr_name_str] = {int(k): v for k, v in attr_value.items()}
                            else:
                                # Nested dictionary: flatten to "<name>_<sub_key>" attributes.
                                for sub_key, sub_val in attr_value.items():
                                    if isinstance(sub_val, (list, tuple)) and len(sub_val) == 0:
                                        logger.warning(f"Skipping empty global attribute {attr_name_str}_{sub_key}")
                                        continue
                                    flat_name = f"{attr_name_str}_{sub_key}"
                                    global_attrs[flat_name] = {0: sub_val}

                        elif isinstance(attr_value, (list, tuple)):
                            if len(attr_value) == 0:
                                logger.warning(f"Skipping empty global attribute {attr_name_str}")
                                continue
                            # One attribute entry per sequence element.
                            global_attrs[attr_name_str] = dict(enumerate(attr_value))

                        else:
                            global_attrs[attr_name_str] = {0: attr_value}

                    if global_attrs:
                        cdf_file.write_globalattrs(global_attrs)

                for var_name, var_data in data_dict.items():
                    if var_name == "metadata":
                        continue

                    if getattr(var_data, "size", 0) == 0:
                        logger.warning(f"Skipping empty variable {var_name}")
                        continue

                    var_data_array = np.asarray(var_data)
                    if np.issubdtype(var_data_array.dtype, np.integer):
                        if var_data_array.dtype == np.int8:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT1
                        elif var_data_array.dtype == np.int16:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT2
                        elif var_data_array.dtype == np.int32:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT4
                        else:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT8

                    elif np.issubdtype(var_data_array.dtype, np.floating):
                        if var_data_array.dtype == np.float32:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_FLOAT
                        else:
                            cdf_dtype = cdflib.cdfwrite.CDF.CDF_DOUBLE

                    else:
                        # Anything that is neither integer nor float is stored as double.
                        var_data_array = var_data_array.astype(np.float64)
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_DOUBLE

                    # The compression level belongs into the variable specification.
                    # Passing it through ``var_attrs`` would instead create a variable
                    # attribute called "Compress" on every variable.
                    var_spec: dict[str, Any] = {
                        "Variable": var_name,
                        "Data_Type": cdf_dtype,
                        "Num_Elements": 1,
                        "Rec_Vary": True,
                        "Dim_Sizes": (list(var_data_array.shape[1:]) if var_data_array.ndim > 1 else []),
                        "Compress": 6,
                    }

                    cdf_file.write_var(var_spec, var_data=var_data_array)

            finally:
                cdf_file.close()

        except Exception as e:
            msg = f"Failed to write CDF file {file_path}: {e}"
            logger.exception(msg)
            raise RuntimeError(msg) from e

el_paso.typing.Variable

Variable class holding data and metadata.

Attributes:

Name Type Description
_data NDArray[generic]

The numerical data of the variable.

metadata VariableMetadata

An instance of VariableMetadata holding information about the variable.

Source code in el_paso/variable.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
class Variable:
    """Variable class holding data and metadata.

    Attributes:
        _data (NDArray[np.generic]): The numerical data of the variable.
        metadata (VariableMetadata): An instance of `VariableMetadata` holding
            information about the variable.
    """

    __slots__ = "_data", "metadata"

    _data: NDArray[np.generic]
    metadata: VariableMetadata

    def __init__(
        self,
        original_unit: u.UnitBase,
        data: NDArray[np.generic] | None = None,
        description: str = "",
        processing_notes: str = "",
        standard_name: StandardName = "",  # ty:ignore[invalid-parameter-default]
    ) -> None:
        """Initializes a Variable instance.

        Args:
            original_unit (u.UnitBase): The original unit of the data.
            data (NDArray[np.generic] | None): The numerical data. Defaults to an empty
                numpy array if None. The array is stored as given (not copied).
            description (str): A description of the variable. Defaults to "".
            processing_notes (str): Notes on how the data was processed. Defaults to "".
            standard_name (StandardName): The standard name of the variable. Defaults to "".
        """
        self._data = np.array([]) if data is None else data

        self.metadata = VariableMetadata(
            unit=original_unit,
            description=description,
            processing_notes=processing_notes,
            standard_name=standard_name,
        )

    def __repr__(self) -> str:
        """Returns a string representation of the Variable object."""
        return f"Variable holding {self._data.shape} data points with metadata: {self.metadata}"

    def convert_to_unit(self, target_unit: u.UnitBase | str) -> None:
        """Converts the data to a given unit in place.

        Nothing happens if the variable is already expressed in ``target_unit``.
        Otherwise both the stored data and ``metadata.unit`` are updated.

        Args:
            target_unit (u.UnitBase | str): The unit the data should be converted to.
        """
        if isinstance(target_unit, str):
            target_unit = u.Unit(target_unit)

        if self.metadata.unit != target_unit:
            data_with_unit = u.Quantity(self._data, self.metadata.unit)
            self._data = typing.cast("NDArray[np.generic]", data_with_unit.to_value(target_unit))

            self.metadata.unit = target_unit

    @overload
    def get_data(self, target_unit: u.UnitBase | str) -> NDArray[np.floating | np.integer]: ...

    @overload
    def get_data(self, target_unit: None = None) -> NDArray[np.generic]: ...

    def get_data(self, target_unit: u.UnitBase | str | None = None) -> NDArray[np.generic]:
        """Gets the data of the variable.

        Unlike `convert_to_unit`, requesting a target unit does not modify the
        variable; the converted values are only returned.

        Args:
            target_unit (u.UnitBase | str | None): The unit to convert the data to
                before returning. If None, the data is returned in its current unit.
                Defaults to None.

        Returns:
            NDArray[np.generic]: The data of the variable. Without a target unit this
                is the internally stored array itself, not a copy.

        Raises:
            TypeError: If `target_unit` is provided and the data is not numeric.
        """
        if target_unit is None:
            return self._data

        if isinstance(target_unit, str):
            target_unit = u.Unit(target_unit)

        if not np.issubdtype(self._data.dtype, np.number):
            msg = f"Unit conversion is only supported for numeric types! Encountered for variable {self}."
            raise TypeError(msg)

        return typing.cast("NDArray[np.generic]", u.Quantity(self._data, self.metadata.unit).to_value(target_unit))

    def set_data(self, data: NDArray[np.generic], unit: Literal["same"] | str | u.UnitBase) -> None:  # noqa: PYI051
        """Sets the data and optionally updates the unit of the variable.

        The unit argument is validated and resolved before anything is modified, so a
        failing call leaves both the data and the unit of the variable untouched.

        Args:
            data (NDArray[np.generic]): The new data array.
            unit (Literal["same"] | str | u.UnitBase): The unit of the new data.
                If "same", the existing unit is kept. Can be a string representation
                of a unit or an `astropy.units.UnitBase` object.

        Raises:
            TypeError: If `unit` is not "same", a string, or an `astropy.units.UnitBase` object.
        """
        # Resolve the unit first: parsing a unit string may raise, and in that case
        # the data must not have been replaced already.
        new_unit: u.UnitBase | None
        if isinstance(unit, str):
            new_unit = None if unit == "same" else u.Unit(unit)
        elif isinstance(unit, u.UnitBase):
            new_unit = unit
        else:
            msg = "unit must be either a str or a astropy unit!"
            raise TypeError(msg)

        self._data = data
        if new_unit is not None:
            self.metadata.unit = new_unit

    def transpose_data(self, seq: list[int] | tuple[int, ...]) -> None:
        """Transposes the internal data array.

        Args:
            seq (list[int] | tuple[int, ...]): The axes to transpose to. See
                `numpy.transpose` for details.
        """
        self._data = np.transpose(self._data, axes=seq)

    def apply_mask(self, mask: NDArray[np.bool_]) -> None:
        """Applies a boolean mask to the data.

        Elements where the mask is False are invalidated by setting them to NaN.
        The stored array is modified in place.

        Args:
            mask (NDArray[np.bool_]): Boolean array of the same shape as the data.
                False indicates values to be masked.

        Raises:
            TypeError: If the data is not a floating-point numeric type.
        """
        if not np.issubdtype(self._data.dtype, np.floating):
            msg = f"Masking is only supported for floating-point types! Encountered for variable {self}."
            raise TypeError(msg)

        self._data[~mask] = np.nan

    def apply_thresholds_on_data(self, lower_threshold: float = -np.inf, upper_threshold: float = np.inf) -> None:
        """Applies lower and upper thresholds to the data.

        Only values strictly between the two thresholds are kept. Everything else,
        including values equal to a threshold, is set to NaN.

        Args:
            lower_threshold (float): The (exclusive) lower bound for the data. Defaults to
                negative infinity.
            upper_threshold (float): The (exclusive) upper bound for the data. Defaults to
                positive infinity.

        Raises:
            TypeError: If the data is not a floating-point numeric type.
        """
        if not np.issubdtype(self._data.dtype, np.floating):
            msg = f"Thresholds are only supported for floating-point types! Encountered for variable {self}."
            raise TypeError(msg)
        data = typing.cast("NDArray[np.floating]", self._data)

        within_bounds = (data > lower_threshold) & (data < upper_threshold)
        self._data = np.where(within_bounds, data, np.nan)

    def truncate(self, time_variable: Variable, start_time: float | datetime, end_time: float | datetime) -> None:
        """Truncates the variable's data based on a time variable and a time range.

        Entries along the first axis are kept if their time stamp lies within
        ``[start_time, end_time]``; both bounds are inclusive.

        Args:
            time_variable (Variable): A `Variable` object containing the time data.
            start_time (float | datetime): The start time for truncation. Can be a
                Unix timestamp (float) or a `datetime` object.
            end_time (float | datetime): The end time for truncation. Can be a
                Unix timestamp (float) or a `datetime` object.

        Raises:
            ValueError: If the length of the variable's data does not match the
                length of the `time_variable`'s data.
        """
        if isinstance(start_time, datetime):
            start_time = enforce_utc_timezone(start_time).timestamp()
        if isinstance(end_time, datetime):
            end_time = enforce_utc_timezone(end_time).timestamp()

        if self._data.shape[0] != time_variable.get_data().shape[0]:
            msg = f"Encountered length missmatch between variable and time variable! Variable: {self}"
            raise ValueError(msg)

        # Compare in POSIX time so that the float bounds and the time data share one unit.
        time_var_data = time_variable.get_data(ep.units.posixtime)
        in_range = (time_var_data >= start_time) & (time_var_data <= end_time)

        self._data = self._data[in_range]

    def __hash__(self) -> int:
        """Computes a hash value for the variable based on its holding data.

        Only the raw bytes of the data array enter the hash; the metadata is ignored.

        Returns:
            int: The integer hash value.
        """
        return hash(self._data.tobytes())

el_paso.typing.VariableInfo

Bases: NamedTuple, Generic[T_co]

A named tuple to store information about a variable in a data standard.

Source code in el_paso/data_standard.py
29
30
31
32
33
34
35
class VariableInfo(NamedTuple, Generic[T_co]):
    """A named tuple to store information about a variable in a data standard.

    The tuple is generic in the type used for the standard name.

    Attributes:
        standard_name (T_co): The standard name of the variable.
        description (str): A textual description of the variable.
        unit (u.UnitBase): The unit of the variable.
        dependencies (list[InternalName | FixedDimensionName]): Internal names or fixed
            dimension names the variable depends on.
    """

    standard_name: T_co
    description: str
    unit: u.UnitBase
    dependencies: list[InternalName | FixedDimensionName]

el_paso.typing.VariableMetadata dataclass

A class holding the metadata of a variable.

Attributes:

Name Type Description
unit UnitBase

The unit of the variable. Defaults to u.dimensionless_unscaled.

original_cadence_seconds float

The original cadence of the data in seconds. Defaults to 0.

source_files list[str]

The list of source files from which the variable contains data. Defaults to an empty list.

description str

The description of the variable explaining what kind of data this variable contains. Defaults to "".

processing_notes str

The processing notes of the variable explaining all steps done to achieve the final result. Defaults to "".

standard_name str

The name of the standard variable this variable complies with. Defaults to "".

Source code in el_paso/variable.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@dataclass
class VariableMetadata:
    """Container for the metadata that accompanies a variable.

    Attributes:
        unit (u.UnitBase): Unit of the variable. Defaults to
            `u.dimensionless_unscaled`.
        original_cadence_seconds (float): Original cadence of the data in seconds.
            Defaults to 0.
        source_files (list[str]): Source files from which the variable contains
            data. Defaults to an empty list.
        description (str): Explains what kind of data the variable contains.
            Defaults to "".
        processing_notes (str): Explains the steps that were carried out to obtain
            the final result. Defaults to "".
        standard_name (str): Name of the standard variable this variable complies
            with. Defaults to "".
    """

    unit: u.UnitBase = u.dimensionless_unscaled
    original_cadence_seconds: float = 0
    source_files: list[str] = field(default_factory=list[str])
    description: str = ""
    processing_notes: str = ""
    standard_name: str = ""

    def __post_init__(self) -> None:
        """Finishes the initialization after the generated ``__init__`` has run.

        Sets the ``processing_steps_counter`` attribute to 1 and, if the package is
        in release mode, appends the release message followed by a newline to the
        processing notes.
        """
        # The counter is a plain instance attribute, not a dataclass field.
        self.processing_steps_counter = 1

        if not ep.is_in_release_mode():
            return

        release_line = ep.get_release_msg() + "\n"
        self.processing_notes = self.processing_notes + release_line

    def add_processing_note(self, processing_note: str) -> None:
        """Appends a numbered processing note to the metadata.

        The note is stored as ``"<counter>) <note>"`` followed by a newline, where
        ``<counter>`` is the current value of the processing steps counter. The
        counter is incremented afterwards.

        Args:
            processing_note (str): The note to be added to the processing notes.
        """
        numbered_note = f"{self.processing_steps_counter}) {processing_note}\n"

        self.processing_notes = self.processing_notes + numbered_note
        self.processing_steps_counter = self.processing_steps_counter + 1

el_paso.typing.VariableRequest module-attribute

VariableRequest = Sequence[
    tuple[
        MagFieldVarTypes,
        MagneticFieldLiteral | MagneticField,
    ]
]

Type alias for a request to compute magnetic field variables, consisting of a sequence of tuples where each tuple specifies the variable type and the magnetic field model to use for its computation.

el_paso.typing.ExtractionInfo dataclass

Store metadata required to extract a variable from a source file.

Attributes:

Name Type Description
name_or_column str | int

Name of the variable or column to extract from the source file.

unit UnitBase

Physical unit associated with the extracted variable.

is_time_dependent bool

Whether the variable is time-dependent.

If True, data from multiple files will be concatenated along the time axis.

If False, data from multiple files will be used to fill missing (np.nan) values instead of being concatenated.

result_key str | None

Key to use for the extracted variable in the resulting variables dictionary.

If None, name_or_column is used as the key.

dependent_variables list[str] | None

Names of variables that the extracted variable depends on.

This is mainly used for JSON extraction to determine how extracted data should be reshaped.

np_dtype DTypeLike | None

Optional NumPy dtype used to cast the extracted data.

If None, the dtype is inferred from the source data.

Source code in el_paso/extract_variables_from_files.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@dataclass(frozen=True, slots=True, eq=False)
class ExtractionInfo:
    """Store metadata required to extract a variable from a source file.

    Instances are immutable (``frozen=True``). Because ``eq=False`` is set, no
    field-wise ``__eq__`` is generated, so instances are compared and hashed by
    identity.

    Attributes:
        name_or_column:
            Name of the variable or column to extract from the source file.

        unit:
            Physical unit associated with the extracted variable.

        is_time_dependent:
            Whether the variable is time-dependent. Defaults to ``True``.

            If ``True``, data from multiple files will be concatenated
            along the time axis.

            If ``False``, data from multiple files will be used to fill
            missing (`np.nan`) values instead of being concatenated.

        result_key:
            Key to use for the extracted variable in the resulting
            variables dictionary.

            If ``None`` (the default), ``name_or_column`` is used as the key.

        dependent_variables:
            Names of variables that the extracted variable depends on.
            Defaults to ``None``.

            This is mainly used for JSON extraction to determine how
            extracted data should be reshaped.

        np_dtype:
            Optional NumPy dtype used to cast the extracted data.

            If ``None`` (the default), the dtype is inferred from the source data.
    """

    name_or_column: str | int
    unit: u.UnitBase
    is_time_dependent: bool = True
    result_key: str | None = None
    dependent_variables: list[str] | None = None
    np_dtype: DTypeLike | None = None