Skip to content

GOES

el_paso.recipes.goes.process_goes_r_mps_high.process_goes_r_mps_high

process_goes_r_mps_high

Process GOES-R MPS-HI MAGED electron data into pitch-angle resolved phase space densities.

Downloads and extracts the magnetometer (MAGN), MPS-HI energetic particle (MPSH), and ephemeris (EPHE) L2 data products for the given GOES-R satellite, bins them onto a common 5-minute time cadence, computes the local telescope pitch angles from the magnetic field direction, sorts the differential electron fluxes by ascending pitch angle, transforms the spacecraft position to GEO coordinates, computes T89 magnetic field quantities (B_Calc, B_Eq, MLT, R_Eq, Alpha_Eq, L_star, L_m, InvMu, InvK), and derives the electron phase space density. The resulting variables are saved using the requested saving strategy.

Parameters:

Name Type Description Default
sat_str Literal['goes18', 'goes19']

The GOES-R satellite to process.

required
processed_data_path str | Path

Directory where the processed output files are saved.

required
raw_data_path str | Path

Directory where the raw downloaded data files are stored.

required
start_time datetime

Start of the time interval to process.

required
end_time datetime

End of the time interval to process.

required
save_strategy Literal['gfz', 'netcdf']

Strategy used to save the processed data. "gfz" saves using the GFZ format, "netcdf" saves monthly NetCDF files. Defaults to "netcdf".

'netcdf'
num_cores int

Number of CPU cores used for the magnetic field computations. Defaults to 32.

