General Utilities

el_paso.utils

Attributes

Classes

el_paso.utils.Hashabledict

Bases: dict[Any, Any]

A dictionary subclass that is hashable.

This class enables a dictionary to be used in sets or as keys in other dictionaries by providing a custom hash implementation based on its contents.

Source code in el_paso/utils.py
class Hashabledict(dict[Any, Any]):
    """A dictionary subclass that is hashable.

    This class enables a dictionary to be used in sets or as keys in other dictionaries
    by providing a custom hash implementation based on its contents.
    """

    def __hash__(self) -> int:
        """Computes a hash value for the dictionary.

        The hash is computed based on the frozensets of the dictionary's keys
        and values. This ensures that two `Hashabledict` instances with the same
        key-value pairs will have the same hash, regardless of the order of
        insertion.

        Returns:
            int: The hash value of the dictionary.
        """
        return hash((frozenset(self), frozenset(self.values())))
Methods:
__hash__
__hash__

Computes a hash value for the dictionary.

The hash is computed based on the frozensets of the dictionary's keys and values. This ensures that two Hashabledict instances with the same key-value pairs will have the same hash, regardless of the order of insertion.

Returns:

int: The hash value of the dictionary.

Source code in el_paso/utils.py
def __hash__(self) -> int:
    """Computes a hash value for the dictionary.

    The hash is computed based on the frozensets of the dictionary's keys
    and values. This ensures that two `Hashabledict` instances with the same
    key-value pairs will have the same hash, regardless of the order of
    insertion.

    Returns:
        int: The hash value of the dictionary.
    """
    return hash((frozenset(self), frozenset(self.values())))
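
A minimal usage sketch follows. It assumes a stand-in class with the documented hashing scheme, written with `dict.values()`; in a real project one would import `Hashabledict` from `el_paso.utils` instead. The keys and values shown are hypothetical.

```python
from typing import Any


class Hashabledict(dict[Any, Any]):
    """Stand-in for el_paso.utils.Hashabledict (assumption: same hashing scheme)."""

    def __hash__(self) -> int:
        # Hash the set of keys and the set of values; both must be hashable.
        return hash((frozenset(self), frozenset(self.values())))


a = Hashabledict({"species": "electron", "channel": 3})
b = Hashabledict({"channel": 3, "species": "electron"})  # same pairs, different order

# Equal contents give equal hashes, so the set keeps a single entry.
unique_configs = {a, b}
print(len(unique_configs))

# A Hashabledict can also serve as a dictionary key, e.g. for caching.
cache = {a: "cached result"}
print(cache[b])
```

Because the hash depends on the contents, an instance should not be mutated while it is stored in a set or used as a key.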

Functions:

el_paso.utils.assert_n_dim

assert_n_dim

Asserts that a variable's data has a specific number of dimensions.

Raises a ValueError if the provided variable's data does not match the expected number of dimensions.

Parameters:

var (Variable, required): The variable instance to check.
n_dims (int, required): The expected number of dimensions.
name_in_file (str, required): The name of the variable, used in the error message.

Source code in el_paso/utils.py
def assert_n_dim(var: ep.Variable, n_dims: int, name_in_file: str) -> None:
    """Asserts that a variable's data has a specific number of dimensions.

    Raises a `ValueError` if the provided variable's data does not match the
    expected number of dimensions.

    Parameters:
        var (ep.Variable): The variable instance to check.
        n_dims (int): The expected number of dimensions.
        name_in_file (str): The name of the variable, used in the error message.
    """
    provided = var.get_data().ndim

    if provided != n_dims:
        msg = (
            f"Encountered dimension mismatch for variable with name {name_in_file}: "
            f"should be {n_dims}, got: {provided}!"
        )
        raise ValueError(msg)
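
The check can be tried without the rest of the package. The sketch below assumes a hypothetical `FakeVariable` that models only `get_data()` and an object exposing `.ndim`; the real `ep.Variable` is not reproduced here, and the function is restated locally so the example runs on its own.

```python
from types import SimpleNamespace


class FakeVariable:
    """Hypothetical stand-in for ep.Variable; only get_data() is modelled."""

    def __init__(self, ndim: int) -> None:
        self._data = SimpleNamespace(ndim=ndim)  # mimics an array's .ndim

    def get_data(self) -> SimpleNamespace:
        return self._data


def assert_n_dim(var: FakeVariable, n_dims: int, name_in_file: str) -> None:
    """Same check as the documented function, restated for a runnable demo."""
    provided = var.get_data().ndim
    if provided != n_dims:
        msg = f"Dimension mismatch for variable {name_in_file}: should be {n_dims}, got {provided}"
        raise ValueError(msg)


assert_n_dim(FakeVariable(ndim=2), 2, "Flux")  # passes silently

try:
    assert_n_dim(FakeVariable(ndim=1), 2, "Flux")
except ValueError as err:
    error_text = str(err)
    print(error_text)
```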

el_paso.utils.datenum_to_datetime

datenum_to_datetime

Converts a MATLAB datenum value to a timezone-aware datetime object.

This function leverages pandas to convert the datenum (days since year 0) into a UTC-aware datetime object.

Parameters:

datenum_val (float, required): The MATLAB datenum value.

Returns:

datetime: The converted datetime object with UTC timezone.

Source code in el_paso/utils.py
def datenum_to_datetime(datenum_val: float) -> datetime:
    """Converts a MATLAB datenum value to a timezone-aware datetime object.

    This function leverages pandas to convert the datenum (days since year 0)
    into a UTC-aware datetime object.

    Parameters:
        datenum_val (float): The MATLAB datenum value.

    Returns:
        datetime: The converted datetime object with UTC timezone.
    """
    return (
        pd.to_datetime(datenum_val - 719529, unit="D", origin=pd.Timestamp("1970-01-01"))
        .to_pydatetime()
        .replace(tzinfo=timezone.utc)
    )
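
The constant 719529 in the listing is the MATLAB datenum of the Unix epoch. The sketch below derives it with the standard library and shows a pandas-free variant of the conversion; `datenum_to_datetime_stdlib` is a hypothetical name, not part of `el_paso.utils`, and its rounding may differ slightly from the pandas-based function.

```python
from datetime import date, datetime, timedelta, timezone

# MATLAB datenum of the Unix epoch: Python's proleptic ordinal plus 366 days.
UNIX_EPOCH_DATENUM = date(1970, 1, 1).toordinal() + 366
print(UNIX_EPOCH_DATENUM)  # 719529


def datenum_to_datetime_stdlib(datenum_val: float) -> datetime:
    """Hypothetical pandas-free variant: shift to the Unix epoch, then add the days."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(days=datenum_val - UNIX_EPOCH_DATENUM)


noon = datenum_to_datetime_stdlib(719529.5)
print(noon.isoformat())  # 1970-01-01T12:00:00+00:00
```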

el_paso.utils.datetime_to_datenum

datetime_to_datenum

Converts a datetime object to a MATLAB datenum value.

This function calculates the datenum value, which represents the number of days since year 0, including a fractional component for the time of day.

Parameters:

datetime_val (datetime, required): The datetime object to convert.

Returns:

float: The corresponding MATLAB datenum value.

Source code in el_paso/utils.py
def datetime_to_datenum(datetime_val: datetime) -> float:
    """Converts a datetime object to a MATLAB datenum value.

    This function calculates the datenum value, which represents the number of days
    since year 0, including a fractional component for the time of day.

    Parameters:
        datetime_val (datetime): The datetime object to convert.

    Returns:
        float: The corresponding MATLAB datenum value.
    """
    mdn = datetime_val + timedelta(days=366)
    dt = datetime(datetime_val.year, datetime_val.month, datetime_val.day, 0, 0, 0, tzinfo=timezone.utc)
    frac = (datetime_val - dt).seconds / (24.0 * 60.0 * 60.0)

    return mdn.toordinal() + round(frac, 6)
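
As a worked example of the arithmetic, the sketch below restates the two steps (whole days from the shifted ordinal, then the fraction of the day) under the hypothetical name `datetime_to_datenum_sketch` and applies them to 06:00 UTC on 1970-01-01.

```python
from datetime import datetime, timedelta, timezone


def datetime_to_datenum_sketch(datetime_val: datetime) -> float:
    """Restates the documented arithmetic; expects a UTC-aware datetime."""
    # Whole days: Python ordinal shifted by 366 to match MATLAB's day count.
    whole_days = (datetime_val + timedelta(days=366)).toordinal()
    # Fraction of the day elapsed since midnight, rounded to six decimals.
    midnight = datetime(datetime_val.year, datetime_val.month, datetime_val.day, tzinfo=timezone.utc)
    frac = (datetime_val - midnight).seconds / 86400.0
    return whole_days + round(frac, 6)


value = datetime_to_datenum_sketch(datetime(1970, 1, 1, 6, 0, tzinfo=timezone.utc))
print(value)  # 719529.25
```

Passing a naive datetime to this sketch raises a `TypeError`, because the subtraction mixes naive and aware values; applying `enforce_utc_timezone` first avoids that.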

el_paso.utils.enforce_utc_timezone

enforce_utc_timezone

Ensures a datetime object has UTC timezone information.

If the provided datetime object is naive (lacks timezone info), it is assigned the UTC timezone. If it already has a timezone, it is returned unchanged.

Parameters:

time (datetime, required): The datetime object to process.

Returns:

datetime: The datetime object; timezone.utc is assigned only if the input was naive.

Source code in el_paso/utils.py
def enforce_utc_timezone(time: datetime) -> datetime:
    """Ensures a datetime object has UTC timezone information.

    If the provided datetime object is naive (lacks timezone info), it is assigned
    the UTC timezone. If it already has a timezone, it is returned unchanged.

    Parameters:
        time (datetime): The datetime object to process.

    Returns:
        datetime: The datetime object; `timezone.utc` is assigned only if it was naive.
    """
    if time.tzinfo is None:
        time = time.replace(tzinfo=timezone.utc)
    return time
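
The difference between the naive and the aware case can be seen in a short sketch. The function is copied locally so the example runs on its own; the dates and the +01:00 offset are arbitrary.

```python
from datetime import datetime, timedelta, timezone


def enforce_utc_timezone(time: datetime) -> datetime:
    """Local copy of the documented function so the example runs on its own."""
    if time.tzinfo is None:
        time = time.replace(tzinfo=timezone.utc)
    return time


naive = datetime(2024, 3, 5, 12, 0)
aware_cet = datetime(2024, 3, 5, 12, 0, tzinfo=timezone(timedelta(hours=1)))

# A naive value is labelled as UTC; the wall-clock fields stay the same.
print(enforce_utc_timezone(naive).isoformat())      # 2024-03-05T12:00:00+00:00
# An aware value is returned unchanged, so it keeps its +01:00 offset.
print(enforce_utc_timezone(aware_cet).isoformat())  # 2024-03-05T12:00:00+01:00
```

If an aware value must actually be converted to UTC, `datetime.astimezone(timezone.utc)` is the standard-library call for that.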

el_paso.utils.extract_version

extract_version

Extracts the version string from a file name.

The function looks for a version string pattern _v* (e.g., '_v1.2.3' or '_v1_2-3') located just before the file extension. It returns the base file name and a parsed version object. If no version is found, it returns the original file name and a default version '0'.

Parameters:

file_name (str | Path, required): The name or path of the file.

Returns:

tuple[str, Version]: A tuple containing the base file name without the version string and the parsed version object (packaging.version.Version).

Source code in el_paso/utils.py
def extract_version(file_name: str | Path) -> tuple[str, version_pkg.Version]:
    """Extracts the version string from a file name.

    The function looks for a version string pattern `_v*` (e.g., '_v1.2.3' or '_v1_2-3')
    located just before the file extension. It returns the base file name and a
    parsed version object. If no version is found, it returns the original file name
    and a default version '0'.

    Parameters:
        file_name (str | Path): The name or path of the file.

    Returns:
        tuple[str, version_pkg.Version]: A tuple containing:
            - The base file name without the version string.
            - The parsed version object (`packaging.version.Version`).
    """
    # convert to str in case of Path object
    file_name = str(file_name)

    # Regular expression to find the version part (_v* or _v*.*-*.*) before the file extension
    match = re.search(r"_(v[\d._-]+)(?=\.\w+$)", file_name)
    if match:
        base_name = file_name[: match.start()]
        ver_str = match.group(1)
        # Normalize the version string by replacing separators with dots
        normalized_ver_str = re.sub(r"[_-]", ".", ver_str.replace("v", ""))
        return base_name, version_pkg.parse(normalized_ver_str)
    return file_name, version_pkg.parse("0")
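
To see what the regular expression matches, the sketch below uses the same pattern but, as a simplifying assumption, returns a tuple of integers instead of a `packaging.version.Version`. `split_version` and the file names are hypothetical.

```python
import re


def split_version(file_name: str) -> tuple[str, tuple[int, ...]]:
    """Hypothetical simplification: returns a tuple of ints instead of a Version."""
    match = re.search(r"_(v[\d._-]+)(?=\.\w+$)", file_name)
    if not match:
        return file_name, (0,)
    # Drop the leading "v" and split on any of the accepted separators.
    parts = re.split(r"[._-]", match.group(1)[1:])
    return file_name[: match.start()], tuple(int(p) for p in parts if p)


print(split_version("rbsp_flux_v1.2.3.nc"))   # ('rbsp_flux', (1, 2, 3))
print(split_version("rbsp_flux_v1_2-3.cdf"))  # ('rbsp_flux', (1, 2, 3))
print(split_version("rbsp_flux.h5"))          # ('rbsp_flux.h5', (0,))
```

When a version is found, the returned base name also drops the extension, since everything from the `_v` onwards is cut; when no version is found, the full file name comes back unchanged.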

el_paso.utils.fill_str_template_with_time

fill_str_template_with_time

Fills a string template with time-based placeholders.

This function replaces common time-based placeholders in a string with the corresponding values from a datetime object. The placeholders are case-sensitive.

Parameters:

input_str (str, required): The input string containing placeholders like 'yyyymmdd', 'YYYYMMDD', 'YYYY', 'MM', and 'DD'.
time (datetime, required): The datetime object to use for filling the template.

Returns:

str: The string with all placeholders replaced by their time values.

Source code in el_paso/utils.py
def fill_str_template_with_time(input_str: str, time: datetime) -> str:
    """Fills a string template with time-based placeholders.

    This function replaces common time-based placeholders in a string with
    the corresponding values from a `datetime` object. The placeholders
    are case-sensitive.

    Parameters:
        input_str (str): The input string containing placeholders like 'yyyymmdd', 'YYYYMMDD',
                         'YYYY', 'MM', and 'DD'.
        time (datetime): The datetime object to use for filling the template.

    Returns:
        str: The string with all placeholders replaced by their time values.
    """
    yyyymmdd_str = time.strftime("%Y%m%d")
    yyyy_str = time.strftime("%Y")
    mm_str = time.strftime("%m")
    dd_str = time.strftime("%d")

    return (
        input_str.replace("yyyymmdd", yyyymmdd_str)
        .replace("YYYYMMDD", yyyymmdd_str)
        .replace("YYYY", yyyy_str)
        .replace("MM", mm_str)
        .replace("DD", dd_str)
    )
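
A short sketch of the replacement chain in action, using a local copy of the function and hypothetical templates. The second call illustrates a consequence of plain substring replacement: literal text that happens to contain a placeholder is rewritten too.

```python
from datetime import datetime, timezone


def fill_str_template_with_time(input_str: str, time: datetime) -> str:
    """Local copy of the documented replacement chain for a runnable demo."""
    ymd = time.strftime("%Y%m%d")
    return (
        input_str.replace("yyyymmdd", ymd)
        .replace("YYYYMMDD", ymd)
        .replace("YYYY", time.strftime("%Y"))
        .replace("MM", time.strftime("%m"))
        .replace("DD", time.strftime("%d"))
    )


when = datetime(2024, 3, 5, tzinfo=timezone.utc)

# Hypothetical path template mixing the separate and the combined placeholders.
print(fill_str_template_with_time("data/YYYY/MM/flux_yyyymmdd.nc", when))
# data/2024/03/flux_20240305.nc

# Plain substring replacement also hits literal text that contains "MM" or "DD".
print(fill_str_template_with_time("SUMMARY_YYYYMMDD.txt", when))
# SU03ARY_20240305.txt
```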

el_paso.utils.get_file_by_version

get_file_by_version

Filters a list of file paths to find a specific version or the latest one.

If a specific version string (e.g., 'v1.2.3') is provided, the function returns the file that matches exactly. If the version parameter is 'latest', it returns the file with the highest version number among all provided file paths.

Parameters:

file_paths (Iterable[T], required): An iterable of file paths (as strings or Path objects).
version (str, required): The specific version string to match (e.g., 'v1.2.3') or 'latest' to retrieve the most recent version.

Returns:

T | None: The file path that matches the criteria, or None if no matching file is found.

Source code in el_paso/utils.py
def get_file_by_version(file_paths: Iterable[T], version: str) -> T | None:
    """Filters a list of file paths to find a specific version or the latest one.

    If a specific version string (e.g., 'v1.2.3') is provided, the function returns
    the file that matches exactly. If the `version` parameter is 'latest', it
    returns the file with the highest version number among all provided file paths.

    Parameters:
        file_paths (Iterable[T]): An iterable of file paths (as strings or `Path` objects).
        version (str): The specific version string to match (e.g., 'v1.2.3') or 'latest'
                       to retrieve the most recent version.

    Returns:
        T | None: The file path that matches the criteria, or `None` if no matching
                  file is found.
    """
    latest_file = None

    if version != "latest":
        normalized_version = re.sub(r"[_-]", ".", version.replace("v", ""))
        target_version = version_pkg.parse(normalized_version)
    else:
        target_version = None

    for file in file_paths:
        _, ver_obj = extract_version(file)

        # A specific version was requested: return on an exact match, otherwise keep looking
        if target_version is not None:
            if ver_obj == target_version:
                return file
            continue

        # No specific version requested: keep track of the highest version seen so far
        if latest_file is None or ver_obj > extract_version(latest_file)[1]:
            latest_file = file

    # Either the latest file, or None if nothing matched or no files were given
    return latest_file
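
The two selection modes can be sketched with the standard library alone. The example assumes a hypothetical `version_key` helper that turns the version suffix into a tuple of integers; unlike `packaging.version.Version`, such tuples treat `1.2` and `1.2.0` as different, so this is an illustration rather than a drop-in replacement.

```python
import re


def version_key(file_name: str) -> tuple[int, ...]:
    """Hypothetical helper: numeric version tuple, or (0,) when none is present."""
    match = re.search(r"_v([\d._-]+)(?=\.\w+$)", file_name)
    if not match:
        return (0,)
    return tuple(int(p) for p in re.split(r"[._-]", match.group(1)) if p)


files = ["flux_v1.2.0.nc", "flux_v1.10.0.nc", "flux_v1.9.3.nc"]

# "latest": pick the highest version; numeric comparison puts 1.10.0 above 1.9.3.
latest = max(files, key=version_key)
print(latest)  # flux_v1.10.0.nc

# Specific version: normalise the request the same way, then look for an exact match.
wanted = tuple(int(p) for p in re.split(r"[._-]", "v1.9.3".replace("v", "")))
exact = next((f for f in files if version_key(f) == wanted), None)
print(exact)  # flux_v1.9.3.nc
```

Comparing parsed versions rather than raw strings matters here: as strings, "1.9.3" would sort above "1.10.0".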

el_paso.utils.load_cdf_data

load_cdf_data

Load all zVariables from an existing CDF file.

Source code in el_paso/utils.py
def load_cdf_data(file_path: Path) -> dict[StandardName, Any]:
    """Load all zVariables from an existing CDF file."""
    loaded_data: dict[StandardName, Any] = {"metadata": {}}
    cdf_file = cdflib.CDF(str(file_path))
    try:
        info = cdf_file.cdf_info()
        z_variables = getattr(info, "zVariables", None)
        if z_variables is None and isinstance(info, dict):
            z_variables = info.get("zVariables", [])  # ty:ignore[no-matching-overload]

        for variable_name in z_variables or []:
            try:
                loaded_data[variable_name] = np.asarray(cdf_file.varget(variable_name))
            except ValueError as exc:
                if "No records found" not in str(exc):
                    raise
                logger.warning(f"Skipping empty CDF variable {variable_name} in {file_path.name}")
                continue

            try:
                loaded_data["metadata"][variable_name] = cdf_file.varattsget(variable_name)
            except Exception:  # noqa: BLE001
                loaded_data["metadata"][variable_name] = {}
    finally:
        close = getattr(cdf_file, "close", None)
        if close is not None:
            close()

    return loaded_data

el_paso.utils.load_h5_data

load_h5_data

Load all datasets and dataset attributes from an HDF5 file.

Source code in el_paso/utils.py
def load_h5_data(file_path: Path) -> dict[StandardName, Any]:
    """Load all datasets and dataset attributes from an HDF5 file."""
    loaded_data: dict[StandardName, Any] = {"metadata": {}}

    def _recursively_load_datasets(group: h5py.Group | h5py.File, prefix: str = "") -> None:
        for key, item in group.items():
            full_path = f"{prefix}{key}" if prefix else key
            if isinstance(item, h5py.Dataset):
                loaded_data[full_path] = np.array(item)  # ty:ignore[invalid-assignment]
                loaded_data["metadata"][full_path] = dict(item.attrs.items())
            elif isinstance(item, h5py.Group):
                _recursively_load_datasets(item, f"{full_path}/")

    with h5py.File(file_path, "r") as file:
        _recursively_load_datasets(file)

    return loaded_data

el_paso.utils.load_mat_data

load_mat_data

Load an existing MATLAB file.

Source code in el_paso/utils.py
def load_mat_data(file_path: Path) -> dict[StandardName, Any]:
    """Load an existing MATLAB file."""
    loaded = loadmat(str(file_path), simplify_cells=True)
    data: dict[StandardName, Any] = {key: value for key, value in loaded.items() if not key.startswith("__")}

    if "metadata" in data and isinstance(data["metadata"], dict):
        for var_key, attrs in data["metadata"].items():
            if not isinstance(attrs, dict):
                continue
            data["metadata"][var_key] = {
                k: v.item()
                if isinstance(v, np.ndarray) and v.ndim == 0
                else v.tolist()
                if isinstance(v, np.ndarray) and v.size != 0
                else ""
                if isinstance(v, np.ndarray) and v.size == 0
                else v
                for k, v in attrs.items()
            }

    return data

el_paso.utils.load_netcdf_data

load_netcdf_data

Load all variables and variable metadata from a NetCDF file.

Source code in el_paso/utils.py
def load_netcdf_data(file_path: Path) -> dict[StandardName, Any]:
    """Load all variables and variable metadata from a NetCDF file."""
    loaded_data: dict[StandardName, Any] = {"metadata": {}}

    def _recursively_load(group: nC.Group | nC.Dataset, prefix: str = "") -> None:
        for var_name, variable in group.variables.items():
            full_path = f"{prefix}{var_name}" if prefix else var_name
            loaded_data[full_path] = np.array(variable[:])  # ty:ignore[invalid-assignment]
            loaded_data["metadata"][full_path] = {
                "unit": getattr(variable, "units", "unknown"),
                "source_files": getattr(variable, "source", "unknown"),
                "processing_notes": getattr(variable, "history", "unknown"),
                "description": getattr(variable, "description", "unknown"),
                "original_cadence_seconds": getattr(variable, "original_cadence_seconds", "unknown"),
                "standard_name": getattr(variable, "standard_name", "unknown"),
            }

        for group_name, subgroup in group.groups.items():
            _recursively_load(subgroup, f"{prefix}{group_name}/")

    if not file_path.exists():
        logger.error(f"File not found: {file_path}")
        return {}

    with nC.Dataset(file_path, "r", format="NETCDF4") as file:
        _recursively_load(file)

    return loaded_data

el_paso.utils.make_dict_hashable

make_dict_hashable

Converts a standard dictionary into a hashable one.

If the input is None, it is returned as is. Otherwise, a new Hashabledict instance is created and returned.

Parameters:

dict_input (dict | None, required): The dictionary to convert.

Returns:

Hashabledict | None: The new hashable dictionary, or None if the input was None.

Source code in el_paso/utils.py
def make_dict_hashable(dict_input: dict[Any, Any] | None) -> Hashabledict | None:
    """Converts a standard dictionary into a hashable one.

    If the input is `None`, it is returned as is. Otherwise, a new `Hashabledict`
    instance is created and returned.

    Parameters:
        dict_input (dict | None): The dictionary to convert.

    Returns:
        Hashabledict | None: The new hashable dictionary, or `None` if the input was `None`.
    """
    if dict_input is None:
        return dict_input

    return Hashabledict(dict_input)

el_paso.utils.normalize_file_format

normalize_file_format

Return a normalized file extension for the requested monthly format.

Source code in el_paso/utils.py
def normalize_file_format(file_format: str) -> str:
    """Return a normalized file extension for the requested monthly format."""
    normalized = file_format.lower()
    if not normalized.startswith("."):
        normalized = f".{normalized}"

    if normalized not in {".nc", ".cdf", ".h5", ".mat"}:
        msg = "MonthlyRBStrategy supports only 'nc', 'cdf', 'h5', and 'mat' formats."
        raise ValueError(msg)

    return normalized
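
A brief usage sketch, with the function copied locally so it runs on its own. It shows that case and a missing leading dot are both tolerated, while an unsupported format raises a `ValueError`.

```python
def normalize_file_format(file_format: str) -> str:
    """Local copy of the documented function so the example runs on its own."""
    normalized = file_format.lower()
    if not normalized.startswith("."):
        normalized = f".{normalized}"
    if normalized not in {".nc", ".cdf", ".h5", ".mat"}:
        msg = "MonthlyRBStrategy supports only 'nc', 'cdf', 'h5', and 'mat' formats."
        raise ValueError(msg)
    return normalized


print(normalize_file_format("NC"))   # .nc
print(normalize_file_format(".h5"))  # .h5

try:
    normalize_file_format("csv")
except ValueError as err:
    rejected = str(err)
    print(rejected)
```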

el_paso.utils.show_process_bar_for_map_async

show_process_bar_for_map_async

Displays a progress bar for a multiprocessing.pool.MapResult object.

This function creates a tqdm progress bar that tracks the completion of a parallel map operation. It polls the MapResult's internal state to update the progress bar until the operation is complete.

Parameters:

map_result (MapResult, required): The result object from Pool.map_async().
chunksize (int, required): The chunk size used in the map_async call.

Source code in el_paso/utils.py
def show_process_bar_for_map_async(map_result: MapResult[Any], chunksize: int) -> None:
    """Displays a progress bar for a `multiprocessing.pool.MapResult` object.

    This function creates a `tqdm` progress bar that tracks the completion of
    a parallel map operation. It polls the `MapResult`'s internal state to
    update the progress bar until the operation is complete.

    Parameters:
        map_result (MapResult): The result object from `Pool.map_async()`.
        chunksize (int): The chunk size used in the `map_async` call.
    """
    init = cast("int", map_result._number_left) * chunksize  # ty:ignore[unresolved-attribute]
    with tqdm.tqdm(total=init) as t:
        while True:
            if map_result.ready():
                break
            t.n = init - map_result._number_left * chunksize  # ty:ignore[unresolved-attribute]
            t.refresh()
            time.sleep(1)

el_paso.utils.timed_function

timed_function

A decorator that logs the execution time of a function.

This decorator measures the time it takes for a decorated function to execute and logs the result to a logger at the INFO level. The log message can be prefixed with an optional function name.

Parameters:

func_name (str | None, default None): An optional name to use in the log message. If None, the name of the decorated function is used.

Returns:

Callable[[Callable[P, R]], Callable[P, R]]: A decorator that wraps the target function with timing logic.

Source code in el_paso/utils.py
def timed_function(func_name: str | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """A decorator that logs the execution time of a function.

    This decorator measures the time it takes for a decorated function to execute
    and logs the result to a logger at the INFO level. The log message can be
    prefixed with an optional function name.

    Parameters:
        func_name (str | None): An optional name to use in the log message. If `None`,
                                a generic message is used.

    Returns:
        Callable: A decorator that wraps the target function with timing logic.
    """

    def timed_function_(f: Callable[P, R]) -> Callable[P, R]:
        @wraps(f)
        def wrap(*args: P.args, **kwargs: P.kwargs) -> R:
            tic = timeit.default_timer()
            result = f(*args, **kwargs)
            toc = timeit.default_timer()
            name = func_name or f"{f.__name__}"  # ty:ignore[unresolved-attribute]
            log = logging.getLogger(f.__module__)
            log.info(f"{name} finished in {toc - tic:0.3f} seconds", stacklevel=2)

            return result

        return wrap

    return timed_function_

el_paso.utils.write_cdf_file

write_cdf_file

Write a CDF file, resolving standard variable paths and embedding metadata.

Source code in el_paso/utils.py
def write_cdf_file(file_path: Path, data_dict: DataDict, data_standard: DataStandard) -> None:
    """Write a CDF file, resolving standard variable paths and embedding metadata."""
    try:
        cdf_file = cdflib.cdfwrite.CDF(str(file_path), delete=True)
        try:
            for internal_name, var_data in data_dict.items():
                if internal_name == "metadata":
                    continue

                if getattr(var_data, "size", 0) == 0:
                    logger.warning(f"Skipping empty variable {internal_name}")
                    continue

                # Resolve the canonical name via the data standard, matching H5/NC behaviour.
                # CDF does not support '/' in variable names, so we replace path separators
                # with '__' to preserve hierarchy information without violating the spec.
                path = data_standard.get_standard_name(internal_name)
                cdf_var_name = path
                value_to_write = var_data
                if isinstance(var_data, np.ndarray) and var_data.ndim == 2 and var_data.shape[1] == 1:
                    value_to_write = var_data.reshape(-1)

                var_data_array = np.asarray(value_to_write)
                if np.issubdtype(var_data_array.dtype, np.integer):
                    if var_data_array.dtype == np.int8:
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT1
                    elif var_data_array.dtype == np.int16:
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT2
                    elif var_data_array.dtype == np.int32:
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT4
                    else:
                        cdf_dtype = cdflib.cdfwrite.CDF.CDF_INT8
                elif np.issubdtype(var_data_array.dtype, np.floating):
                    cdf_dtype = (
                        cdflib.cdfwrite.CDF.CDF_FLOAT
                        if var_data_array.dtype == np.float32
                        else cdflib.cdfwrite.CDF.CDF_DOUBLE
                    )
                else:
                    var_data_array = var_data_array.astype(np.float64)
                    cdf_dtype = cdflib.cdfwrite.CDF.CDF_DOUBLE

                var_spec: dict[str, Any] = {
                    "Variable": cdf_var_name,
                    "Data_Type": cdf_dtype,
                    "Num_Elements": 1,
                    "Rec_Vary": True,
                    "Dim_Sizes": (list(var_data_array.shape[1:]) if var_data_array.ndim > 1 else []),
                }

                metadata_dict = data_dict.get("metadata", {})
                metadata: dict[str, Any] = {}
                if isinstance(metadata_dict, dict):
                    metadata = metadata_dict.get(path, metadata_dict.get(internal_name, {}))

                var_attrs = {}
                if isinstance(metadata, dict):
                    for attr_name, attr_value in metadata.items():
                        if _is_empty_cdf_attribute(attr_value):
                            logger.debug(f"Skipping empty CDF attribute {cdf_var_name}:{attr_name}")
                            continue
                        var_attrs[str(attr_name)] = attr_value
                if isinstance(metadata, dict):
                    for field, nc_key in {
                        "unit": "unit",
                        "source_files": "source_files",
                        "processing_notes": "processing_notes",
                        "description": "description",
                        "original_cadence_seconds": "original_cadence_seconds",
                        "standard_name": "standard_name",
                    }.items():
                        value = metadata.get(nc_key)
                        if value is None or _is_empty_cdf_attribute(value):
                            var_attrs.setdefault(nc_key, "empty")
                            continue
                        if value and not _is_empty_cdf_attribute(value):
                            var_attrs.setdefault(field, value)

                var_attrs["Compress"] = 6

                cdf_file.write_var(var_spec, var_attrs=var_attrs, var_data=var_data_array)
        finally:
            cdf_file.close()
    except Exception as e:
        msg = f"Failed to write CDF file {file_path}: {e}"
        logger.exception(msg)
        raise RuntimeError(msg) from e

el_paso.utils.write_h5_file

write_h5_file

Write an HDF5 file with hierarchical groups from slash-delimited paths.

Source code in el_paso/utils.py
def write_h5_file(file_path: Path, data_dict: SavedDataDict, data_standard: DataStandard) -> None:
    """Write an HDF5 file with hierarchical groups from slash-delimited paths."""
    with h5py.File(file_path, "w") as file:
        for internal_name, value in data_dict.items():
            if internal_name == "metadata":
                continue
            path = data_standard.get_standard_name(internal_name)

            path_parts = path.split("/")
            groups = path_parts[:-1]
            dataset_name = path_parts[-1]

            curr_hierarchy = file
            for group in groups:
                if group not in curr_hierarchy:
                    curr_hierarchy = curr_hierarchy.create_group(group)
                else:
                    curr_hierarchy = cast("h5py.Group", curr_hierarchy[group])

            # Normalize 2D arrays with shape (n, 1) back to 1D for consistency with other formats
            value_to_write = value
            if isinstance(value, np.ndarray) and value.ndim == 2 and value.shape[1] == 1:
                value_to_write = value.reshape(-1)

            data_set = curr_hierarchy.create_dataset(
                dataset_name, data=value_to_write, compression="gzip", shuffle=True
            )

            metadata_dict = data_dict.get("metadata", {}).get(internal_name, {})
            if not isinstance(metadata_dict, dict):
                continue

            for key, metadata in metadata_dict.items():
                if getattr(metadata, "size", None) == 0:
                    continue
                data_set.attrs[key] = metadata
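
The way a slash-delimited path is turned into nested groups can be illustrated without h5py. The sketch below uses nested dictionaries as a stand-in for groups and datasets; `insert_by_path` and the variable names are hypothetical.

```python
from typing import Any


def insert_by_path(tree: dict[str, Any], path: str, value: Any) -> None:
    """Hypothetical nested-dict analogue of creating groups and a dataset."""
    *groups, dataset_name = path.split("/")
    current = tree
    for group in groups:
        # Reuse an existing group or create it, like create_group in the listing.
        current = current.setdefault(group, {})
    current[dataset_name] = value


tree: dict[str, Any] = {}
insert_by_path(tree, "Epoch", [0, 1, 2])
insert_by_path(tree, "FEDU/Energy", [10.0, 20.0])
insert_by_path(tree, "FEDU/Flux", [1.5, 2.5])
print(tree)
# {'Epoch': [0, 1, 2], 'FEDU': {'Energy': [10.0, 20.0], 'Flux': [1.5, 2.5]}}
```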

el_paso.utils.write_mat_file

write_mat_file

Write a MATLAB file, resolving standard variable paths and flattening hierarchy.

Data variables are stored under their flattened canonical names (/ → __). Per-variable metadata is stored in a parallel metadata struct whose field names mirror the data variable names, matching how HDF5 stores attrs per dataset.

Source code in el_paso/utils.py
def write_mat_file(file_path: Path, data_dict: DataDict, data_standard: DataStandard) -> None:
    """Write a MATLAB file, resolving standard variable paths and flattening hierarchy.

    Data variables are stored under their flattened canonical names (``/`` → ``__``).
    Per-variable metadata is stored in a parallel ``metadata`` struct whose field
    names mirror the data variable names, matching how HDF5 stores attrs per dataset.
    """
    mat_dict: dict[str, Any] = {}
    mat_metadata: dict[str, Any] = {}

    for internal_name, value in data_dict.items():
        if internal_name == "metadata":
            continue

        path = data_standard.get_standard_name(internal_name)
        mat_var_name = path.replace("/", "__")

        value_to_write = value
        if isinstance(value, np.ndarray) and value.ndim == 2 and value.shape[1] == 1:
            value_to_write = value.reshape(-1)

        mat_dict[mat_var_name] = value_to_write

        # Attach per-variable metadata under a matching key in the metadata struct,
        # mirroring how _write_h5_file stores attrs on each dataset.
        variable_meta = data_dict.get("metadata", {}).get(internal_name, {})
        if isinstance(variable_meta, dict) and variable_meta:
            mat_metadata[mat_var_name] = {
                "unit": variable_meta.get("unit", "unknown"),
                "source_files": variable_meta.get("source_files", "unknown"),
                "processing_notes": variable_meta.get("processing_notes", "unknown"),
                "description": variable_meta.get("description", "unknown"),
                "original_cadence_seconds": variable_meta.get("original_cadence_seconds", "unknown"),
                "standard_name": variable_meta.get("standard_name", "unknown"),
            }

    if mat_metadata:
        mat_dict["metadata"] = mat_metadata

    savemat(str(file_path), mat_dict)

el_paso.utils.write_netcdf_file

write_netcdf_file

Create and write a NetCDF file from a data dictionary.

Source code in el_paso/utils.py
def write_netcdf_file(file_path: Path, data_dict: DataDict, data_standard: DataStandard) -> None:
    """Create and write a NetCDF file from a data dictionary."""
    with nC.Dataset(file_path, "w", format="NETCDF4") as file:
        size_time = np.asarray(data_dict["Epoch"]).shape[0]
        if size_time == 0:
            logger.info(f"Skipping write for {file_path.name} (time has length 0).")
            return

        dimensions = _calculate_dimensions(data_dict, data_standard)
        for dim_name, dim_size in dimensions.items():
            if dim_name == "Epoch":
                # we create the time dimension as unlimited to allow for appending later on
                file.createDimension(dim_name, size=None)
            else:
                file.createDimension(dim_name, dim_size)

        _write_data_to_netcdf_file(file, data_dict, data_standard)