class FiveDayAgebinAnalyzer(IAnalyzer):
    """
    Analyzer that turns EMOD 5-daily malaria summary reports into one results table.

    Inherits from the idmtools IAnalyzer. Each simulation's yearly
    ``MalariaSummaryReport_FiveDaily_<year>.json`` files are read per age bin (``map``),
    then all simulations are combined, aggregated to the configured age groups and
    written to a single CSV (``reduce``).
    A 5-day analyzer exists to accommodate OpenMalaria, which has a minimum reporting
    period of 5 days.

    Args:
        expt_name (str): Name of the experiment.
        sweep_variables (list): Simulation tags used to identify scenarios.
            Defaults to ``["Run_Number"]`` when None or empty.
        working_dir (str): Working directory for the analysis. Defaults to './'.
        start_year (int): First report year to read. Defaults to 1920.
        end_year (int): Year after the last report year to read (exclusive bound of
            ``range(start_year, end_year)``). Defaults to 2020.
        age_groups_aggregates (list): ``[lower, upper]`` pairs defining the age groups
            to aggregate to. Defaults to a built-in list when None or empty.
        burnin (None or int): Number of years after ``start_year`` to drop from the
            results. Defaults to None (keep all years).
        intervention_analyzer_columns (list): Extra column names to keep in the saved
            CSV. Defaults to None (no extra columns).

    Saves:
        mmmpy_5day.csv in ``<working_dir>/EMOD``

    Returns:
        None
    """

    # Each yearly report is read as 73 reporting periods of 5 days (73 * 5 = 365).
    _PERIODS_PER_YEAR = 73
    _PERIOD_LENGTH_DAYS = 5

    def __init__(self, expt_name, sweep_variables=None, working_dir='./', start_year=1920,
                 end_year=2020, age_groups_aggregates=None, burnin=None,
                 intervention_analyzer_columns=None):
        """Store the analysis settings and register one summary report file per year."""
        super().__init__(
            working_dir=working_dir,
            filenames=[f"output/MalariaSummaryReport_FiveDaily_{x}.json"
                       for x in range(start_year, end_year)],
        )
        self.sweep_variables = sweep_variables or ["Run_Number"]
        self.expt_name = expt_name
        self.start_year = start_year
        self.end_year = end_year
        self.age_groups_aggregates = age_groups_aggregates or [
            [0, 0.5], [0.5, 1], [1, 2], [2, 5], [5, 10], [10, 15],
            [15, 20], [20, 100], [0, 5], [0, 100]]
        self.emod_burnin = burnin
        # Copy into a fresh list so instances never share (or mutate) a default list.
        self.intervention_analyzer_columns = list(intervention_analyzer_columns or [])

    def map(self, data, simulation: Simulation):
        """
        Process the simulation data and map it to a DataFrame.

        Args:
            data (dict): Parsed report contents keyed by the filenames registered in
                ``__init__``.
            simulation (Simulation): The simulation instance for extracting tags.

        Returns:
            pd.DataFrame: One row per (year, 5-day period, age bin) with the extracted
            channels plus one column per sweep variable found in the simulation tags.
        """
        n_periods = self._PERIODS_PER_YEAR
        days = [self._PERIOD_LENGTH_DAYS * t for t in range(1, n_periods + 1)]
        # Output column -> channel name in the report's 'DataByTimeAndAgeBins' section.
        channels = {
            'prevalence': 'PfPR by Age Bin-True',
            'nInfect': 'New Infections by Age Bin',
            'clinical_incidence': 'Annual Clinical Incidence by Age Bin',  # per person per year
            'severe_incidence': 'Annual Severe Incidence by Age Bin',  # per person per year
            'nHost': 'Average Population by Age Bin',
        }

        frames = []
        # Loop over summary reports (separate ones for each year)
        for year, fname in zip(range(self.start_year, self.end_year), self.filenames):
            report = data[fname]
            age_bins = report['Metadata']['Age Bins']
            pfpr2to10 = report['DataByTime']['PfPR_2to10-True'][:n_periods]
            by_time_and_age = report['DataByTimeAndAgeBins']
            for age_idx, age_bin in enumerate(age_bins):
                # Extract data per agebin and combine it into a dataframe
                columns = {'day': days}
                for col, channel in channels.items():
                    columns[col] = [row[age_idx] for row in by_time_and_age[channel][:n_periods]]
                simdata = pd.DataFrame(columns)
                # Annual rate spread over the 5-day periods of a year: per person per 5 days.
                # (reduce() recomputes nUncomp/nSevere as population totals.)
                simdata['nUncomp'] = simdata['clinical_incidence'] / n_periods
                simdata['nSevere'] = simdata['severe_incidence'] / n_periods
                simdata['year'] = year
                simdata['agebin'] = age_bin
                simdata['prevalence_2to10'] = pfpr2to10
                frames.append(simdata)

        # Concatenate once instead of growing a frame inside the loop.
        adf = pd.concat(frames) if frames else pd.DataFrame()

        # Add varying parameter values that describe and identify the simulation scenarios
        for sweep_var in self.sweep_variables:
            if sweep_var not in simulation.tags:
                continue
            tag_value = simulation.tags[sweep_var]
            if isinstance(tag_value, (list, tuple)):
                # Sequence-valued tags are stored as one 'a-b-c' label per row rather than
                # being broadcast element-wise across rows.
                tag_value = '-'.join(str(x) for x in tag_value)
            try:
                adf[sweep_var] = tag_value
            except (TypeError, ValueError):
                # Other iterables that pandas cannot assign as a column get the same label form.
                adf[sweep_var] = '-'.join(str(x) for x in tag_value)
        return adf

    def reduce(self, all_data):
        """
        Aggregate all simulation data to age groups and save it as mmmpy_5day.csv.

        Reads ``scenarios.csv`` from the working directory and, when its first row has
        ``entomology_mode == 'dynamic'``, also ``EMOD/EIR_yr.csv`` and
        ``EMOD/EIR_daily.csv`` (presumably written by other analyzers - verify).

        Args:
            all_data (dict): DataFrames returned by ``map``, keyed by simulation.

        Returns:
            None: Results are written to ``<working_dir>/EMOD/mmmpy_5day.csv``; nothing
            is written when there is no data to process.
        """
        selected = list(all_data.values())
        if not selected:
            print("\nWarning: No data have been returned... Exiting...")
            return

        df = pd.concat(selected).reset_index(drop=True)
        if self.emod_burnin is not None:
            # Copy so the column assignments below do not target a view of the filtered frame.
            df = df[df['year'] >= self.start_year + self.emod_burnin].copy()

        period_keys = self.sweep_variables + ['year', 'day']
        periods_per_year = self._PERIODS_PER_YEAR
        df_pfpr2to10 = df.groupby(period_keys)[['prevalence_2to10']].agg('mean').reset_index()

        df['nPatent'] = df['prevalence'] * df['nHost']  # patent infections in the age bin
        # Annual per-person rates converted to total events per 5-day period
        df['nUncomp'] = df['clinical_incidence'] * (df['nHost'] / periods_per_year)
        df['nSevere'] = df['severe_incidence'] * (df['nHost'] / periods_per_year)

        # Loop over the age groups to aggregate agebins to the defined groups
        group_frames = []
        for ages in self.age_groups_aggregates:
            lower, upper = ages[0], ages[1]
            # Age bins are matched as lower < agebin <= upper
            adf = df[(df['agebin'] > lower) & (df['agebin'] <= upper)]
            if adf.empty:
                continue
            adf = adf.groupby(period_keys)[
                ['nPatent', 'nUncomp', 'nSevere', 'nHost']].agg('sum').reset_index()
            adf['prevalence'] = adf['nPatent'] / adf['nHost']
            # Back to events per person per year
            adf['clinical_incidence'] = adf['nUncomp'] / (adf['nHost'] / periods_per_year)
            adf['severe_incidence'] = adf['nSevere'] / (adf['nHost'] / periods_per_year)
            adf['ageGroup'] = f'{lower}-{upper}'
            group_frames.append(adf)

        if not group_frames:
            print("\nWarning: No age bins matched the requested age groups... Exiting...")
            return
        cdf = pd.concat(group_frames)
        cdf = pd.merge(left=cdf, right=df_pfpr2to10, on=period_keys)

        scen_df = pd.read_csv(os.path.join(self.working_dir, 'scenarios.csv'))
        if scen_df.entomology_mode[0] == 'dynamic':
            eirdf = pd.read_csv(os.path.join(self.working_dir, 'EMOD', 'EIR_yr.csv'))
            eirdf = eirdf.groupby(self.sweep_variables + ['year'])[['Annual EIR']].agg('mean').reset_index()
            cdf = pd.merge(left=cdf, right=eirdf, on=self.sweep_variables + ['year'])

            eirdf_daily = pd.read_csv(os.path.join(self.working_dir, 'EMOD', 'EIR_daily.csv'))
            eirdf_daily = eirdf_daily.groupby(self.sweep_variables + ['year', 'day', 'timestep'])[
                ['eir', 'n_total_mos_pop', 'n_infectious_mos']].agg('mean').reset_index()
            # Assign each day to the 5-day period that ends on or after it (5, 10, 15, ...)
            eirdf_daily['5day'] = eirdf_daily['day'].apply(lambda x: 5 * math.ceil(x / 5))
            eirdf_5day = eirdf_daily.groupby(self.sweep_variables + ['5day', 'year']).agg(
                {'eir': 'sum',
                 'n_total_mos_pop': 'mean',
                 'n_infectious_mos': 'mean',
                 'timestep': 'max'}).reset_index()
            eirdf_5day = eirdf_5day.rename(columns={'5day': 'day'})
            cdf = pd.merge(left=cdf, right=eirdf_5day, on=period_keys)
        else:
            cdf['eir'] = cdf['transmission_intensity_EMOD']
            cdf['inputEIR'] = cdf['transmission_intensity_EMOD']
        cdf = cdf.merge(scen_df, on='scen_id', how='inner')

        # These columns are only produced by the dynamic-entomology branch above; add them
        # as empty so the column selection below does not raise KeyError in the other branch.
        for col in ('timestep', 'n_total_mos_pop', 'n_infectious_mos'):
            if col not in cdf.columns:
                cdf[col] = float('nan')

        # Rename columns for alignment with OpenMalaria results
        cdf = cdf.rename(columns={"Run_Number": "seed"})
        cdf['seed'] = cdf['seed'] + 1
        cdf['mortality'] = ''

        # Save the processed DataFrame to a CSV file
        out_dir = os.path.join(self.working_dir, 'EMOD')
        print(f'\nSaving outputs to: {out_dir}')
        cols_to_keep = ['scen_id', 'index', 'seed', 'target_output_values', 'transmission_intensity_EMOD', 'timestep',
                        'day', 'year', 'ageGroup', 'seasonality', 'cm_clinical', 'entomology_mode',
                        'eir', 'prevalence_2to10', 'prevalence', 'clinical_incidence', 'severe_incidence',
                        'n_total_mos_pop', 'n_infectious_mos'] + self.intervention_analyzer_columns
        cdf = cdf[cols_to_keep]
        os.makedirs(out_dir, exist_ok=True)
        cdf.to_csv(os.path.join(out_dir, 'mmmpy_5day.csv'), index=False)