helper.py

as_list(x)

Converts the input x to a list if it is not already a list.

Source code in utility\helper.py
485
486
487
488
489
def as_list(x):
    """Wrap ``x`` in a list unless it already is one."""
    if isinstance(x, list):
        return x
    return [x]

check_exp_inputs(exp, attribute_check=False)

Helper function to validate experiment inputs.

Parameters:
  • exp (Experiment) –

    Custom object containing experiment specifications.

Raises:
  • ValueError

    If input parameters are invalid.

Source code in utility\helper.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def check_exp_inputs(exp, attribute_check=False):
    """
    Validate experiment inputs before simulations are set up.

    Args:
        exp (Experiment): Custom object containing experiment specifications.
        attribute_check (bool, optional): When True, additionally verify that every
            multi-valued attribute is declared in ``exp.sweep_list``. Defaults to False.

    Raises:
        ValueError: If input parameters are invalid.
    """
    # General check on input parameter combinations and formats
    check_exp_input_parameters(exp)

    # Check whether parameters align with the current calibration setup - "interpolation data"
    from utility.helper_calibration import check_transmission_targets_input
    check_transmission_targets_input(exp)

    # Model-specific checks (order kept: OpenMalaria, EMOD, malariasimulation)
    model_checks = {
        'OpenMalaria': check_exp_inputs_openmalaria,
        'EMOD': check_exp_inputs_emod,
        'malariasimulation': check_exp_inputs_malariasimulation,
    }
    for model_name, check_fn in model_checks.items():
        if model_name in exp.models_to_run:
            check_fn(exp)

    # Check interventions added to the framework
    intervention_params_to_check(exp)

    if attribute_check:
        # Attributes allowed to hold multiple values without being swept
        exempt = {'models_to_run', 'eir_scalar_emod', 'eir_malariasimulation',
                  'eir_scalar_malariasimulation', 'eir_openmalaria',
                  'eir_scalar_openmalaria', 'carrying_capacity_step', 'age_groups_aggregates',
                  'plots_to_run',
                  'agebins', 'sweep_list', 'season_daily', 'season_month', 'seasonal', 'perennial',
                  'analyzer_list', 'analyzer_script', 'calib_config_pointer', 'models_to_run_pickup'}

        attributes_dict = exp.__dict__
        multi_valued = [key for key, value in attributes_dict.items()
                        if isinstance(value, list) and len(value) > 1 and key not in exempt]

        not_swept = [attr for attr in multi_valued if attr not in attributes_dict['sweep_list']]
        if not_swept:
            raise ValueError(f'The following attributes have more than 1 value but are not included in sweep_list: {", ".join(not_swept)}')

configure_calib_scenarios(exp)

Configures experiment-specific attributes based on the run mode.

Source code in utility\helper.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def configure_calib_scenarios(exp):
    """
    Configure run-mode-dependent attributes (output targets, EIR and
    x_temp ranges) on the experiment object and return it.
    """
    if exp.run_mode != 'calibrun':
        # Non-calibration runs use a single fixed target
        exp.output_target = np.arange(1, 2)
        exp.eir_range = [50]
        exp.x_temp_range = [15]
        return exp

    print("Notice: for calibrun simulations, importation will be set to 0")
    eir_range, x_temp_range = get_calib_range()
    exp.importation_rate = 0
    exp.output_target = np.arange(1, 31)
    exp.eir_range = eir_range
    exp.x_temp_range = x_temp_range
    return exp

create_scenarios_csv(exp, col_list)

Generate a scenarios DataFrame based on the provided experiment object and column list.

This function constructs a DataFrame of all possible combinations of attributes specified in col_list and maps additional experiment-specific parameters, including entomology mode, number of seeds, and calibration target inputs. It also processes specific interventions such as cc_step if present.

Parameters:
  • exp (Experiment) –

    The experiment object containing attributes and settings for scenarios.

  • col_list (list) –

    List of attribute names to include in the scenarios DataFrame. Defaults to [‘output_target’, ‘case_management’, ‘seasonality’].

Returns:
  • pandas.DataFrame: A DataFrame representing all generated scenarios, including metadata and mapped inputs.

Source code in utility\helper.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
def create_scenarios_csv(exp, col_list):
    """
    Build the scenarios DataFrame: the full factorial of the attributes in
    ``col_list`` plus experiment-level metadata and mapped calibration inputs.

    Args:
        exp (Experiment): The experiment object containing attributes and settings for scenarios.
        col_list (list or None): Attribute names to cross; when None, defaults to
            ['output_target', 'case_management', 'seasonality'].

    Returns:
        pandas.DataFrame: All generated scenarios, including metadata and mapped inputs.
    """
    if col_list is None:
        col_list = ['output_target', 'case_management', 'seasonality']

    # Full factorial of the selected experiment attributes
    attr_values = (getattr(exp, attr) for attr in col_list)
    df = pd.DataFrame(list(itertools.product(*attr_values)), columns=col_list)

    # Metadata columns shared by every scenario
    df['entomology_mode'] = exp.entomology_mode  # same across all scenarios
    df['num_seeds'] = exp.num_seeds  # total number of seeds, same across all scenarios

    # Clinical/severe case management is not part of the factorial; split it out
    df = process_case_management(df)

    # Attach per-model calibration target inputs
    df = map_model_calib_inputs(exp, df)

    # Scenario adjustments required by specific interventions
    from utility.helper_interventions import scenario_df_to_update
    df = scenario_df_to_update(exp, df)

    # 1-based 'index' with a matching 'scen_id', moved to the first two columns
    df.reset_index(inplace=True)
    df['index'] = df['index'] + 1
    df['scen_id'] = df['index']
    leading = ['index', 'scen_id']
    df = df[leading + [col for col in df.columns if col not in leading]]

    return df

create_simsetup_csv(exp)

