Skip to content

ESA (NGRM)

el_paso.recipes.esa.process_ngrm_satellite.process_ngrm_electron_fluxes

process_ngrm_electron_fluxes

Process ESA NGRM electron flux data for the given satellite into omnidirectional fluxes and PSD.

This downloads the NGRM L1d CSV data for the given satellite from the ESA SWE HAPI service, extracts the differential electron flux channels, time, ECI position, and L-shell, converts the time stamps to POSIX time and the ECI position to GEO coordinates, and combines the flux channels into a single omnidirectional differential flux variable (converted to (cm^2 s keV)^-1 units with negative values clipped to zero). The data is then time-binned to bin_cadence, magnetic field model quantities (B_Calc, B_Eq, MLT_Eq, R_Eq, Alpha_Eq, L_m, and optionally L_star) are computed using the T89 model for a fixed set of local pitch angles, the pitch angle distribution (FEDU) and phase space density (PSD) are derived from the omnidirectional flux, and all resulting variables are saved to disk (appending to existing files) using either the provided saving_strategy or a default daily/monthly strategy depending on the satellite.

Parameters:

Name Type Description Default
satellite Literal['EDRS-C', 'S6-MF', 'MTG-S1', 'MTG-I1']

Identifier of the NGRM-carrying satellite to process.

required
raw_data_path str | Path

Base directory used for downloading and locating the raw NGRM CSV files.

required
processed_data_path str | Path

Base directory in which the processed output files are saved.

required
start_time datetime

Start of the time range to process.

required
end_time datetime

End of the time range to process.

required
num_cores int

Number of CPU cores used for the magnetic field computations. Defaults to 32.

32
bin_cadence timedelta

Time binning cadence applied to the extracted variables. Defaults to timedelta(seconds=10).

timedelta(seconds=10)
skip_existing bool

If True, skip downloading files that already exist locally. Defaults to True.

True
client_id str | None

Client ID for the ESA SWE authentication. If None, it is read from the CLIENT_ID environment variable. Defaults to None.

None
client_secret str | None

Client secret for the ESA SWE authentication. If None, it is read from the CLIENT_SECRET environment variable. Defaults to None.

None
saving_strategy SavingStrategy | None

Strategy used to save the processed variables. If None, a DailyLEORBStrategy is used for "S6-MF" and a MonthlyRBStrategy for all other satellites. Defaults to None.

None
calculate_Lstar bool

If True, also compute the L* magnetic field quantity. Defaults to True.

True

Raises:

Type Description
ValueError

If client_id or client_secret is not provided and not available via the CLIENT_ID/CLIENT_SECRET environment variables.