32
Source code in el_paso/recipes/goes/process_goes_r_mps_high.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def process_goes_r_mps_high(
    sat_str: Literal["goes18", "goes19"],
    processed_data_path: str | Path,
    raw_data_path: str | Path,
    start_time: datetime,
    end_time: datetime,
    save_strategy: Literal["gfz", "netcdf"] = "netcdf",
    num_cores: int = 32,
) -> None:
    """Process GOES-R MPS-HI MAGED electron data into pitch-angle resolved phase space densities.

    Downloads and extracts the magnetometer (MAGN), MPS-HI energetic particle (MPSH), and
    ephemeris (EPHE) L2 data products for the given GOES-R satellite, bins them onto a common
    5-minute time cadence, computes the local telescope pitch angles from the magnetic field
    direction, folds them into the range [0, 90] degrees, sorts the differential electron fluxes
    by ascending pitch angle, transforms the spacecraft position to GEO coordinates, computes T89
    magnetic field quantities (B_Calc, B_Eq, MLT, R_Eq, Alpha_Eq, L_star, L_m, InvMu, InvK), and
    derives the electron phase space density. The resulting variables are saved using the
    requested saving strategy.

    Args:
        sat_str (Literal["goes18", "goes19"]): The GOES-R satellite to process.
        processed_data_path (str | Path): Directory where the processed output files are saved.
        raw_data_path (str | Path): Directory where the raw downloaded data files are stored.
        start_time (datetime): Start of the time interval to process.
        end_time (datetime): End of the time interval to process.
        save_strategy (Literal["gfz", "netcdf"], optional): Strategy used to save the processed
            data. "gfz" saves using the GFZ format, "netcdf" saves monthly NetCDF files. The
            value "both" is also accepted at runtime and saves with both strategies. Defaults
            to "netcdf".
        num_cores (int, optional): Number of CPU cores used for the magnetic field computations.
            Defaults to 32.

    Raises:
        ValueError: If `save_strategy` is not one of "gfz", "netcdf" or "both".
    """
    # Fail fast: reject an unknown strategy before any download or expensive computation.
    if save_strategy not in ("gfz", "netcdf", "both"):
        msg = f"Unknown save_strategy {save_strategy!r}; expected 'gfz', 'netcdf' or 'both'."
        raise ValueError(msg)

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # "YYYY" and "MM" are left as literal placeholders in the path stem handed to the helpers.
    data_path_stem = f"{raw_data_path}/YYYY/MM/{sat_str}/"

    magn_vars = _get_magn_variables(sat_str, data_path_stem, start_time, end_time)
    mps_vars = _get_mps_high_variables(sat_str, data_path_stem, start_time, end_time)
    ephe_vars = _get_ephe_variables(sat_str, data_path_stem, start_time, end_time)

    # All three products are binned with the same cadence and interval; the time variable
    # returned by the magnetometer binning is the one used for everything downstream.
    binning_cadence = timedelta(minutes=5)

    time_bin_methods_magn = {
        "b_brf": ep.TimeBinMethod.NanMean,
    }

    binned_time_var = ep.processing.bin_by_time(
        time_variable=magn_vars["time"],
        variables=magn_vars,
        time_bin_method_dict=time_bin_methods_magn,
        time_binning_cadence=binning_cadence,
        start_time=start_time,
        end_time=end_time,
    )

    time_bin_methods_mps = {
        "diff_flux": ep.TimeBinMethod.NanMedian,
        "diff_flux_uncert": ep.TimeBinMethod.NanMedian,
        "diff_energy": ep.TimeBinMethod.Repeat,
        "int_flux": ep.TimeBinMethod.NanMedian,
        "int_flux_uncert": ep.TimeBinMethod.NanMedian,
        "int_energy": ep.TimeBinMethod.Repeat,
    }

    _ = ep.processing.bin_by_time(
        time_variable=mps_vars["time"],
        variables=mps_vars,
        time_bin_method_dict=time_bin_methods_mps,
        time_binning_cadence=binning_cadence,
        start_time=start_time,
        end_time=end_time,
    )

    time_bin_methods_ephe = {
        "xgse": ep.TimeBinMethod.NanMean,
    }

    _ = ep.processing.bin_by_time(
        time_variable=ephe_vars["time"],
        variables=ephe_vars,
        time_bin_method_dict=time_bin_methods_ephe,
        time_binning_cadence=binning_cadence,
        start_time=start_time,
        end_time=end_time,
    )

    # Swap the last two axes of the flux; the sorting below indexes energy on axis 1 and
    # pitch angle (telescope) on axis 2.
    mps_vars["diff_flux"].transpose_data((0, 2, 1))
    mps_vars["diff_flux"].apply_thresholds_on_data(lower_threshold=0)

    # calculate xGEO
    datetimes = [datetime.fromtimestamp(t, tz=timezone.utc) for t in binned_time_var.get_data(ep.units.posixtime)]
    xgeo_data = ep.processing.magnetic_field_utils.Coords().transform(
        datetimes,
        ephe_vars["xgse"].get_data(ep.units.RE).astype(np.float64),
        ep.IRBEM_SYSAXIS_GSE,
        ep.IRBEM_SYSAXIS_GEO,
    )
    xgeo_var = ep.Variable(data=xgeo_data, original_unit=ep.units.RE)

    # calculate pitch angles
    tele_alpha_angles_var = ep.Variable(data=TELE_ALPHA_ANGLES, original_unit=u.deg)
    tele_beta_angles_var = ep.Variable(data=TELE_BETA_ANGLES, original_unit=u.deg)
    local_pa_var = ep.processing.compute_pitch_angles_for_telescopes(
        magn_vars["b_brf"],
        tele_alpha_angles_var,
        tele_beta_angles_var,
    )

    # Fold pitch angles around 90 degrees: an angle above 90 is mirrored to 180 - angle,
    # e.g. 100 deg -> 80 deg. (Subtracting 90 instead would map 100 deg to 10 deg.)
    local_pa = local_pa_var.get_data(u.degree)
    local_pa_folded = np.where(local_pa > 90, 180 - local_pa, local_pa)
    local_pa_var.set_data(local_pa_folded, unit=u.degree)

    # sort pitch angles in ascending order and apply the same permutation to the fluxes
    idx_sorted = np.argsort(local_pa_var.get_data(), axis=1)
    sorted_local_pa = np.take_along_axis(local_pa_var.get_data(), idx_sorted, axis=1)
    n_energy = mps_vars["diff_flux"].get_data().shape[1]
    # Repeat the per-time permutation for every energy channel so it can index axis 2 of the flux.
    sorted_diff_flux = np.take_along_axis(
        mps_vars["diff_flux"].get_data(), np.tile(idx_sorted[:, np.newaxis, :], [1, n_energy, 1]), axis=2
    )

    local_pa_var.set_data(sorted_local_pa, unit="same")
    mps_vars["diff_flux"].set_data(sorted_diff_flux, unit="same")

    # average energies over axis 1
    # NOTE(review): np.squeeze also drops any other length-1 axis (e.g. a single time bin) —
    # confirm that this cannot occur for the intervals processed here.
    diff_energy_avg = np.squeeze(np.mean(mps_vars["diff_energy"].get_data(u.MeV), axis=1))
    mps_vars["diff_energy"].set_data(diff_energy_avg, unit=u.MeV)

    # Calculate magnetic field variables
    variables_to_compute: ep.processing.VariableRequest = [
        ("B_Calc", "T89"),
        ("B_Eq", "T89"),
        ("MLT", "T89"),
        ("R_Eq", "T89"),
        ("Alpha_Eq", "T89"),
        ("L_star", "T89"),
        ("L_m", "T89"),
        ("InvMu", "T89"),
        ("InvK", "T89"),
    ]

    magnetic_field_variables = ep.processing.compute_magnetic_field_variables(
        time_var=binned_time_var,
        xgeo_var=xgeo_var,
        energy_var=mps_vars["diff_energy"],
        pa_local_var=local_pa_var,
        particle_species="electron",
        variables_to_compute=variables_to_compute,
        irbem_options=ep.processing.magnetic_field_utils.IrbemOptions(),
        num_cores=num_cores,
    )

    psd_var = ep.processing.compute_phase_space_density(
        mps_vars["diff_flux"], mps_vars["diff_energy"], particle_species="electron"
    )

    # L_m is requested above but is not part of the saved output.
    variables_to_save: dict[InternalName, ep.Variable] = {
        "Epoch": binned_time_var,
        "FEDU": mps_vars["diff_flux"],
        "Position": xgeo_var,
        "Energy_FEDU": mps_vars["diff_energy"],
        "Alpha": local_pa_var,
        "PSD": psd_var,
        "Alpha_Eq": magnetic_field_variables["Alpha_Eq_T89"],
        "MLT": magnetic_field_variables["MLT_T89"],
        "L_star": magnetic_field_variables["L_star_T89"],
        "R_Eq": magnetic_field_variables["R_Eq_T89"],
        "B_Eq": magnetic_field_variables["B_Eq_T89"],
        "B_Calc": magnetic_field_variables["B_Calc_T89"],
        "InvMu": magnetic_field_variables["InvMu_T89"],
        "InvK": magnetic_field_variables["InvK_T89"],
    }

    # Collect every requested strategy so that "both" really writes both outputs instead of
    # the second assignment replacing the first.
    processed_path = Path(processed_data_path)
    saving_strategies = []
    if save_strategy in ("gfz", "both"):
        saving_strategies.append(
            ep.saving_strategies.GFZStrategy(
                processed_path,
                mission="GOES",
                satellite=sat_str,
                instrument="MAGED",
                mag_field="T89",
                data_standard=ep.data_standards.GFZStandard(),
            )
        )
    if save_strategy in ("netcdf", "both"):
        # NOTE(review): the real-time GOES recipe passes file_format=".nc" (with a dot) —
        # confirm which spelling MonthlyRBStrategy expects.
        saving_strategies.append(
            ep.saving_strategies.MonthlyRBStrategy(
                processed_path,
                mission="GOES",
                satellite=sat_str,
                instrument="MAGED",
                mag_field="T89",
                file_format="nc",
                data_standard=ep.data_standards.GFZStandard(),
            )
        )

    for saving_strategy in saving_strategies:
        ep.save(variables_to_save, saving_strategy, start_time, end_time, time_var=binned_time_var, append=True)