Creates a CSV file containing simulation setup parameters. Args: exp (Experiment): Experiment object containing experiment specifications.

Source code in utility\helper.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def create_simsetup_csv(exp):
    """
    Write the simulation setup parameters to 'exp_setup_df.csv' in both the
    job and the simulation output directories.

    Args:
        exp (Experiment): Experiment object containing experiment specifications.
    """

    # To persist additional parameters, add them to this mapping
    setup_dic = {
        'run_mode': exp.run_mode,
        'job_directory': exp.job_directory,
        'sim_out_dir': exp.sim_out_dir,
        'emod_serialized_id': exp.emod_serialized_id,
        'burnin_directory': exp.burnin_directory,
        'nexps': exp.nexps,
        'sweep_list': exp.sweep_list,
        'monitoring_years': exp.monitoring_years,
        'pop_size_emod': exp.emod_pop_size,
        'pop_size_malariasimulation': exp.malariasimulation_pop_size,
        'pop_size_openmalaria': exp.openmalaria_pop_size,
        'agebins': exp.agebins,
        'seasonality': exp.seasonality,
        # Monthly/daily profiles are flattened to ';'-separated strings for the CSV
        'seasonal_monthly': ';'.join(map(str, exp.seasonal)),
        'perennial_monthly': ';'.join(map(str, exp.perennial)),
        'seasonal_daily': ';'.join(map(str, exp.season_daily)),
        'perennial_daily': ';'.join(map(str, exp.perennial_daily)),
        'start_year': exp.start_year,
        'end_year': exp.end_year,
        'burnin_emod': exp.emod_burnin,
        'burnin_malariasimulation': exp.malariasimulation_burnin,
        'burnin_openmalaria': exp.openmalaria_burnin,
        'sim_start_year_openmalaria': exp.sim_start_year_openmalaria,
        'sim_start_year_emod': exp.sim_start_year_emod,
        'sim_start_year_malariasimulation': exp.sim_start_year_malariasimulation,
        'intervention_list': exp.intervention_list,
    }

    setup_df = pd.DataFrame.from_dict(setup_dic, orient='index', columns=['Value'])
    setup_df['parameter'] = setup_df.index
    for sdir in (exp.job_directory, exp.sim_out_dir):
        setup_df.to_csv(os.path.join(sdir, 'exp_setup_df.csv'), index=False)

exec(command)

Executes the specified command in a subprocess.

Parameters:
  • command (str) –

    The command to execute.

Returns:
  • subprocess.Popen: A Popen object representing the subprocess.

Raises:
  • ValueError

    If the command is empty or None.

Notes
  • This function runs the command in a new shell and captures both standard output and standard error.
  • The output can be accessed through the Popen object returned by this function.
Source code in utility\helper_slurm.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def exec(command):  # noqa: A001 - name kept for backwards compatibility (shadows builtins.exec)
    """
    Executes the specified command in a subprocess.

    Args:
        command (str): The command to execute.

    Returns:
        subprocess.Popen: A Popen object representing the subprocess.

    Notes:
        - The command runs in a new shell; stdout and stderr are captured as
          text and can be read through the returned Popen object.
        - `shell=True` runs `command` through the shell verbatim — do not pass
          untrusted input.
    """
    # text=True already implies universal_newlines; passing both was redundant
    return subprocess.Popen(command, shell=True, stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE, text=True)

get_intervention_params(exp)

Retrieve intervention parameters for a specific experiment.

Parameters: - exp: Experiment object.

Returns: Intervention parameters based on the specified experiment.

Source code in utility\helper_simulation.py
16
17
18
19
20
21
22
23
24
25
26
27
def get_intervention_params(exp):
    """
    Return the intervention parameters for the given experiment.

    Parameters:
    - exp: Experiment object.

    Returns:
    Intervention parameters derived from the experiment specification.
    """
    # Local import — presumably avoids a circular dependency at module load; confirm
    from utility.helper_interventions import exp_params_to_update
    return exp_params_to_update(exp)

get_param_from_dataframe(df, name)

Retrieves a parameter value from a DataFrame and returns it in the correct format for storage in an experiment object (exp).

Example

exp.seasonality = get_param(exp_scen_df, ‘seasonality’, listparam=True)

Parameters:
  • df (DataFrame) –

    The DataFrame containing the parameter value.

  • name (str) –

    The name of the parameter.

  • listparam (bool) –

    Indicates whether the parameter value should be returned as a list. Defaults to False.

Returns:
  • Union[list, str]: The parameter value, either as a list or a string, based on the ‘listparam’ argument.

Source code in utility\helper.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def get_param_from_dataframe(df, name):
    """
    Retrieve the parameter `name` from the first row of `df`, formatted for
    storage in an experiment object (exp).

    Scalar parameters are returned as-is. Parameters known to be list-valued
    are returned as a list: comma-separated strings are split and numeric
    items converted to float; non-string values are wrapped in a one-element
    list.

    Example:
        exp.seasonality = get_param_from_dataframe(exp_scen_df, 'seasonality')

    Args:
        df (pandas.DataFrame): The DataFrame containing the parameter value.
        name (str): The name of the parameter.

    Returns:
        The parameter value — a list for list-type parameters, otherwise the
        raw scalar value.
    """

    # Parameters expected in list format  # FIXME, better way to do?
    # (duplicate 'eir_scalar_openmalaria' entry removed; set membership is O(1))
    listparams = {'seasonality', 'sweep_list', 'case_management', 'eir', 'intervention_list', 'agebins',
                  'eir_scalar_malariasimulation', 'eir_malariasimulation', 'eir_scalar_openmalaria',
                  'eir_scalar_emod', 'eir_openmalaria', 'carrying_capacity_step'}

    value = df[name].iat[0]

    if name not in listparams:
        return value

    if not isinstance(value, str):
        # Already a scalar (e.g. a number read from the CSV): wrap in a list
        return [value]

    out = []
    for item in value.replace(' ', '').split(','):
        try:
            # float() handles integers, decimals and negatives; the previous
            # str.isdigit() check left values like '0.5' or '-1' as strings
            out.append(float(item))
        except ValueError:
            out.append(item)
    return out

