
Scripts

scripts.inspect_cdf_file.inspect_cdf_file

inspect_cdf_file

Prints a formatted table of metadata for all variables in a CDF file.

This function opens a CDF (Common Data Format) file, retrieves key metadata for each variable, and presents it in a clear, human-readable table. The table includes the variable name, data type, units, data shape, fill value, and a description. This is useful for quickly understanding the contents and structure of a CDF file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | str | The path to the CDF file. | required |

Raises:

| Type | Description |
| --- | --- |
| CDFError | If the file is not a valid CDF file or an error occurs while reading it. |
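The row-building step can be sketched without cdflib. The sketch below assumes each variable's attributes are already available as a plain dict, which is only a stand-in for what the function reads from the file. `build_rows` and `format_table` are hypothetical helpers, not part of the script, and the real function delegates the formatting to `tabulate`.

```python
from typing import Any

HEADERS = ["Variable name", "Units", "Fill value", "Description"]


def build_rows(attrs_by_var: dict[str, dict[str, Any]]) -> list[list[str]]:
    """Build one table row per variable, using "" for missing attributes."""
    rows = []
    for name, attrs in attrs_by_var.items():
        rows.append(
            [
                name,
                str(attrs.get("UNITS", "")),
                str(attrs.get("FILLVAL", "")),
                str(attrs.get("CATDESC", "")),
            ]
        )
    return rows


def format_table(headers: list[str], rows: list[list[str]]) -> str:
    """Left-align every column to the width of its longest cell."""
    widths = [max(len(cell) for cell in column) for column in zip(headers, *rows)]
    lines = []
    for line in [headers, *rows]:
        lines.append("  ".join(cell.ljust(w) for cell, w in zip(line, widths)).rstrip())
    return "\n".join(lines)


if __name__ == "__main__":
    # Made-up attribute dicts, for illustration only
    example = {
        "Epoch": {"CATDESC": "Time"},
        "B": {"UNITS": "nT", "CATDESC": "Magnetic field"},
    }
    print(format_table(HEADERS, build_rows(example)))
```

Using `dict.get` with an empty-string default is what lets variables without `UNITS`, `FILLVAL`, or `CATDESC` still appear in the table instead of raising `KeyError`.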

Source code in scripts/inspect_cdf_file.py
def inspect_cdf_file(file_path: str) -> None:
    """Prints a formatted table of metadata for all variables in a CDF file.

    This function opens a CDF (Common Data Format) file, retrieves key metadata
    for each variable, and presents it in a clear, human-readable table. The table
    includes the variable name, data type, units, data shape, fill value, and a
    description. This is useful for quickly understanding the contents and
    structure of a CDF file.

    Parameters:
        file_path (str): The path to the CDF file.

    Raises:
        cdflib.CDFError: If the file is not a valid CDF file or an error occurs
                         while reading it.
    """
    cdf_file = cdflib.CDF(file_path)

    variable_names = cdf_file.cdf_info().zVariables

    var_attrs_to_print = []

    for var in variable_names:
        var_attrs_full = cdf_file.varattsget(var)
        vdr_info = cdf_file.varinq(var)
        var_data = cdf_file.varget(var)

        var_shape = var_data.shape

        units = var_attrs_full.get("UNITS", "")

        desc = var_attrs_full.get("CATDESC", "")

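        fill_value = var_attrs_full.get("FILLVAL", "")

        data_type = vdr_info.Data_Type_Description

        var_attrs_to_print.append([var, data_type, units, var_shape, fill_value, desc])

    # Quick-look plot. The variable names are hardcoded, so this block only
    # works for files that contain "FPDU" and "HOPE_ENERGY_Ion".
    from matplotlib import pyplot as plt
    import numpy as np

    flux = cdf_file.varget("FPDU")
    energy = cdf_file.varget("HOPE_ENERGY_Ion")[0, :]

    print(flux.shape)  # noqa: T201
    # Plot index 5 of the second axis on log10 scales, skipping the first
    # entry along the other two axes.
    plt.pcolormesh(range(flux.shape[0]), np.log10(energy), np.log10(flux[1:, 5, 1:]).T, cmap="jet")
    plt.colorbar()
    plt.ylim(0, np.log10(50))
    plt.show()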


    print(  # noqa: T201
        tabulate(
            var_attrs_to_print,
            headers=["Variable name", "Data Type", "Units", "Data Shape", "Fill value", "Description"],
        )
    )

scripts.submit_slurm_jobs.submit_slurm_jobs_in_chunks

submit_slurm_jobs_in_chunks

Submits HPC jobs in time-based chunks.

This function divides a specified time range into smaller intervals (daily, monthly, or yearly) and submits a separate job for each interval to an HPC cluster using the sbatch command. By default it submits a job script named job_script_template.sh, looked up relative to the current working directory; the job_script_path argument selects a different script. The chunk start and end times are passed to the job script as command-line arguments, formatted as %Y-%m-%dT%H:%M:%S.
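The chunking step is handled by a private helper, `_get_time_intervals`, whose source is not shown on this page. A minimal sketch of one way to split a range, assuming chunks are cut at calendar boundaries (midnight, first of the month, first of the year) and that each chunk ends where the next begins, might look like this. The `ChunkType` enum values and both function names here are hypothetical stand-ins.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class ChunkType(Enum):
    # Hypothetical stand-in for the script's ChunkType enum
    DAILY = "daily"
    MONTHLY = "monthly"
    YEARLY = "yearly"


def next_boundary(t: datetime, chunk_type: ChunkType) -> datetime:
    """Return the first day/month/year boundary strictly after t."""
    midnight = t.replace(hour=0, minute=0, second=0, microsecond=0)
    if chunk_type is ChunkType.DAILY:
        return midnight + timedelta(days=1)
    if chunk_type is ChunkType.MONTHLY and t.month < 12:
        return midnight.replace(month=t.month + 1, day=1)
    # YEARLY, or MONTHLY rolling over from December
    return midnight.replace(year=t.year + 1, month=1, day=1)


def get_time_intervals(
    start: datetime, end: datetime, chunk_type: ChunkType
) -> list[tuple[datetime, datetime]]:
    """Split [start, end) into consecutive chunks cut at calendar boundaries."""
    intervals = []
    current = start
    while current < end:
        chunk_end = min(next_boundary(current, chunk_type), end)
        intervals.append((current, chunk_end))
        current = chunk_end
    return intervals


if __name__ == "__main__":
    start = datetime(2023, 1, 15, 12, tzinfo=timezone.utc)
    end = datetime(2023, 3, 10, tzinfo=timezone.utc)
    for chunk_start, chunk_end in get_time_intervals(start, end, ChunkType.MONTHLY):
        print(chunk_start.isoformat(), "->", chunk_end.isoformat())
```

With this scheme the first and last chunks may be shorter than a full day, month, or year when the requested range does not start or end on a boundary. The actual helper may treat the edges differently.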

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| start_time_str | str | The start of the time range, in a format parsable by dateutil.parser. Example: '2023-01-01T00:00:00'. | required |
| end_time_str | str | The end of the time range, in a format parsable by dateutil.parser. Example: '2023-03-31T23:59:59'. | required |
| chunk_type | ChunkType | The type of time chunk to use for job submission. Valid options are ChunkType.DAILY, ChunkType.MONTHLY, or ChunkType.YEARLY. | required |
| job_script_path | str | The path to the job script passed to sbatch. | 'job_script_template.sh' |
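The source listing for this function stamps the parsed times with `.replace(tzinfo=timezone.utc)`, which labels a time as UTC without shifting it. The sketch below illustrates the difference between labeling and converting. It uses the standard library's `datetime.fromisoformat` as a stand-in for `dateutil.parser.parse`, and both helper names are hypothetical.

```python
from datetime import datetime, timezone


def label_as_utc(time_str: str) -> datetime:
    """Parse and stamp as UTC, overwriting any offset in the input."""
    return datetime.fromisoformat(time_str).replace(tzinfo=timezone.utc)


def convert_to_utc(time_str: str) -> datetime:
    """Parse and convert to UTC; naive inputs are assumed to already be UTC."""
    parsed = datetime.fromisoformat(time_str)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


if __name__ == "__main__":
    stamp = "2023-01-01T00:00:00+02:00"
    print(label_as_utc(stamp).isoformat())    # offset discarded, clock time kept
    print(convert_to_utc(stamp).isoformat())  # clock time shifted to UTC
```

For inputs without an offset, such as the examples in the table above, both approaches give the same result. They differ only when the input string carries its own offset.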

Raises:

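| Type | Description |
| --- | --- |
| CalledProcessError | Raised by subprocess.run when an sbatch command exits with a non-zero code. The function catches this error, prints it, and stops submitting the remaining chunks. |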
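The submission loop and its failure handling can be sketched as follows. `build_sbatch_command` and `run_until_failure` are hypothetical helpers that mirror the pattern in the source listing. The demo substitutes the Python interpreter for `sbatch`, so it runs on machines without Slurm.

```python
import subprocess
import sys
from datetime import datetime

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def build_sbatch_command(job_script_path: str, chunk_start: datetime, chunk_end: datetime) -> list[str]:
    """Build the argument list: sbatch <script> <start> <end>."""
    return [
        "sbatch",
        job_script_path,
        chunk_start.strftime(TIME_FORMAT),
        chunk_end.strftime(TIME_FORMAT),
    ]


def run_until_failure(commands: list[list[str]]) -> int:
    """Run commands in order, stop at the first failure, return the success count."""
    succeeded = 0
    for command in commands:
        try:
            # check=True turns a non-zero exit code into CalledProcessError
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error submitting job: {e}")
            break
        succeeded += 1
    return succeeded


if __name__ == "__main__":
    print(build_sbatch_command("job_script_template.sh", datetime(2023, 1, 1), datetime(2023, 1, 2)))
    # Stand-in commands: the second one exits with code 3, so the third never runs
    demo = [
        [sys.executable, "-c", "pass"],
        [sys.executable, "-c", "import sys; sys.exit(3)"],
        [sys.executable, "-c", "pass"],
    ]
    print(run_until_failure(demo))
```

Passing the command as a list avoids shell interpretation of the arguments. Note that a missing `sbatch` executable makes `subprocess.run` raise `FileNotFoundError`, which a handler for `CalledProcessError` does not catch.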

Source code in scripts/submit_slurm_jobs.py
def submit_slurm_jobs_in_chunks(
    start_time_str: str, end_time_str: str, chunk_type: ChunkType, job_script_path: str = "job_script_template.sh"
) -> None:
    """Submits HPC jobs in time-based chunks.

    This function divides a specified time range into smaller intervals (daily,
    monthly, or yearly) and submits a separate job for each interval to an HPC
    cluster using the `sbatch` command. By default it submits a job script named
    `job_script_template.sh`, looked up relative to the current working
    directory. The chunk start and end times are passed to the job script as
    command-line arguments.

    Parameters:
        start_time_str (str): The start of the time range, in a format parsable
                              by `dateutil.parser`. Example: '2023-01-01T00:00:00'.
        end_time_str (str): The end of the time range, in a format parsable
                            by `dateutil.parser`. Example: '2023-03-31T23:59:59'.
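        chunk_type (ChunkType): The type of time chunk to use for job submission.
                                Valid options are `ChunkType.DAILY`, `ChunkType.MONTHLY`,
                                or `ChunkType.YEARLY`.
        job_script_path (str): The path to the job script passed to `sbatch`.
                               Defaults to 'job_script_template.sh'.

    Raises:
        subprocess.CalledProcessError: Raised by `subprocess.run` when an `sbatch`
                                        command exits with a non-zero code. It is
                                        caught here: the error is printed and the
                                        remaining chunks are not submitted.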
    """
    # Parse the time strings and label them as UTC. replace() overwrites any
    # offset present in the input; it does not convert the time.
    start_time = dateutil.parser.parse(start_time_str).replace(tzinfo=timezone.utc)
    end_time = dateutil.parser.parse(end_time_str).replace(tzinfo=timezone.utc)

    time_intervals = _get_time_intervals(start_time, end_time, chunk_type)

    for start_interval, end_interval in time_intervals:
        # Format the times for the command line arguments
        chunk_start_str = start_interval.strftime("%Y-%m-%dT%H:%M:%S")
        chunk_end_str = end_interval.strftime("%Y-%m-%dT%H:%M:%S")

        print(f"Submitting job for time range: {chunk_start_str} to {chunk_end_str}")

        # Construct the sbatch command
        command = [
            "sbatch",
            job_script_path,
            chunk_start_str,
            chunk_end_str,
        ]

        try:
            # Execute the sbatch command and check for errors
            subprocess.run(command, check=True)  # noqa: S603
        except subprocess.CalledProcessError as e:
            print(f"Error submitting job: {e}")
            break