el_paso.recipes.goes.process_goes_realtime.process_goes_real_time

process_goes_real_time

Process GOES real-time differential electron flux data into pitch-angle resolved phase space densities.

Downloads and extracts the real-time "differential-electrons-3-day" JSON product for the given GOES satellite, converts the timestamps and energy channel labels, sorts the energy channels and fluxes in ascending order, and bins the data onto a 5-minute time cadence. A fixed spacecraft position (from GEOCOORDS_DICT) and a fixed set of local pitch angles (5 to 90 degrees in 5-degree steps) are assigned, T89 magnetic field quantities (B_Calc, B_Eq, MLT, R_Eq, Alpha_Eq, L_star, L_m, InvMu, InvK) are computed, the omnidirectional flux is converted to a pitch-angle distribution, and the electron phase space density is derived. The resulting variables are saved using the requested saving strategy.

Parameters:

Name Type Description Default
sat_str Literal['primary', 'secondary']

Which GOES real-time satellite to process ("primary" corresponds to GOES19, "secondary" to GOES18).

required
processed_data_path str | Path

Directory where the processed output files are saved.

required
raw_data_path str | Path

Directory where the raw downloaded data files are stored.

required
start_time datetime

Start of the time interval to process.

required
end_time datetime

End of the time interval to process.