get_seasonal_eir(exp=None)

Generates seasonal EIR (Entomological Inoculation Rate) values.

Examples:

season_daily, season_month, seasonal, perennial = get_seasonal_eir() exp = get_seasonal_eir(exp)

Parameters:
  • exp (Experiment, default: None ) –

    Experiment object containing experiment specifications.

Returns:
  • tuple or Experiment: A tuple containing the daily, monthly, seasonal, and perennial EIR values if exp is None.

  • If exp is provided, the Experiment object with updated seasonal EIR attributes is returned.

  • If exp is None:

    • season_daily (list): Defined seasonal shape of daily EIR values.
    • season_month (list): Monthly EIR values calculated from the seasonal pattern.
    • seasonal (list): Seasonal EIR values rescaled within 0 to 1.
    • perennial (list): Perennial EIR values per month.
  • If exp is provided:

    • exp (Experiment): Experiment object with updated seasonality attributes.
Source code in utility\helper_simulation.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def get_seasonal_eir(exp=None):
    """
    Generate seasonal EIR (Entomological Inoculation Rate) values.

    Examples:
        season_daily, season_month, seasonal, perennial = get_seasonal_eir()
        exp = get_seasonal_eir(exp)

    Args:
        exp (Experiment, optional): Experiment object containing experiment specifications.

    Returns:
        tuple or Experiment:
            If exp is None, a tuple of:
            - season_daily (list): seasonal shape of daily EIR values.
            - season_month (list): monthly EIR values of the seasonal pattern.
            - seasonal (list): monthly seasonal EIR values rescaled by their sum.
            - perennial (list): flat perennial EIR values per month.
            If exp is provided, the same Experiment with its seasonality
            attributes (including perennial_daily) updated.
    """

    # Seasonal profile is based off of EMOD CM = 0, seasonal setting, EIR = 20
    season_month = [2.969670822, 1.841811729, 1.327101791, 0.857686237, 0.611351676, 0.48901135,
                    0.480399615, 0.725799099, 1.140032967, 2.215493305, 3.229497298, 3.844139468]

    # Interpolate to a daily profile first (kept before the rescaling below to
    # preserve the original order of operations)
    season_daily = monthly_to_daily_EIR(season_month)

    eir_sum = sum(season_month)
    seasonal = [month_value / eir_sum for month_value in season_month]  # rescale to sum =1
    perennial = [1 / 12] * 12      # flat monthly profile
    perennial_daily = [1 / 365] * 365  # flat daily profile

    if exp is None:
        return season_daily, season_month, seasonal, perennial

    exp.season_daily = season_daily
    exp.season_month = season_month
    exp.seasonal = seasonal
    exp.perennial = perennial
    exp.perennial_daily = perennial_daily
    return exp

get_simulation_time_params(exp)

Calculates simulation time parameters for EMOD, malariasimulation and OpenMalaria based on the provided arguments.

Parameters:
  • exp (Experiment) –

    Experiment object containing experiment specifications. - start_year (int): Monitoring start year (required by OpenMalaria utils). - end_year (int): Monitoring and simulation end year (required by OpenMalaria utils). - burnin (int): Duration of pre-monitoring years to run. - emod_step (str): Whether EMOD runs in one or two steps (‘None’ (one run), ‘burnin’, or ‘pickup’ (two separate runs)).

Returns:
  • exp( Experiment ) –

    Experiment object with updated simulation time parameters. - sim_start_year: Simulation start year - monitoring_years: Number of years to monitor (required in EMOD analyzer) - sim_dur_years: Total simulation duration years

Source code in utility\helper_simulation.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_simulation_time_params(exp):
    """
    Derive simulation time parameters for EMOD, malariasimulation and OpenMalaria.

    Args:
        exp (Experiment): Experiment object providing:
            - start_year (int): Monitoring start year (required by OpenMalaria utils).
            - end_year (int): Monitoring and simulation end year (required by OpenMalaria utils).
            - emod_burnin / openmalaria_burnin / malariasimulation_burnin (int):
              pre-monitoring years per model.
            - emod_step (str or None): 'None' (one run), 'burnin' or 'pickup'
              (two separate runs).

    Returns:
        exp (Experiment): Experiment object updated with:
            - sim_start_year_* : per-model simulation start year
            - monitoring_years : number of years to monitor (required in EMOD analyzer)
            - sim_dur_years    : total simulation duration in years

    Raises:
        ValueError: If emod_step is not None, 'burnin' or 'pickup'.
    """

    start_year, end_year = exp.start_year, exp.end_year
    emod_step = exp.emod_step
    running_emod = 'EMOD' in exp.models_to_run

    if emod_step is None:
        if running_emod:
            print(" --------| Running EMOD burnin + pickup time in one simulation run |--------")
        sim_start_year = start_year - exp.emod_burnin
        sim_dur_years = end_year - sim_start_year
    elif emod_step == 'burnin':
        if running_emod:
            print(" --------| Running EMOD burnin (step 1) |--------")
        sim_start_year = start_year - exp.emod_burnin
        sim_dur_years = exp.emod_burnin
    elif emod_step == 'pickup':
        if running_emod:
            print(" --------| Running EMOD pickup from serialized burnin (step 2) |--------")
        sim_start_year = start_year
        sim_dur_years = end_year - start_year
    else:
        raise ValueError(f'Please specify valid emod_step, {emod_step} is not valid')

    # Update experiment object with simulation start years and duration
    exp.sim_start_year_emod = sim_start_year
    exp.sim_start_year_openmalaria = start_year - exp.openmalaria_burnin
    exp.sim_start_year_malariasimulation = start_year - exp.malariasimulation_burnin
    exp.monitoring_years = end_year - start_year
    exp.sim_dur_years = sim_dur_years
    return exp

