import typing
from copy import deepcopy
from multiprocessing import Pool
import multiprocessing.connection
import numpy as np
from pymead.analysis.calc_aero_data import calculate_aero_data
from pymead.core.geometry_collection import GeometryCollection
from pymead.utils.pymead_mp import kill_xfoil_mses_processes, pool_terminate_multi_tiered
[docs]
class CustomGASettings:
pass
[docs]
class Chromosome:
[docs]
def __init__(self, geo_col_dict: dict, param_dict: dict, generation: int,
population_idx: int, mea_name: str = None, airfoil_name: str = None,
genes: list or None = None, verbose: bool = True, evaluate_geometric_constraints: bool = True):
"""
Chromosome class constructor. Each Chromosome is the member of a particular Population.
"""
# Keyword argument validation
if mea_name is None and airfoil_name is None:
raise ValueError("Must specify either mea_name (for MSES) or airfoil_name (for XFOIL) for the Chromosome")
elif mea_name is not None and airfoil_name is not None:
raise ValueError("Must specify only one of mea_name (for MSES) or airfoil_name (for XFOIL) for the "
"Chromosome")
self.geo_col_dict = geo_col_dict
self.geo_col = None
self.mea_name = mea_name
self.airfoil_name = airfoil_name
self.mea = None
self.airfoil = None
self.airfoil_list = None
self.mea_airfoil_names = None
self.param_dict = deepcopy(param_dict)
self.genes = deepcopy(genes)
self.generation = generation
self.population_idx = population_idx
self.evaluate_geometric_constraints = evaluate_geometric_constraints
# Might be able to remove a number of these attributes
self.coords = None
self.control_points = None
self.airfoil_state = None
self.verbose = verbose
self.airfoil_sys_generated = False
self.intersection_checked = False
self.valid_geometry = False
self.mutated = False
self.fitness = None
self.forces = None
[docs]
def generate(self):
"""
Chromosome generation flow
:return:
"""
self.geo_col = GeometryCollection.set_from_dict_rep(deepcopy(self.geo_col_dict))
self.mea = None if self.mea_name is None else self.geo_col.container()["mea"][self.mea_name]
self.airfoil = None if self.airfoil_name is None else self.geo_col.container()["airfoils"][self.airfoil_name]
self.airfoil_list = [self.airfoil] if self.airfoil is not None else self.mea.airfoils
if self.mea is not None:
self.mea_airfoil_names = [airfoil.name() for airfoil in self.mea.airfoils]
if self.verbose:
print(f'Generating chromosome idx = {self.population_idx}, gen = {self.generation}')
self.generate_airfoil_sys_from_genes()
if self.evaluate_geometric_constraints:
self.chk_self_intersection()
for airfoil in self.airfoil_list:
airfoil_name = airfoil.name()
if self.valid_geometry:
if self.param_dict["constraints"][airfoil_name]['min_radius_curvature'][1]:
self.chk_min_radius(airfoil_name=airfoil_name)
if self.valid_geometry:
if self.param_dict["constraints"][airfoil_name]['min_val_of_max_thickness'][1]:
self.chk_max_thickness(airfoil_name=airfoil_name)
if self.valid_geometry:
cnstr_spec = self.param_dict["constraints"][airfoil_name]["thickness_at_points"]
if cnstr_spec is not None and cnstr_spec[1]:
self.check_thickness_at_points(airfoil_name=airfoil_name)
if self.valid_geometry:
if self.param_dict["constraints"][airfoil_name]["min_area"][1]:
self.check_min_area(airfoil_name=airfoil_name)
if self.valid_geometry:
cnstr_spec = self.param_dict["constraints"][airfoil_name]["internal_geometry"]
if cnstr_spec is not None and cnstr_spec[1]:
if "Before" in cnstr_spec[2]:
self.check_contains_points(airfoil_name=airfoil_name)
else:
raise ValueError("Internal geometry timing after "
"aerodynamic evaluation not yet implemented")
# Set the equation_dict to None because it contains the unpicklable __builtins__ module
for param in self.geo_col.container()["params"].values():
param.equation_dict = None
for desvar in self.geo_col.container()["desvar"].values():
desvar.equation_dict = None
def get_coords(self):
if self.airfoil is not None:
coords = self.airfoil.get_scaled_coords()
else:
coords, transformation_kwargs = self.mea.get_coords_list_chord_relative(
max_airfoil_points=self.param_dict['mset_settings']["downsampling_max_pts"] if bool(
self.param_dict['mset_settings']["use_downsampling"]) else None,
curvature_exp=self.param_dict['mset_settings']["downsampling_curve_exp"]
)
return coords
[docs]
def generate_airfoil_sys_from_genes(self) -> dict:
"""
Converts Chromosome's gene list into a set of discrete airfoil system coordinates
:return:
"""
if self.genes is not None:
self.geo_col.assign_design_variable_values(dv_values=self.genes, bounds_normalized=True)
self.update_param_dict() # updates the MSES settings from the geometry (just for XCDELH right now)
self.coords = self.get_coords()
self.airfoil_sys_generated = True
return self.param_dict
def update_param_dict(self):
def _get_numerical_value_from_container(param: str):
containers_to_check = ["desvar", "params"]
for container in containers_to_check:
if param in self.geo_col.container()[container]:
return self.geo_col.container()[container][param].value()
raise ValueError(f"Could not find setting parameter {param} in the params or "
f"design variables containers")
if self.param_dict["tool"] != "MSES":
return
# Update the x/c-location of the actuator disks from the geometry collection value if necessary
if "XCDELH-Param" in self.param_dict["mses_settings"]:
xcdelh_params = self.param_dict["mses_settings"]["XCDELH-Param"]
for idx, xcdelh_param in enumerate(xcdelh_params):
if xcdelh_param:
if xcdelh_param in self.geo_col.container()["params"]:
self.param_dict["mses_settings"]["XCDELH"][idx] = self.geo_col.container()["params"][xcdelh_param].value()
elif xcdelh_param in self.geo_col.container()["desvar"]:
self.param_dict["mses_settings"]["XCDELH"][idx] = self.geo_col.container()["desvar"][xcdelh_param].value()
else:
raise ValueError(f"Could not find XCDELH parameter {xcdelh_param}")
# Update the fan pressure ratio from the updated design variable values if necessary
if "PTRHIN-DesVar" in self.param_dict["mses_settings"]:
for ad_idx, fpr_dv_list in enumerate(self.param_dict["mses_settings"]["PTRHIN-DesVar"]):
for fpr_idx, fpr_dv in enumerate(fpr_dv_list):
self.param_dict["mses_settings"]["PTRHIN-DesVar"][ad_idx][fpr_idx] = self.geo_col.container()["desvar"][fpr_dv].value()
# Update the angle of attack from the updated design variable values if necessary
alfa = self.param_dict["mses_settings"]["ALFAIN"]
# This block converts a dictionary representation of the alfa parameter to a float
# or a list (the latter only in the case of a multipoint optimization)
if isinstance(alfa, dict) and alfa["param"]:
if isinstance(alfa["param"], str):
self.param_dict["mses_settings"]["ALFAIN"] = _get_numerical_value_from_container(alfa["param"])
elif isinstance(alfa["param"], list):
if len(alfa["param"]) > 1:
self.param_dict["mses_settings"]["ALFAIN"] = [
_get_numerical_value_from_container(p) for p in alfa["param"]
]
elif len(alfa["param"]) == 1 and alfa["param"][0]:
self.param_dict["mses_settings"]["ALFAIN"] = _get_numerical_value_from_container(alfa["param"][0])
else:
self.param_dict["mses_settings"]["ALFAIN"] = alfa["value"]
[docs]
def chk_self_intersection(self) -> bool:
"""
Checks if airfoil geometry is self-intersecting
:return: Boolean flag
"""
try:
for airfoil in self.airfoil_list: # For each airfoil,
self_intersecting = airfoil.check_self_intersection()
if self_intersecting: # If the intersection array is not empty (& thus there is a
# self-intersection somewhere),
self_intersection_flag = True # Set self-intersection flag to True
self.valid_geometry = False
if self.verbose:
print('Failed self-intersection test')
return self_intersection_flag # Return the self-intersection flag and break out of the method
self_intersection_flag = False # If the whole method runs with no self-intersections, set the
# flag to False
self.valid_geometry = True
return self_intersection_flag # Return the false self-intersection flag, meaning that this geometry is
# good to go
finally:
self.intersection_checked = True
def chk_min_radius(self, airfoil_name: str) -> bool:
chord_relative = False
cnstr_spec = self.param_dict["constraints"][airfoil_name]["min_radius_curvature"]
if len(cnstr_spec) > 2:
if cnstr_spec[2] == "Non-Dimensional":
chord_relative = True
min_radius = self.geo_col.container()["airfoils"][airfoil_name].compute_min_radius(
chord_relative=chord_relative)
min_radius_too_small = min_radius < cnstr_spec[0]
self.valid_geometry = not min_radius_too_small
if self.verbose:
print(f'Min radius of curvature too small? {min_radius_too_small}. Min radius is {min_radius}')
return min_radius_too_small
def chk_max_thickness(self, airfoil_name: str) -> bool:
airfoil_frame_relative = True
cnstr_spec = self.param_dict["constraints"][airfoil_name]["min_val_of_max_thickness"]
if len(cnstr_spec) > 2:
if cnstr_spec[2] == "Dimensional":
airfoil_frame_relative = False
thickness_data = self.geo_col.container()["airfoils"][airfoil_name].compute_thickness(
airfoil_frame_relative=airfoil_frame_relative)
max_thickness = thickness_data["t_max"] if "t_max" in thickness_data else thickness_data["t/c_max"]
if max_thickness < cnstr_spec[0]:
max_thickness_too_small = True
else:
max_thickness_too_small = False
if max_thickness_too_small:
max_thickness_too_small_flag = True
self.valid_geometry = False
if self.verbose:
print(f'Max thickness is {max_thickness}. '
f'Failed thickness test in chk_max_thickness, trying again...')
for cnstr in self.geo_col.container()["geocon"].values():
cnstr.data = None
return max_thickness_too_small_flag
else:
max_thickness_too_small_flag = False
self.valid_geometry = True
if self.verbose:
print(f'Max thickness is {max_thickness}. Passed thickness test. Continuing...')
return max_thickness_too_small_flag
def check_thickness_at_points(self, airfoil_name: str):
airfoil_frame_relative = True
vertical_check = False
cnstr_spec = self.param_dict["constraints"][airfoil_name]["thickness_at_points"]
if len(cnstr_spec) > 1:
airfoil_frame_relative = "Non-Dimensional" in cnstr_spec[2]
vertical_check = "Vertical" in cnstr_spec[2]
thickness_array = np.array(cnstr_spec[0])
x_array = thickness_array[:, 0]
t_array = thickness_array[:, 1]
airfoil = self.geo_col.container()["airfoils"][airfoil_name]
thickness = airfoil.compute_thickness_at_points(
x_array, airfoil_frame_relative=airfoil_frame_relative, vertical_check=vertical_check)
if np.any(thickness < t_array):
if self.verbose:
print(f"Minimum required thickness condition not met at some point. Trying again")
self.valid_geometry = False
else:
if self.verbose:
print(f"Minimum required thickness condition met everywhere [success]")
self.valid_geometry = True
def check_min_area(self, airfoil_name: str):
airfoil_frame_relative = True
cnstr_spec = self.param_dict["constraints"][airfoil_name]["min_area"]
if len(cnstr_spec) > 2:
airfoil_frame_relative = cnstr_spec[2] == "Non-Dimensional"
area = self.geo_col.container()["airfoils"][airfoil_name].compute_area(
airfoil_frame_relative=airfoil_frame_relative)
required_min_area = cnstr_spec[0]
if area < required_min_area:
if self.verbose:
print(f'Area is {area} < required min. area ({required_min_area}). Trying again...')
self.valid_geometry = False
else:
if self.verbose:
print(f'Area is {area} >= minimum req. area ({required_min_area}) [success]. '
f'Continuing...')
self.valid_geometry = True
def check_contains_points(self, airfoil_name: str) -> bool:
airfoil_frame_relative, rotate_with_airfoil, translate_with_airfoil, scale_with_airfoil = True, True, True, True
cnstr_spec = self.param_dict["constraints"][airfoil_name]["internal_geometry"]
if len(cnstr_spec) > 1:
if "Rotate/Translate w/ Airfoil" in cnstr_spec[2]:
scale_with_airfoil = False
airfoil_frame_relative = False
elif "Absolute" in cnstr_spec[2]:
airfoil_frame_relative = False
rotate_with_airfoil = False
translate_with_airfoil = False
scale_with_airfoil = False
contains_points = self.geo_col.container()["airfoils"][airfoil_name].contains_line_string(
points=cnstr_spec[0],
airfoil_frame_relative=airfoil_frame_relative,
rotate_with_airfoil=rotate_with_airfoil,
translate_with_airfoil=translate_with_airfoil,
scale_with_airfoil=scale_with_airfoil
)
if not contains_points:
self.valid_geometry = False
if self.verbose:
print(f"Failed internal geometry test.")
return self.valid_geometry
self.valid_geometry = True
if self.verbose:
print(f"Passed internal geometry test. Continuing...")
return self.valid_geometry
[docs]
class Population:
[docs]
def __init__(self, param_dict: dict, generation: int,
parents: typing.List[Chromosome] or None, verbose: bool = True,
skip_parent_assignment: bool = False):
self.param_dict = deepcopy(param_dict)
self.generation = generation
self.verbose = verbose
self.population = []
self.parent_indices = []
self.converged_chromosomes = []
if not skip_parent_assignment:
for population_idx, chromosome in enumerate(parents):
chromosome.population_idx = population_idx
self.population.append(chromosome)
# self.parents.append(chromosome)
self.parent_indices.append(population_idx)
self.num_parents = len(self.parent_indices)
[docs]
def eval_chromosome_fitness(self, chromosome: Chromosome):
"""
Evaluates the fitness of a particular chromosome
"""
# print(f"Generating {chromosome.population_idx}...")
chromosome.generate()
if chromosome.evaluate_geometric_constraints:
if not chromosome.valid_geometry:
print(f"Geometry invalid for chromosome {chromosome.population_idx} "
f"(either self-intersecting or fails to meet a constraint)")
return chromosome
else:
print(f"Geometry {chromosome.population_idx} passed all the tests. Continuing on to evaluation...")
else:
print("Skipped evaluating geometric constraints. Continuing...")
if chromosome.fitness is None:
if self.verbose:
print(f'Chromosome {chromosome.population_idx} '
f'(generation: {chromosome.generation}): Evaluating fitness...')
xfoil_settings, mset_settings, mses_settings, mplot_settings = None, None, None, None
if chromosome.param_dict['tool'] == 'XFOIL':
tool = 'XFOIL'
xfoil_settings = chromosome.param_dict['xfoil_settings']
xfoil_settings["base_dir"] = chromosome.param_dict["base_folder"]
xfoil_settings["airfoil_name"] = chromosome.param_dict["name"][chromosome.population_idx]
elif chromosome.param_dict['tool'] == 'MSES':
tool = 'MSES'
mset_settings = chromosome.param_dict['mset_settings']
mses_settings = chromosome.param_dict['mses_settings']
mplot_settings = chromosome.param_dict['mplot_settings']
else:
raise ValueError('Only XFOIL and MSES are supported as tools in the optimization framework')
# Multipoint Tags
multipoint_tags = chromosome.param_dict["multipoint_tags"] \
if "multipoint_tags" in chromosome.param_dict else None
chromosome.forces, _ = calculate_aero_data(None,
chromosome.param_dict['base_folder'],
chromosome.param_dict['name'][chromosome.population_idx],
coords=chromosome.coords, tool=tool,
mea_airfoil_names=chromosome.mea_airfoil_names,
xfoil_settings=xfoil_settings,
mset_settings=mset_settings,
mses_settings=mses_settings,
mplot_settings=mplot_settings,
export_Cp=True,
multipoint_tags=multipoint_tags)
if (xfoil_settings is not None and xfoil_settings["multi_point_stencil"] is None) or (
mses_settings is not None and mses_settings['multi_point_stencil'] is None):
if chromosome.forces['converged'] and not chromosome.forces['errored_out'] \
and not chromosome.forces['timed_out']:
chromosome.fitness = 1 # Set to any value that is not False and not None
else:
if all(chromosome.forces['converged']) and not any(chromosome.forces['errored_out']) and not any(chromosome.forces['timed_out']):
chromosome.fitness = 1 # Set to any value that is not False and not None
if self.verbose and chromosome.fitness is not None:
if "CPK" in chromosome.forces.keys():
print(f"Fitness evaluated successfully for chromosome {chromosome.population_idx} with "
f"generation: {chromosome.generation} | CPK = {chromosome.forces['CPK']} | C_d = {chromosome.forces['Cd']} | C_l = "
f"{chromosome.forces['Cl']} | C_m = {chromosome.forces['Cm']}")
else:
print(f"Fitness evaluated successfully for chromosome {chromosome.population_idx} with "
f"generation: {chromosome.generation} | C_d = {chromosome.forces['Cd']} | C_l = "
f"{chromosome.forces['Cl']} | C_m = {chromosome.forces['Cm']}")
return chromosome
@staticmethod
def generate_chromosome(chromosome: Chromosome):
chromosome.generate()
return chromosome
def generate_chromosomes_parallel(self):
print("Generating chromosomes in parallel...")
with Pool(processes=self.param_dict['num_processors']) as pool:
result = pool.map(self.generate_chromosome, self.population)
for chromosome in result:
self.population = [chromosome if c.population_idx == chromosome.population_idx
else c for c in self.population]
[docs]
def eval_pop_fitness(self, sig: multiprocessing.connection.Connection = None):
"""
Evaluates the fitness of the population using parallel processing
"""
def _end_pool(chr_pool: multiprocessing.Pool):
pool_terminate_multi_tiered(chr_pool)
kill_xfoil_mses_processes()
n_eval = 0
n_converged_chromosomes = 0
pool = Pool(processes=self.param_dict['num_processors'])
for chromosome in pool.imap_unordered(self.eval_chromosome_fitness, self.population):
if chromosome.fitness is not None:
if chromosome.evaluate_geometric_constraints:
assert chromosome.valid_geometry
self.converged_chromosomes.append(chromosome)
n_converged_chromosomes += 1
n_eval += 1
gen = 1 if self.generation == 0 else self.generation
status_bar_message = f"Generation {gen} of {self.param_dict['n_max_gen']}: Converged " \
f"{n_converged_chromosomes} of {self.param_dict['population_size']} chromosomes. " \
f"Total evaluations: {n_eval}\n"
if sig is not None:
try:
sig.send(("message", status_bar_message))
except BrokenPipeError:
_end_pool(pool)
break
else:
print(status_bar_message)
if n_converged_chromosomes >= self.param_dict["population_size"]:
_end_pool(pool)
break
if n_converged_chromosomes < self.param_dict["population_size"]:
message_to_display = ("Ran out of chromosomes to analyze before population size was reached. "
"Increase the number of offspring in the optimization settings to allow "
"more chromosomes to be generated.")
if sig is not None:
try:
sig.send(("disp_message_box", message_to_display))
_end_pool(pool)
except BrokenPipeError:
_end_pool(pool)
return n_eval
else:
print(message_to_display)
_end_pool(pool)
for chromosome in self.converged_chromosomes:
# Re-write the population such that the order of the results from the multiprocessing.Pool does not
# matter
self.population = [chromosome if c.population_idx == chromosome.population_idx
else c for c in self.population]
return n_eval
def all_chromosomes_fitness_converged(self):
for chromosome in self.population:
if chromosome.fitness is None:
return False
return True