required
save_strategy Literal['gfz', 'netcdf', 'both']

Strategy used to save the processed data. "gfz" saves using the GFZ format, "netcdf" saves monthly NetCDF files, and "both" saves using both strategies. Defaults to "netcdf".

'netcdf'
num_cores int

Number of CPU cores used for the magnetic field computations. Defaults to 32.

32
skip_existing bool

If True, skip downloading files that already exist on disk. Defaults to True.

True
Source code in el_paso/recipes/goes/process_goes_realtime.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@timed_function("process_goes_real_time")
def process_goes_real_time(
    sat_str: Literal["primary", "secondary"],
    processed_data_path: str | Path,
    raw_data_path: str | Path,
    start_time: datetime,
    end_time: datetime,
    save_strategy: Literal["gfz", "netcdf", "both"] = "netcdf",
    num_cores: int = 32,
    skip_existing: bool = True,  # noqa: FBT001, FBT002,
) -> None:
    """Process GOES real-time differential electron flux data into pitch-angle resolved phase space densities.

    Downloads and extracts the real-time "differential-electrons-3-day" JSON product for the
    given GOES satellite, converts the timestamps and energy channel labels, sorts the energy
    channels and fluxes in ascending order, and bins the data onto a 5-minute time cadence.
    A fixed spacecraft position (from `GEOCOORDS_DICT`) and a fixed set of local pitch angles
    (5 to 90 degrees in 5-degree steps) are assigned, T89 magnetic field quantities (B_Calc, B_Eq,
    MLT, R_Eq, Alpha_Eq, L_star, L_m, InvMu, InvK) are computed, the omnidirectional flux is
    converted to a pitch-angle distribution, and the electron phase space density is derived.
    The resulting variables are saved using the requested saving strategy (or strategies).

    Args:
        sat_str (Literal["primary", "secondary"]): Which GOES real-time satellite to process
            ("primary" corresponds to GOES19, "secondary" to GOES18).
        processed_data_path (str | Path): Directory where the processed output files are saved.
        raw_data_path (str | Path): Directory where the raw downloaded data files are stored.
        start_time (datetime): Start of the time interval to process.
        end_time (datetime): End of the time interval to process.
        save_strategy (Literal["gfz", "netcdf", "both"], optional): Strategy used to save the
            processed data. "gfz" saves using the GFZ format, "netcdf" saves monthly NetCDF files,
            and "both" saves using both strategies. Defaults to "netcdf".
        num_cores (int, optional): Number of CPU cores used for the magnetic field computations.
            Defaults to 32.
        skip_existing (bool, optional): If True, skip downloading files that already exist on
            disk. Defaults to True.

    Raises:
        ValueError: If `save_strategy` is not one of "gfz", "netcdf" or "both".
    """
    # Fail fast: reject an unknown strategy before any download or expensive computation.
    if save_strategy not in ("gfz", "netcdf", "both"):
        msg = f"Unknown save_strategy {save_strategy!r}; expected 'gfz', 'netcdf' or 'both'."
        raise ValueError(msg)

    # Part 1: specify source files to extract variables
    data_path_stem = f"{raw_data_path}/GOES/YYYY/MM/{sat_str}/"
    rename_file_name_stem = f"{sat_str}_YYYYMMDD.json"
    url = f"https://services.swpc.noaa.gov/json/goes/{sat_str}/"

    ep.download(
        start_time,
        end_time,
        save_path=data_path_stem,
        file_cadence="daily",
        download_url=url,
        file_name_stem="differential-electrons-3-day.json",
        rename_file_name_stem=rename_file_name_stem,
        skip_existing=skip_existing,
    )

    extraction_infos = [
        ep.ExtractionInfo(
            result_key="Epoch",
            name_or_column="time_tag",
            unit=u.dimensionless_unscaled,
        ),
        ep.ExtractionInfo(
            result_key="Energy",
            name_or_column="energy",
            unit=u.keV,
            is_time_dependent=False,
        ),
        ep.ExtractionInfo(
            result_key="FEDO",
            name_or_column="flux",
            unit=(u.cm**2 * u.s * u.keV) ** (-1),
            dependent_variables=["time_tag", "energy"],
        ),
        ep.ExtractionInfo(
            result_key="sat_id",
            name_or_column="satellite",
            unit=u.dimensionless_unscaled,
            is_time_dependent=False,
        ),
    ]

    variables = ep.extract_variables_from_files(
        start_time,
        end_time,
        file_cadence="daily",
        data_path=data_path_stem,
        file_name_stem=rename_file_name_stem,
        extraction_infos=extraction_infos,
    )

    # The satellite id read from the file is only used for this log message.
    sat_name = "goes" + str(variables["sat_id"].get_data()[0])
    logger.info(f"Processing satellite: {sat_name}")

    # parse time strings and store them as POSIX timestamps
    datetimes = ep.processing.convert_string_to_datetime(variables["Epoch"], time_format="%Y-%m-%dT%H:%M:%SZ")
    variables["Epoch"].set_data(np.asarray([t.timestamp() for t in datetimes]), ep.units.posixtime)

    # convert the energy channel labels via the module helper (presumably strips the unit
    # text so the channels become numeric and sortable — verify against the helper)
    variables["Energy"].set_data(_remove_unit_from_energy_channels(variables["Energy"].get_data()), "same")

    # Get the order that sorts the energy channels ascending
    sorting_order = np.argsort(variables["Energy"].get_data())

    # Apply the same order to the energies and to the energy axis (axis 1) of the flux
    variables["Energy"].set_data(variables["Energy"].get_data()[sorting_order], "same")
    variables["FEDO"].set_data(variables["FEDO"].get_data()[:, sorting_order], "same")
    variables["FEDO"].apply_thresholds_on_data(lower_threshold=0)

    time_bin_methods = {
        "FEDO": ep.TimeBinMethod.NanMedian,
        "Energy": ep.TimeBinMethod.Repeat,
    }

    binned_time_var = ep.processing.bin_by_time(
        time_variable=variables["Epoch"],
        variables=variables,
        time_bin_method_dict=time_bin_methods,
        time_binning_cadence=timedelta(minutes=5),
        start_time=start_time,
        end_time=end_time,
    )

    # Repeat the fixed position from GEOCOORDS_DICT once per binned time step.
    binned_datetimes = [datetime.fromtimestamp(t, tz=timezone.utc) for t in binned_time_var.get_data()]
    geo_coords = GEOCOORDS_DICT[sat_str]
    variables["xGEO"] = ep.Variable(data=np.tile(geo_coords, (len(binned_datetimes), 1)), original_unit=ep.units.RE)

    # Local pitch angles from 5 to 90 deg in 5 deg steps, identical for every time step
    pa_local_data = np.tile(np.arange(5, 91, 5), (len(binned_time_var.get_data()), 1)).astype(np.float64)
    variables["PA_local_FEDU"] = ep.Variable(data=pa_local_data, original_unit=u.deg)

    # Calculate magnetic field variables
    variables_to_compute: ep.processing.VariableRequest = [
        ("B_Calc", "T89"),
        ("B_Eq", "T89"),
        ("MLT", "T89"),
        ("R_Eq", "T89"),
        ("Alpha_Eq", "T89"),
        ("L_star", "T89"),
        ("L_m", "T89"),
        ("InvMu", "T89"),
        ("InvK", "T89"),
    ]

    magnetic_field_variables = ep.processing.compute_magnetic_field_variables(
        time_var=binned_time_var,
        xgeo_var=variables["xGEO"],
        energy_var=variables["Energy"],
        pa_local_var=variables["PA_local_FEDU"],
        particle_species="electron",
        variables_to_compute=variables_to_compute,
        irbem_options=ep.processing.magnetic_field_utils.IrbemOptions(),
        num_cores=num_cores,
    )

    FEDU_var = ep.processing.construct_pitch_angle_distribution(
        variables["FEDO"], variables["PA_local_FEDU"], magnetic_field_variables["Alpha_Eq_T89"]
    )
    FEDU_var.apply_thresholds_on_data(lower_threshold=0)

    psd_var = ep.processing.compute_phase_space_density(FEDU_var, variables["Energy"], particle_species="electron")

    # L_m is requested above but is not part of the saved output.
    vars_to_save: dict[ep.typing.InternalName, ep.Variable] = {
        "Epoch": binned_time_var,
        "FEDU": FEDU_var,
        "Position": variables["xGEO"],
        "Energy_FEDU": variables["Energy"],
        "Alpha": variables["PA_local_FEDU"],
        "PSD": psd_var,
        "Alpha_Eq": magnetic_field_variables["Alpha_Eq_T89"],
        "MLT": magnetic_field_variables["MLT_T89"],
        "L_star": magnetic_field_variables["L_star_T89"],
        "R_Eq": magnetic_field_variables["R_Eq_T89"],
        "B_Eq": magnetic_field_variables["B_Eq_T89"],
        "B_Calc": magnetic_field_variables["B_Calc_T89"],
        "InvMu": magnetic_field_variables["InvMu_T89"],
        "InvK": magnetic_field_variables["InvK_T89"],
    }

    # Collect every requested strategy so that "both" really writes both outputs; a single
    # variable would be overwritten by the NetCDF strategy and drop the GFZ output.
    processed_path = Path(processed_data_path)
    strategies = []
    if save_strategy in ("gfz", "both"):
        strategies.append(
            ep.saving_strategies.GFZStrategy(
                processed_path,
                mission="GOES",
                satellite="goes_" + sat_str,
                instrument="mps-high",
                mag_field="T89",
                data_standard=ep.data_standards.GFZStandard(),
            )
        )

    if save_strategy in ("netcdf", "both"):
        # NOTE(review): the GOES-R MPS-HI recipe passes file_format="nc" (without a dot) —
        # confirm which spelling MonthlyRBStrategy expects.
        strategies.append(
            ep.saving_strategies.MonthlyRBStrategy(
                base_data_path=processed_path,
                mission="GOES",
                satellite="goes_" + sat_str,
                instrument="mps-high",
                mag_field="T89",
                file_format=".nc",
                data_standard=ep.data_standards.GFZStandard(),
            )
        )

    for strategy in strategies:
        ep.save(vars_to_save, strategy, start_time, end_time, time_var=binned_time_var, append=True)