make_dirs(exp, overwrite=False)

Creates necessary directories for a simulation experiment.

Args: exp (Experiment): The experiment object containing project directory information. overwrite (bool, optional): If True, existing directories will be overwritten. Defaults to False.

Returns: exp (Experiment): The experiment object with updated directory paths.

Source code in utility\helper.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def make_dirs(exp, overwrite=False):
    """
    Create the job and output directories for a simulation experiment.

    Args:
    exp (Experiment): The experiment object containing project directory information.
    overwrite (bool, optional): Passed through as makedirs' exist_ok — when True,
        existing directories are reused; when False, an existing directory raises.
        Defaults to False.

    Returns:
    exp (Experiment): The experiment object with updated directory paths.
    """

    # If custom_subdir is specified, nest the suite folder inside it
    if exp.custom_subdir is not None:
        exp_suite_path = os.path.join(exp.custom_subdir, exp.SUITEname)
    else:
        exp_suite_path = os.path.join(exp.SUITEname)

    # Grouping folder for simulation exps
    exp.suite_directory = os.path.join(exp.job_directory_manifest, exp_suite_path)
    # Stores simulation files to run
    exp.job_directory = os.path.join(exp.job_directory_manifest, exp_suite_path, exp.exp_name)
    # Stores results csvs and figures
    exp.sim_out_dir = os.path.join(exp.output_directory_manifest, exp_suite_path, exp.exp_name)

    # Create the base directories
    for path in (exp.job_directory,
                 os.path.join(exp.job_directory, 'log'),
                 exp.sim_out_dir):
        os.makedirs(path, exist_ok=overwrite)

    # Per-model output subdirectories for models included in exp.models_to_run
    if 'EMOD' in exp.models_to_run:
        os.makedirs(os.path.join(exp.sim_out_dir, 'EMOD'), exist_ok=overwrite)

    if exp.emod_step != 'burnin':
        if 'OpenMalaria' in exp.models_to_run:
            os.makedirs(os.path.join(exp.sim_out_dir, 'OpenMalaria'), exist_ok=overwrite)
        if 'malariasimulation' in exp.models_to_run:
            os.makedirs(os.path.join(exp.sim_out_dir, 'malariasimulation'), exist_ok=overwrite)
    return exp

map_model_calib_inputs(exp, df)

Maps model inputs based on output targets.

Parameters:
  • exp (Experiment) –

    The experiment object.

  • df (DataFrame) –

    The DataFrame.

Returns: pandas.DataFrame: Updated DataFrame.

Source code in utility\helper.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def map_model_calib_inputs(exp, df):
    """
    Map per-model calibration inputs onto the scenarios DataFrame based on each
    row's output target.

    Args:
        exp (Experiment): The experiment object.
        df (pandas.DataFrame): Scenarios DataFrame with an 'output_target' column.
    Returns:
        pandas.DataFrame: df with model_input_<model> columns added.
    """
    models_to_run = list(set(exp.models_to_run + exp.models_to_run_pickup))

    if exp.run_mode in ('calibrun', 'test', None):
        # Index directly into the calibration ranges (output_target is 1-based);
        # EMOD is driven by x_temp, the other models by EIR
        for model in models_to_run:
            source = exp.x_temp_range if model == 'EMOD' else exp.eir_range
            column = f'model_input_{model.lower()}'
            for i, row in df.iterrows():
                df.loc[i, column] = source[int(row['output_target']) - 1]
    else:
        for model in models_to_run:
            if model in ['malariasimulation', 'OpenMalaria']:
                df[f'model_input_{model}'] = output_target_to_eir(df, exp, model=model)
            if model == 'EMOD':
                if exp.entomology_mode == 'dynamic':
                    df['model_input_emod'] = output_target_to_xTemp(df, exp, model='EMOD')
                elif exp.entomology_mode == 'forced':
                    df['model_input_emod'] = output_target_to_eir(df, exp, model='EMOD')

    return df

monthly_to_daily_EIR(monthly_EIR)

Convert monthly EIR values to daily using cubic spline interpolation.

Parameters:
  • monthly_EIR (list of floats) –

    List of monthly EIRs.

Returns:
  • list of floats: List of daily EIRs.

Source code in utility\helper_simulation.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def monthly_to_daily_EIR(monthly_EIR):
    """
    Convert monthly EIR values to daily using cubic spline interpolation.

    Args:
        monthly_EIR (list of floats): 12 or 13 monthly EIRs. With 12 values, a
            13th anchor point (mean of the first and last month) is added
            internally so the spline spans the full year.

    Returns:
        list of floats: List of 365 non-negative daily EIRs.

    Raises:
        Exception: If monthly_EIR does not have 12 or 13 values.
    """

    if len(monthly_EIR) == 12:
        # Build the 13-point list as a new object: the previous in-place
        # append mutated the caller's list (e.g. skewing sums taken on it later)
        monthly_EIR = monthly_EIR + [(monthly_EIR[0] + monthly_EIR[-1]) / 2]
    elif len(monthly_EIR) != 13:
        raise Exception('Monthly EIR should have 12 or 13 values')

    x_monthly = np.linspace(0, 364, num=13, endpoint=True)
    x_daily = np.linspace(0, 364, num=365, endpoint=True)
    EIR = interp1d(x_monthly, monthly_EIR, kind='cubic')
    daily_EIR = (EIR(x_daily) / 30).tolist()

    # Clip spline undershoot: EIR cannot be negative
    return [max(x, 0) for x in daily_EIR]

param_variation(df, exp)

Perform parameter variation for malariasimulation simulations.

Parameters: - df: DataFrame containing data from scenarios.csv. - exp: Experiment object.