Source code in el_paso/recipes/esa/process_ngrm_satellite.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
@timed_function("process_ngrm_electron_fluxes")
def process_ngrm_electron_fluxes(
    satellite: Literal["EDRS-C", "S6-MF", "MTG-S1", "MTG-I1"],
    raw_data_path: str | Path,
    processed_data_path: str | Path,
    start_time: datetime,
    end_time: datetime,
    num_cores: int = 32,
    bin_cadence: timedelta = timedelta(seconds=10),
    skip_existing: bool = True,  # noqa: FBT001, FBT002,
    client_id: str | None = None,
    client_secret: str | None = None,
    saving_strategy: ep.SavingStrategy | None = None,
    *,
    calculate_Lstar: bool = True,
) -> None:
    """Process ESA NGRM electron flux data for the given satellite into omnidirectional fluxes and PSD.

    This downloads the NGRM L1d CSV data for the given satellite from the ESA SWE HAPI service,
    extracts the differential electron flux channels, time, ECI position, and L-shell, converts
    the time stamps to POSIX time and the ECI position to GEO coordinates, and combines the flux
    channels into a single omnidirectional differential flux variable (converted to
    (cm^2 s keV)^-1 units with negative values clipped to zero). The data is then time-binned to
    `bin_cadence`, magnetic field model quantities (B_Calc, B_Eq, MLT_Eq, R_Eq, Alpha_Eq, L_m, and
    optionally L_star) are computed using the T89 model for a fixed set of local pitch angles, the
    pitch angle distribution (FEDU) and phase space density (PSD) are derived from the omnidirectional
    flux, and all resulting variables are saved to disk (appending to existing files) using either the
    provided `saving_strategy` or a default daily/monthly strategy depending on the satellite.

    Args:
        satellite (Literal["EDRS-C", "S6-MF", "MTG-S1", "MTG-I1"]): Identifier of the NGRM-carrying
            satellite to process.
        raw_data_path (str | Path): Base directory used for downloading and locating the raw NGRM CSV files.
        processed_data_path (str | Path): Base directory in which the processed output files are saved.
        start_time (datetime): Start of the time range to process.
        end_time (datetime): End of the time range to process.
        num_cores (int, optional): Number of CPU cores used for the magnetic field computations. Defaults to 32.
        bin_cadence (timedelta, optional): Time binning cadence applied to the extracted variables.
            Defaults to timedelta(seconds=10).
        skip_existing (bool, optional): If True, skip downloading files that already exist locally.
            Defaults to True.
        client_id (str | None, optional): Client ID for the ESA SWE authentication. If None, it is read
            from the `CLIENT_ID` environment variable. Defaults to None.
        client_secret (str | None, optional): Client secret for the ESA SWE authentication. If None, it
            is read from the `CLIENT_SECRET` environment variable. Defaults to None.
        saving_strategy (ep.SavingStrategy | None, optional): Strategy used to save the processed
            variables. If None, a `DailyLEORBStrategy` is used for "S6-MF" and a `MonthlyRBStrategy`
            for all other satellites. Defaults to None.
        calculate_Lstar (bool, optional): If True, also compute the L* magnetic field quantity.
            Defaults to True.

    Raises:
        ValueError: If `client_id` or `client_secret` is not provided and not available via the
            `CLIENT_ID`/`CLIENT_SECRET` environment variables.
    """
    data_path_stem = f"{raw_data_path}/NGRM/{satellite}/YYYY/MM/"
    file_name_stem = f"{satellite}_ngrm_YYYYMMDD_L1d.csv"

    if client_id is None:
        client_id = os.environ.get("CLIENT_ID")
    if client_secret is None:
        client_secret = os.environ.get("CLIENT_SECRET")

    if client_id is None:
        msg = "Client ID not found! Either load it from environment variables or pass it as an argument."
        raise ValueError(msg)

    if client_secret is None:
        msg = "Client secret not found! Either load it from environment variables or pass it as an argument."
        raise ValueError(msg)

    ep.download(
        start_time,
        end_time,
        save_path=data_path_stem,
        file_cadence="daily",
        download_url=SATELLITE_TO_ID[satellite],
        file_name_stem="",
        rename_file_name_stem=file_name_stem,
        authentication_info=(client_id, client_secret),
        method="esa_swe",
        skip_existing=skip_existing,
    )

    flux_unit = typing.cast("u.Unit", (u.cm**2 * u.s * u.sr * u.MeV) ** (-1))

    extraction_infos = [
        ep.ExtractionInfo(result_key="Epoch_iso", name_or_column="Time", unit=u.dimensionless_unscaled),
        ep.ExtractionInfo(
            result_key="FEDO_ch1", name_or_column="Differential electron flux (0.18 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch2", name_or_column="Differential electron flux (0.27 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch3", name_or_column="Differential electron flux (0.40 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch4", name_or_column="Differential electron flux (0.60 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch5", name_or_column="Differential electron flux (0.88 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch6", name_or_column="Differential electron flux (1.30 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch7", name_or_column="Differential electron flux (1.93 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch8", name_or_column="Differential electron flux (2.90 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch9", name_or_column="Differential electron flux (3.40 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(
            result_key="FEDO_ch10", name_or_column="Differential electron flux (4.00 MeV)", unit=flux_unit
        ),
        ep.ExtractionInfo(result_key="x_ECI", name_or_column="X", unit=u.km),
        ep.ExtractionInfo(result_key="y_ECI", name_or_column="Y", unit=u.km),
        ep.ExtractionInfo(result_key="z_ECI", name_or_column="Z", unit=u.km),
        ep.ExtractionInfo(result_key="L", name_or_column="L", unit=ep.units.RE),
    ]
    try:
        variables = ep.extract_variables_from_files(
            start_time,
            end_time,
            file_cadence="daily",
            data_path=data_path_stem,
            file_name_stem=file_name_stem,
            extraction_infos=extraction_infos,
            pd_read_csv_kwargs={"index_col": False},
        )
    except Exception:
        logger.exception(f"Error extracting variables for {satellite}")
        return

    time_format = "%Y-%m-%dT%H:%M:%S.%fZ" if satellite in ["MTG-I1", "MTG-S1"] else "%Y-%m-%dT%H:%M:%SZ"

    # convert iso strings to posixtime
    datetimes = ep.processing.convert_string_to_datetime(variables["Epoch_iso"], time_format=time_format)
    variables["Epoch"] = ep.Variable(
        data=np.asarray([t.timestamp() for t in datetimes]), original_unit=ep.units.posixtime
    )
    del variables["Epoch_iso"]

    # convert ECI coordinates to GEO using astropy
    coords_ECI = GCRS(
        CartesianRepresentation(
            x=variables["x_ECI"].get_data(), y=variables["y_ECI"].get_data(), z=variables["z_ECI"].get_data()
        ),
        obstime=datetimes,
    )
    coords_ITRS = coords_ECI.transform_to(ITRS(obstime=datetimes))
    xgeo_data = np.stack((coords_ITRS.x, coords_ITRS.y, coords_ITRS.z)).T

    variables["xGEO"] = ep.Variable(data=xgeo_data.value, original_unit=u.km)  # ty:ignore[unresolved-attribute]
    del variables["x_ECI"], variables["y_ECI"], variables["z_ECI"]

    # create flux variable
    flux_data = np.stack(
        [
            variables["FEDO_ch1"].get_data(),
            variables["FEDO_ch2"].get_data(),
            variables["FEDO_ch3"].get_data(),
            variables["FEDO_ch4"].get_data(),
            variables["FEDO_ch5"].get_data(),
            variables["FEDO_ch6"].get_data(),
            variables["FEDO_ch7"].get_data(),
            variables["FEDO_ch8"].get_data(),
            variables["FEDO_ch9"].get_data(),
            variables["FEDO_ch10"].get_data(),
        ]
    ).T.astype(np.float64)

    # convert to proper omnidirectional units
    flux_data = flux_data * 4 * np.pi

    variables["FEDO"] = ep.Variable(data=flux_data, original_unit=flux_unit * u.sr)
    for i in range(1, 11):
        del variables[f"FEDO_ch{i}"]
    variables["FEDO"].apply_thresholds_on_data(lower_threshold=0)
    variables["FEDO"].convert_to_unit((u.cm**2 * u.s * u.keV) ** (-1))

    # get energies
    variables["Energy"] = ep.Variable(data=np.asarray(NGRM_ENERGIES), original_unit=u.MeV)

    time_bin_methods = {
        "Energy": ep.TimeBinMethod.Repeat,
        "FEDO": ep.TimeBinMethod.NanMedian,
        "FEDU": ep.TimeBinMethod.NanMedian,
        "xGEO": ep.TimeBinMethod.NanMean,
        "L": ep.TimeBinMethod.NanMean,
    }

    binned_time_var = ep.processing.bin_by_time(
        variables["Epoch"], variables, time_bin_methods, bin_cadence, start_time=start_time, end_time=end_time
    )

    pa_local_data = np.tile(np.arange(5, 91, 5), (len(binned_time_var.get_data()), 1)).astype(np.float64)
    variables["PA_local_FEDU"] = ep.Variable(data=pa_local_data, original_unit=u.deg)

    variables_to_compute: ep.processing.VariableRequest = [
        ("B_Calc", "T89"),
        ("B_Eq", "T89"),
        ("MLT_Eq", "T89"),
        ("B_Eq", "T89"),
        ("R_Eq", "T89"),
        ("Alpha_Eq", "T89"),
        ("L_m", "T89"),
    ]

    if calculate_Lstar:
        variables_to_compute.append(("L_star", "T89"))  # ty:ignore[invalid-argument-type]

    magnetic_field_variables = ep.processing.compute_magnetic_field_variables(
        time_var=binned_time_var,
        xgeo_var=variables["xGEO"],
        energy_var=variables["Energy"],
        pa_local_var=variables["PA_local_FEDU"],
        particle_species="electron",
        variables_to_compute=variables_to_compute,
        irbem_options=ep.processing.magnetic_field_utils.IrbemOptions(
            lstar_quantity=ep.processing.magnetic_field_utils.LstarQuantity.LSTAR
            if calculate_Lstar
            else ep.processing.magnetic_field_utils.LstarQuantity.NONE,
        ),
        num_cores=num_cores,
    )

    variables |= magnetic_field_variables

    FEDU_var = ep.processing.construct_pitch_angle_distribution(
        variables["FEDO"], variables["PA_local_FEDU"], magnetic_field_variables["Alpha_Eq_T89"]
    )
    FEDU_var.apply_thresholds_on_data(lower_threshold=0)

    psd_var = ep.processing.compute_phase_space_density(FEDU_var, variables["Energy"], particle_species="electron")

    variables_to_save: dict[ep.typing.InternalName, ep.Variable] = {
        "Epoch": binned_time_var,
        "FEDU": FEDU_var,
        # "FEDO": variables["FEDO"], disabled for now, since Alpha_range is missing
        "Energy_FEDU": variables["Energy"],
        "Alpha": variables["PA_local_FEDU"],
        "Alpha_Eq": magnetic_field_variables["Alpha_Eq_T89"],
        "R_Eq": magnetic_field_variables["R_Eq_T89"],
        "MLT": magnetic_field_variables["MLT_Eq_T89"],
        "L_m": magnetic_field_variables["L_m_T89"],
        "L_star": magnetic_field_variables["L_star_T89"],
        "B_Calc": magnetic_field_variables["B_Calc_T89"],
        "B_Eq": magnetic_field_variables["B_Eq_T89"],
        "Position": variables["xGEO"],
        "PSD": psd_var,
    }

    if saving_strategy is None:
        save_srat_class = (
            ep.saving_strategies.DailyLEORBStrategy if satellite == "S6-MF" else ep.saving_strategies.MonthlyRBStrategy
        )

        saving_strategy = save_srat_class(
            base_data_path=Path(processed_data_path),
            mission="ESA",
            satellite=f"{satellite.lower()}",
            instrument="ngrm",
            mag_field="T89",
            file_format=".nc",
            data_standard=ep.data_standards.GFZStandard(),
        )

    ep.save(variables_to_save, saving_strategy, start_time, end_time, time_var=binned_time_var, append=True)