Returns: DataFrame with added column ‘malariasimulation_pv’ representing malariasimulation parameter variation values.

Source code in utility\helper_simulation.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def param_variation(df, exp):
    """
    Perform parameter variation for malariasimulation simulations.

    Each seed is assigned one of malariasimulation's 1000 parameter-variation
    draws; rows with the same seed get the same draw.

    Parameters:
    - df: DataFrame containing data from scenarios.csv (must have a 'seed' column).
    - exp: Experiment object.

    Returns:
    DataFrame with added column 'malariasimulation_pv' representing malariasimulation parameter variation values.
    """
    if exp.malariasimulation_parameter_variation and 'malariasimulation' in exp.models_to_run:
        from random import sample
        if exp.num_seeds > 1000:
            # Only 1000 distinct draws exist, so values wrap around via modulo
            print("Warning: num_seeds > 1000, therefore malariasimulation parameter variation will have repeat values")
            par_var = [(i % 1000) + 1 for i in sample(range(1, exp.num_seeds + 1), exp.num_seeds)]
        else:
            par_var = sample(range(1, 1001), exp.num_seeds)

        # Single assignment loop (previously duplicated in both branches)
        for i, row in df.iterrows():
            df.loc[i, 'malariasimulation_pv'] = int(par_var[df.loc[i, 'seed'] - 1])
    return df

parse_args()

Parses command-line arguments for simulation specifications.

Returns: argparse.Namespace: Parsed command-line arguments.

Source code in utility\helper.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def parse_args():
    """
    Parse command-line arguments for simulation specifications.

    Returns:
    argparse.Namespace: Parsed command-line arguments (with a `directory` attribute).
    """
    parser = argparse.ArgumentParser(description="Simulation specifications")

    # Required: the directory holding the pickled experiment object
    parser.add_argument(
        "-d",
        "--directory",
        type=str,
        required=True,
        help="Job Directory where exp.obj is located",
    )

    return parser.parse_args()

process_case_management(df)

Processes and splits the case management column.

Handles cases where case_management contains either a list of two values or a single value in a list.

Parameters:
  • df (DataFrame) –

    The DataFrame.

Returns:
  • pandas.DataFrame: Updated DataFrame.

Source code in utility\helper.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def process_case_management(df):
    """
    Split the 'case_management' column into numeric 'cm_clinical' and
    'cm_severe' columns and drop the original column.

    Each row's case_management entry is indexed at [0] (clinical) and
    [1] (severe); values that cannot be parsed as numbers become NaN.

    Args:
        df (pandas.DataFrame): The DataFrame.

    Returns:
        pandas.DataFrame: Updated DataFrame.
    """

    for idx in df.index:
        cm_value = df.loc[idx, 'case_management']
        df.loc[idx, 'cm_clinical'] = pd.to_numeric(cm_value[0], errors='coerce')
        df.loc[idx, 'cm_severe'] = pd.to_numeric(cm_value[1], errors='coerce')

    return df.drop('case_management', axis=1)

rep_scen_df(df)

Repeats scenario data in a DataFrame based on the number of seeds.

Parameters:
  • df (DataFrame) –

    The DataFrame containing the scenario data.

Returns:
  • pandas.DataFrame: A DataFrame with repeated scenario data based on the number of seeds.

Source code in utility\helper.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def rep_scen_df(df):
    """
    Repeat each scenario row num_seeds times and add 1-based 'index' and
    'seed' columns.

    Args:
        df (pandas.DataFrame): Scenario DataFrame with 'num_seeds' and
            'scen_id' columns; every row is assumed to share the same
            num_seeds value (only the first row's value is used).

    Returns:
        pandas.DataFrame: A DataFrame with repeated scenario data based on the number of seeds.
    """

    # Explicit check instead of a bare try/except that hid unrelated errors
    if 'index' in df.columns:
        df = df.drop(['index'], axis=1)

    # .iat is positional, so this works even when the frame's index labels
    # do not start at 0 (df.num_seeds[0] was label-based and could KeyError)
    n_seeds = df['num_seeds'].iat[0]

    rep_df = pd.DataFrame(np.repeat(df.to_numpy(), n_seeds, axis=0), columns=df.columns)
    rep_df.reset_index(inplace=True)
    rep_df['index'] = rep_df['index'] + 1
    # Seed counter restarts at 1 within each scenario
    rep_df['seed'] = rep_df.groupby('scen_id').cumcount() + 1
    return rep_df

save_exp_scen(exp, scen_df, save_dirs)

Saves the experiment and scenario data to the specified save directories. Args: exp (Experiment): Experiment object containing experiment specifications. scen_df (pandas.DataFrame): The scenario data to be saved. save_dirs (list): List of directories to save the data.

Source code in utility\helper.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def save_exp_scen(exp, scen_df, save_dirs):
    """
    Saves the experiment and scenario data to the specified save directories.
    Args:
        exp (Experiment): Experiment object containing experiment specifications.
        scen_df (pandas.DataFrame): The scenario data to be saved.
        save_dirs (list): List of directories to save the data.
    """
    for sdir in save_dirs:
        # Save the scenario data as a CSV file
        scen_df.to_csv(os.path.join(sdir, 'scenarios.csv'), index=False)

        # Save the experiment object using pickle; the with-block guarantees
        # the file handle is closed (it was previously left open)
        with open(os.path.join(sdir, "exp.obj"), "wb") as fh:
            pickle.dump(exp, fh)

save_scen(scen_df, fname, save_dirs=None)

Saves a scenario DataFrame to a CSV file in the specified directories. Args: scen_df (pandas.DataFrame): The DataFrame containing the scenario data. fname (str): The name of the CSV file to be saved. save_dirs (list of str, optional): List of directories to save the CSV file. Defaults to None.

Source code in utility\helper.py
266
267
268
269
270
271
272
273
274
275
def save_scen(scen_df, fname, save_dirs=None):
    """
    Saves a scenario DataFrame to a CSV file in the specified directories.

    Args:
        scen_df (pandas.DataFrame): The DataFrame containing the scenario data.
        fname (str): The name of the CSV file to be saved.
        save_dirs (list of str, optional): List of directories to save the CSV file.
            Defaults to None, in which case nothing is written.
    """
    # Guard the documented default: iterating over None would raise TypeError.
    if save_dirs is None:
        save_dirs = []
    for sdir in save_dirs:
        scen_df.to_csv(os.path.join(sdir, fname), index=False)

shell_header_quest(sh_account, time_str='6:00:00', memG=3, job_name='myjob', arrayJob=None, mem_scl=1)

Generates the SLURM shell script header for submitting jobs to a high-performance computing cluster.

Parameters:
  • sh_account (dict) –

    Dictionary containing account information for SLURM, including the account name (‘A’), partition (‘p’), and whether it is a buy-in account (‘buyin’).

  • time_str (str, default: '6:00:00' ) –

    Time limit for the job in the format ‘HH:MM:SS’. Defaults to ‘6:00:00’.

  • memG (int, default: 3 ) –

    Memory required for the job in gigabytes. Defaults to 3.

  • job_name (str, default: 'myjob' ) –

    Name of the job. Defaults to ‘myjob’.

  • arrayJob (str, default: None ) –

    Specification for array jobs. If provided, the script will include array job parameters. Defaults to None.

  • mem_scl (float, default: 1 ) –

    Memory scaling factor. Defaults to 1.

Returns:
  • str

    The SLURM shell script header, which includes job submission parameters formatted for the SLURM workload manager.

Raises:
  • OSError

    If unable to create the ‘log’ directory.

Notes
  • The function checks if the ‘log’ directory exists and creates it if it does not.
  • The partition is selected based on the job time limit if the account is not a buy-in account.
  • The generated header includes job error and output log file paths based on whether the job is an array job or a single job.
Source code in utility\helper_slurm.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def shell_header_quest(sh_account, time_str='6:00:00', memG=3, job_name='myjob', arrayJob=None, mem_scl=1):
    """
    Generates the SLURM shell script header for submitting jobs to a high-performance computing cluster.

    Args:
        sh_account (dict): Dictionary containing account information for SLURM, including the
            account name ('A'), partition ('p'), and whether it is a buy-in account ('buyin').
        time_str (str, optional): Time limit for the job in the format 'HH:MM:SS'.
            Hours may exceed 24 (e.g. '48:00:00'), as allowed by SLURM. Defaults to '6:00:00'.
        memG (int, optional): Memory required for the job in gigabytes. Defaults to 3.
        job_name (str, optional): Name of the job. Defaults to 'myjob'.
        arrayJob (str, optional): Specification for array jobs. If provided, the script will
            include array job parameters. Defaults to None.
        mem_scl (float, optional): Memory scaling factor. Defaults to 1.

    Returns:
        str: The SLURM shell script header, which includes job submission parameters
            formatted for the SLURM workload manager.

    Raises:
        OSError: If unable to create the 'log' directory.

    Notes:
        - The function checks if the 'log' directory exists and creates it if it does not.
        - The partition is selected based on the job time limit if the account is not a buy-in account.
        - The generated header includes job error and output log file paths based on whether
          the job is an array job or a single job.
    """

    # Create 'log' subfolder if it doesn't exist; exist_ok avoids a race
    # between the existence check and the creation.
    os.makedirs('log', exist_ok=True)

    # If not running on buyin account, need to select partition based on time required
    if not sh_account['buyin']:
        # Parse the hour field directly: SLURM accepts wall times of 24 hours
        # or more (e.g. '48:00:00'), which datetime.strptime('%H:%M:%S') rejects.
        t = int(time_str.split(':')[0])
        if t < 4:
            sh_account["p"] = 'short'
        if t >= 4:
            sh_account["p"] = 'normal'
        if t >= 12:
            sh_account["p"] = 'long'

    header = f'#!/bin/bash\n' \
             f'#SBATCH -A {sh_account["A"]}\n' \
             f'#SBATCH -p {sh_account["p"]}\n' \
             f'#SBATCH -t {time_str}\n' \
             f'#SBATCH -N 1\n' \
             f'#SBATCH --ntasks-per-node=1\n' \
             f'#SBATCH --mem-per-cpu={int(memG * mem_scl)}G\n' \
             f'#SBATCH --job-name="{job_name}"\n'
    if arrayJob is not None:
        # Array jobs log per-task: %A is the master job id, %a the task index
        array = arrayJob
        err = f'#SBATCH --error=log/{job_name}_%A_%a.err\n'
        out = f'#SBATCH --output=log/{job_name}_%A_%a.out\n'
        header = header + array + err + out
    else:
        # Single jobs log by job id (%j)
        err = f'#SBATCH --error=log/{job_name}.%j.err\n'
        out = f'#SBATCH --output=log/{job_name}.%j.out\n'
        header = header + err + out
    return header

str_to_digit(x)

Converts a string to a float if possible, otherwise returns the original string. Args: x (str): The input string. Returns: float or str: The converted float if conversion is successful, otherwise the original string.

Source code in utility\helper.py
506
507
508
509
510
511
512
513
514
515
516
517
518
def str_to_digit(x):
    """
    Converts a string to a float if possible, otherwise returns the original value.

    Args:
        x (str): The input string.

    Returns:
        float or str: The converted float if conversion is successful, otherwise the original input.
    """
    # Catch only the errors float() can raise for bad input; a bare except
    # would also swallow KeyboardInterrupt/SystemExit.
    try:
        return float(x)
    except (ValueError, TypeError):
        return x

submit_run_plotters(exp)

Submits and manages the creation of shell scripts to run standardized plots based on the models specified in the experiment.

Parameters:
  • exp (Experiment) –

    The experiment object that contains attributes such as models_to_run and plots_to_run, which determine which models and plots should be processed.

Returns:
  • None

Notes
  • The function checks which models (EMOD, malariasimulation, OpenMalaria) are specified in exp.models_to_run and sets corresponding job IDs.
  • It prepares a submission script for a set of default plots (relationship, timeseries, agecurves) and also handles custom plots if specified in exp.plots_to_run.
  • Memory requirements for each plot are defined and passed to the submission script. The default is 20 GB, but this can be adjusted based on specific plot needs (e.g., ccstep requires 80 GB).
  • If exp.plots_to_run is set to 'all', all available plots will be processed.
Source code in utility\helper_slurm.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def submit_run_plotters(exp):
    """
    Writes and submits the shell scripts that run the standardized plotters
    for the models included in the experiment.

    Args:
        exp (Experiment): Experiment object providing `models_to_run` and
            `plots_to_run`, which control which models and plots are processed.

    Returns:
        None

    Notes:
        - A dependency flag is set for every model (EMOD, malariasimulation,
          OpenMalaria) listed in `exp.models_to_run`.
        - Submission scripts are written for all default plotters
          ('relationship', 'timeseries', 'agecurves') even when they are not
          run; those selected in `exp.plots_to_run` are then rewritten and
          submitted with their dependencies.
        - Each plot requests 20 GB of memory by default; 'ccstep' is raised
          to 80 GB.
        - If `exp.plots_to_run` starts with 'all', every default plot is run.
    """

    # Dependency flags, one per supported model
    job_id_EMOD = 'EMOD' in exp.models_to_run
    job_id_malariasimulation = 'malariasimulation' in exp.models_to_run
    job_id_OpenMalaria = 'OpenMalaria' in exp.models_to_run

    plot_names_all = ['relationship', 'timeseries', 'agecurves']
    ## ['ccstep', 'smc'] custom plotter excluded, as these only apply for specific simulation experiments
    # plot_memrequests_all = {'sampleplots' : 10,'relationship' :10,'agecurves' :10 ,'ccstep' :20}

    # Resolve the repository root (parent of this utility directory)
    fdir = os.path.abspath(os.path.dirname(__file__))
    parent_dir = os.path.abspath(os.path.join(fdir, os.pardir))
    model_args = f"--modelname {' '.join([x for x in exp.models_to_run])}"

    ## Write shell submission scripts for every plotter, even if not running
    for plot_name in plot_names_all:
        submit_run_pyscript(exp, pyscript=f'-m plotter.plot_{plot_name}',
                            shname=f'run_{plot_name}_plots.sh',
                            custom_args=model_args,
                            job=f'{plot_name}_plots', memG=20,
                            wdir=parent_dir, write_only=True)

    ## Select plots to actually run, as specified in exp.plots_to_run
    if exp.plots_to_run[0] == 'all':
        plots_to_run = plot_names_all
    else:
        plots_to_run = [p for p in exp.plots_to_run if not p == 'sampleplots']

    ## Overwrite and submit the selected plotters
    for plot_name in plots_to_run:
        ## 80 GB as specified in launch_ccstep.py, could reintroduce/use mem_scaling factor
        plot_memG = 80 if plot_name == 'ccstep' else 20

        submit_run_pyscript(exp, pyscript=f'-m plotter.plot_{plot_name}',
                            shname=f'run_{plot_name}_plots.sh',
                            custom_args=model_args,
                            job_id_EMOD=job_id_EMOD,
                            job_id_malariasimulation=job_id_malariasimulation,
                            job_id_OpenMalaria=job_id_OpenMalaria,
                            job=f'{plot_name}_plots', memG=plot_memG,
                            wdir=parent_dir)

submit_run_pyscript(exp, pyscript='plotter/plot_relationship.py', shname='run_relationship_plots.sh', custom_args='--modelname EMOD malariasimulation OpenMalaria', t='05:00:00', memG=20, job_id_EMOD=False, job_id_malariasimulation=False, job_id_OpenMalaria=False, job='pyjob', wdir=None, write_only=False)

Submits a job to run a specified Python script using SLURM.

Parameters:
  • exp (Experiment) –

    The experiment object containing job directory and other related information.

  • pyscript (str, default: 'plotter/plot_relationship.py' ) –

    The name of the Python script to run. Defaults to ‘plotter/plot_relationship.py’.

  • shname (str, default: 'run_relationship_plots.sh' ) –

    The name of the shell script to submit. Defaults to ‘run_relationship_plots.sh’.

  • custom_args (str, default: '--modelname EMOD malariasimulation OpenMalaria' ) –

    Custom arguments to pass to the Python script. Defaults to ‘–modelname EMOD malariasimulation OpenMalaria’.

  • t (str, default: '05:00:00' ) –

    Wall time for the job in the format ‘HH:MM:SS’. Defaults to ‘05:00:00’.

  • memG (int, default: 20 ) –

    Memory required for the job in GB. Defaults to 20.

  • job_id_EMOD (bool, default: False ) –

    If True, sets the job ID as a dependency for EMOD. Defaults to False.

  • job_id_malariasimulation (bool, default: False ) –

    If True, sets the job ID as a dependency for malariasimulation. Defaults to False.

  • job_id_OpenMalaria (bool, default: False ) –

    If True, sets the job ID as a dependency for OpenMalaria. Defaults to False.

  • job (str, default: 'pyjob' ) –

    Name of the job. Defaults to ‘pyjob’.

  • wdir (str, default: None ) –

    Location of the working directory. If None, uses the current directory.

  • write_only (bool, default: False ) –

    If True, does not submit the job but only writes the script. Defaults to False.

Returns:
  • None

Raises:
  • FileNotFoundError

    If the specified job directory or dependencies do not exist.

Notes
  • The function generates a shell script that includes the SLURM header and the command to run the specified Python script.
  • It handles job dependencies based on the provided job IDs for EMOD, malariasimulation, and OpenMalaria.
  • The script is written to the job directory and submitted to the SLURM workload manager.
  • The submitted job ID is printed to the console for tracking purposes.
Source code in utility\helper_slurm.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def submit_run_pyscript(exp, pyscript='plotter/plot_relationship.py', shname='run_relationship_plots.sh',
                        custom_args='--modelname EMOD malariasimulation OpenMalaria', t='05:00:00', memG=20, job_id_EMOD=False,
                        job_id_malariasimulation=False, job_id_OpenMalaria=False,
                        job='pyjob',  wdir=None, write_only=False):
    """
    Submits a job to run a specified Python script using SLURM.

    Args:
        exp (Experiment): The experiment object containing job directory and other related information.
        pyscript (str, optional): The name of the Python script to run. Defaults to 'plotter/plot_relationship.py'.
        shname (str, optional): The name of the shell script to submit. Defaults to 'run_relationship_plots.sh'.
        custom_args (str, optional): Custom arguments to pass to the Python script.
            Defaults to '--modelname EMOD malariasimulation OpenMalaria'.
        t (str, optional): Wall time for the job in the format 'HH:MM:SS'. Defaults to '05:00:00'.
        memG (int, optional): Memory required for the job in GB. Defaults to 20.
        job_id_EMOD (bool, optional): If True, sets the job ID as a dependency for EMOD. Defaults to False.
        job_id_malariasimulation (bool, optional): If True, sets the job ID as a dependency for malariasimulation. Defaults to False.
        job_id_OpenMalaria (bool, optional): If True, sets the job ID as a dependency for OpenMalaria. Defaults to False.
        job (str, optional): Name of the job. Defaults to 'pyjob'.
        wdir (str, optional): Location of the working directory. If None, uses the parent of this file's directory.
        write_only (bool, optional): If True, does not submit the job but only writes the script. Defaults to False.

    Returns:
        None

    Raises:
        FileNotFoundError: If the specified job directory or dependencies do not exist.

    Notes:
        - The function generates a shell script that includes the SLURM header and the command
          to run the specified Python script.
        - It handles job dependencies based on the provided job IDs for EMOD, malariasimulation,
          and OpenMalaria.
        - The script is written to the job directory and submitted to the SLURM workload manager.
        - The submitted job ID is printed to the console for tracking purposes.
    """

    def _read_job_id(fname):
        # Read a previously stored SLURM job id; 'with' closes the handle
        # (the original open(...).read() leaked the file object).
        with open(os.path.join(exp.job_directory, fname)) as fid:
            return fid.read().strip()

    # Default to the repository root (parent of this utility directory)
    if wdir is None:
        wdir = os.path.abspath(os.path.dirname(__file__))
        wdir = os.path.abspath(os.path.join(wdir, os.pardir))

    # Generate the SLURM shell script header for the job
    header_post = shell_header_quest(exp.sh_hpc_config, t, memG, job_name=job, mem_scl=1)

    # Generate the Python command to run the script
    pycommand = f'\npython {pyscript} -d {exp.sim_out_dir} {custom_args}'

    # Write the shell script; the context manager guarantees the file is
    # flushed and closed before sbatch reads it.
    script_path = os.path.join(exp.job_directory, shname)
    with open(script_path, 'w') as file:
        file.write(header_post + exp.EMOD_venv + f'\ncd {wdir}' + pycommand)

    dependencies = '--dependency=afterany:'
    prior_dependency = False
    # Check if job_id flags are set to define dependencies
    if not write_only:
        if job_id_EMOD:
            id_EMOD = _read_job_id('job_id_EMODanalyze.txt')
            dependencies = dependencies + f'{id_EMOD}'
            prior_dependency = True
        # Check if job_id_malariasimulation is provided as a string
        if job_id_malariasimulation:
            id_malariasimulation = _read_job_id('job_id_malariasimulation_analyze.txt')
            if prior_dependency:
                dependencies = dependencies + f',{id_malariasimulation}'
            else:
                dependencies = dependencies + f'{id_malariasimulation}'
                prior_dependency = True
        if job_id_OpenMalaria:
            id_OpenMalaria = _read_job_id('job_id_OManalyze.txt')
            if prior_dependency:
                dependencies = dependencies + f',{id_OpenMalaria}'
            else:
                dependencies = dependencies + f'{id_OpenMalaria}'
        # Submit job with dependency
        if not job_id_EMOD and not job_id_malariasimulation and not job_id_OpenMalaria:
            # Submit job without dependency
            p = subprocess.run(['sbatch', '--parsable', script_path], stdout=subprocess.PIPE,
                               cwd=str(exp.job_directory))
        else:
            p = subprocess.run(['sbatch', '--parsable', dependencies, script_path], stdout=subprocess.PIPE,
                               cwd=str(exp.job_directory))

        # Extract the SLURM job ID from the output (--parsable prints 'jobid[;cluster]')
        slurm_job_id = p.stdout.decode('utf-8').strip().split(';')[0]

        # Print the submitted job ID
        print(f'Submitted {shname} to run {pyscript} - job id: {slurm_job_id}')

write_txt(txtobj, path, fname)

Writes a text object to a file. Args: txtobj (str): The text object to write. path (str): The path to the directory where the file will be saved. fname (str): The filename to use for the file.

Source code in utility\helper.py
492
493
494
495
496
497
498
499
500
501
502
503
def write_txt(txtobj, path, fname):
    """
    Writes a text object to a file.

    Args:
        txtobj (str): The text object to write.
        path (str): The path to the directory where the file will be saved.
        fname (str): The filename to use for the file.
    """
    # 'with' ensures the file is flushed and closed even if the write fails
    with open(os.path.join(path, fname), 'w') as file:
        file.write(txtobj)