Source code for pymead.analysis.read_aero_data

import os
import re
import typing

import pandas as pd
import numpy as np
import shapely
import triangle


flow_var_idx = {"M": 7, "Cp": 8, "p": 3, "rho": 2, "u": 4, "v": 5, "V": 6, "Cpt": 9, "dCpt": 10, "dCp": 11}


[docs] def read_Cp_from_file_xfoil(fname: str): """ Reads the :math:`C_p` data from the format output by XFOIL and converts it to a *numpy* array Parameters ========== fname: str File from which to read the :math:`C_p` data Returns ======= dict Dictionary containing 1-D *numpy* arrays for :math:`x`, :math:`y`, and :math:`C_p` """ y_in_header = False header_line_idx = None with open(fname, "r") as f: for idx, line in enumerate(f): if "Cp" not in line: continue header_line_idx = idx if "y" in line: y_in_header = True break else: raise ValueError("Could not detect the header line in the XFOIL Cp file") # Read in the Cp data for XFOIL versions 6.93 or 6.99 (other versions are untested) names = ["x", "y", "Cp"] if y_in_header else ["x", "Cp"] cols = {"x": 0, "y": 1, "Cp": 2} if y_in_header else {"x": 0, "Cp": 1} df = pd.read_csv(fname, skiprows=header_line_idx + 1, names=names, sep=r"\s+", engine='python') array_ = df.to_numpy() return {name: array_[:, cols[name]] for name in names}
[docs] def read_aero_data_from_xfoil(fpath: str, aero_data: dict): """ Reads aerodynamic data from XFOIL's output log Parameters ========== fpath: str File path to the XFOIL log aero_data: dict Dictionary in which the aerodynamic data will be written Returns ======= str, str The first and second lines of the XFOIL log files containing aerodynamic output data """ line1 = None line2 = None for line in read_reverse_order(fpath): if 'VISCAL: Convergence failed' in line: aero_data['converged'] = False break elif 'Cm' in line: line1 = line elif 'CL' in line: line2 = line aero_data['converged'] = True break else: aero_data['converged'] = False aero_data['errored_out'] = True return line1, line2
[docs] def read_reverse_order(file_name): """ Reads a file in reverse order. From `<https://thispointer.com/python-read-a-file-in-reverse-order-line-by-line/>`_ """ # Open file for reading in binary mode with open(file_name, 'rb') as read_obj: # Move the cursor to the end of the file read_obj.seek(0, os.SEEK_END) # Get the current position of pointer i.e eof pointer_location = read_obj.tell() # Create a buffer to keep the last read line buffer = bytearray() # Loop till pointer reaches the top of the file while pointer_location >= 0: # Move the file pointer to the location pointed by pointer_location read_obj.seek(pointer_location) # Shift pointer location by -1 pointer_location = pointer_location - 1 # read that byte / character new_byte = read_obj.read(1) # If the read byte is new line character then it means one line is read if new_byte == b'\n': # Fetch the line from buffer and yield it yield buffer.decode()[::-1] # Reinitialize the byte array to save next line buffer = bytearray() else: # If last read character is not eol then add it in buffer buffer.extend(new_byte) # As file is read completely, if there is still data in buffer, then its the first line. if len(buffer) > 0: # Yield the first line too yield buffer.decode()[::-1]
[docs] def read_forces_from_mses(search_file: str): """ This function uses regex to extract the aerodynamic force coefficients of the airfoil [system] from the ``list forces`` option available in the MPLOT menu Parameters ========== search_file: str "Forces" file to be read (log of the MPLOT function with ``mode=="forces"``) Returns ======= dict Dictionary containing the aerodynamic force coefficients """ CL_lines = [] element_lines = [] alpha_line, vw_line, fp_line, ad_line, ad_mass_flow_line = None, None, None, None, None element = 0 with open(search_file, 'r') as f: file_out = f.readlines() for line in file_out: if re.search("CL", line): CL_lines.append(line) if 'alpha' in line: alpha_line = line if 'viscous' in line: vw_line = line if 'friction' in line: fp_line = line if 'CDh' in line: ad_line = line if "mh / rho V" in line: ad_mass_flow_line = line if "Element" in line: element = int(line.split(":")[-1].strip()) if element != 0 and "CL" in line: element_lines.append([line]) if element != 0 and "CM" in line: element_lines[-1].append(line) if element != 0 and "top" in line: element_lines[-1].append(line) try: required_line = CL_lines[1] # line we need happens the second time the string "CL" is mentioned split_line = required_line.split() # split the up the line to grab the actual numbers forces = { # write as a dictionary 'Cl': float(split_line[2]), # CL = VALUE 'Cd': float(split_line[5]), # CD = VALUE 'Cm': float(split_line[8]) # CM = VALUE } if ad_line is None and forces["Cd"] < -1e-5: raise ValueError("Negative drag calculated!") # Find the angle of attack alpha_line = alpha_line.split('=') alpha_line = alpha_line[-1].split() forces['alf'] = float(alpha_line[-2]) # Find the viscous drag and wave drag coefficients in the drag breakdown (these two values + Cdh should add to Cd) vw_line = vw_line.split('=') forces['Cdv'] = float(vw_line[-2].split()[0]) forces['Cdw'] = float(vw_line[-1].split()[0]) if ad_line is None: if forces["Cdw"] < -1e-5: raise ValueError("Negative wave drag calculated!") if forces["Cdv"] < -1e-5: raise ValueError("Negative viscous drag calculated!") # Find the friction drag and pressure drag coefficients in the drag breakdown (these two values should add to Cd) fp_line = fp_line.split('=') forces['Cdf'] = float(fp_line[-2].split()[0]) forces['Cdp'] = float(fp_line[-1].split()[0]) if ad_line is None: if forces["Cdf"] < -1e-5: raise ValueError("Negative friction drag calculated!") if forces["Cdp"] < -1e-5: raise ValueError("Negative pressure drag calculated!") # Find the "drag" due to the actuator disk: if ad_line is not None: ad_line = ad_line.split('=') forces['Cdh'] = float(ad_line[-1].split()[0]) else: forces['Cdh'] = 0.0 # Find the non-dimensional mass flow rate due to the actuator disk: if ad_mass_flow_line is not None: ad_mass_flow_line = ad_mass_flow_line.split("=") forces["mh / rho V"] = float(ad_mass_flow_line[-1].strip()) else: forces["mh / rho V"] = 0.0 # Find the elementwise aerodynamic performance components if element_lines: for idx, element_line in enumerate(element_lines): forces[f"Element-{idx + 1}"] = { "Cl": float(element_line[0].split()[2]), "Cdv": float(element_line[0].split()[5]), "Cm": float(element_line[1].split()[2]), "Cdf": float(element_line[1].split()[5]), } if len(element_line) > 2: forces[f"Element-{idx + 1}"]["top_xtr"] = float(element_line[2].split()[3]) forces[f"Element-{idx + 1}"]["bot_xtr"] = float(element_line[2].split()[7]) except: forces = {'Cl': 0.0, 'Cd': 1000.0, 'Cm': 1000.0, 'alf': 0.0, 'Cdv': 1000.0, 'Cdw': 1000.0, 'Cdf': 1000.0, 'Cdp': 1000.0, 'Cdh': 1000.0, 'CPK': 1000.0, "mh / rho V": 0.0} # print(f"{forces = }") return forces
[docs] def read_grid_stats_from_mses(src_file: str): """ Reads grid statistics from an MPLOT output file and outputs them to a dictionary Parameters ========== src_file: str The file from which to read the grid statistics Returns ======= dict The grid statistics in a Python dictionary format """ with open(src_file, 'r') as f: lines = f.readlines() grid_stats = {'grid_size': None, 'numel': None, 'Jside1': [], 'Jside2': [], 'ILE1': [], 'ILE2': [], 'ITE1': [], 'ITE2': [], } for line in lines: if 'Grid size' in line: line_split = line.split('=')[-1] # print(f"{[idx for idx in line_split.split()] = }") grid_stats['grid_size'] = [int(idx) for idx in line_split.split()] elif 'Number' in line: line_split = line.split(':')[-1] grid_stats['numel'] = int(line_split.strip()) elif 'Jside1' in line: line_split = line.split('=') grid_stats['Jside1'].append(int(line_split[1].split()[0])) grid_stats['ILE1'].append(int(line_split[2].split()[0])) grid_stats['ITE1'].append(int(line_split[3].split()[0])) elif 'Jside2' in line: line_split = line.split('=') grid_stats['Jside2'].append(int(line_split[1].split()[0])) grid_stats['ILE2'].append(int(line_split[2].split()[0])) grid_stats['ITE2'].append(int(line_split[3].split()[0])) return grid_stats
[docs] def read_actuator_disk_data_mses(mses_log_file: str, grid_stats: dict) -> typing.List[dict]: """ Reads actuator disk data from the MSES log file (usually ``mses.log``). Parameters ========== mses_log_file: str Log file generated by MSES after CFD evaluation, usually ``mses.log``. grid_stats: dict Grid statistics dictionary generated by ``read_grid_stats_from_mses`` in this module. Returns ======= typing.List[dict] List of actuator disk data, with each element in the list representing a different actuator disk """ with open(mses_log_file, "r") as f: lines = f.readlines() lines_AD = [] for line_idx, line in enumerate(lines): if "Actuator disk plane" in line: lines_AD.append(line_idx) data_AD = [] for line_idx in lines_AD: line1, line2, line3 = lines[line_idx].split(), lines[line_idx + 1].split(), lines[line_idx + 2].split() data_AD.append({"x_c": float(line1[6]), "i": int(line1[9]), "j_start": int(line1[12]), "j_end": int(line1[14]), "pt2_pt1": float(line2[2]), "dCpo": float(line2[5]), "ht2_ht1": float(line3[2]), "etah": float(line3[5]) }) for data in data_AD: flow_section_idx = None if data["j_start"] == 1: flow_section_idx = 0 else: for j_side_idx, j_side in enumerate(grid_stats["Jside1"][::-1]): if data["j_start"] == j_side: flow_section_idx = j_side_idx + 1 data["flow_section_idx"] = flow_section_idx data["field_i_up"] = data["i"] - 2 data["field_i_down"] = data["i"] - 1 data["field_j_start"] = data["j_start"] - flow_section_idx - 1 data["field_j_end"] = data["j_end"] - flow_section_idx - 1 return data_AD
[docs] def read_bl_data_from_mses(src_file: str) -> typing.List[dict]: r""" Reads boundary layer information from ``bl.*`` files generated by MSES Parameters ========== src_file: Boundary layer file, usually with the filename ``bl.{airfoil_name}`` Returns ======= typing.List[dict] Boundary layer data list (length :math:`M`) of dictionaries, where :math:`M` is the number of airfoil sides (2 times the number of airfoils) """ with open(src_file, 'r') as f: line1 = f.readline() header_line = line1.replace(' #', '').split() header_line.pop(0) header_line[-1] = 'Pend' df = pd.read_csv(src_file, sep=r"\s+", skiprows=2, names=header_line) bl = [{}] side_idx = 0 # old_x_val = 0 for idx, s_val in enumerate(df.s): if idx == 0: for var_name in df.columns: bl[side_idx][var_name] = [getattr(df, var_name)[idx]] else: if s_val == 0.0: # Criterion for the start of a new airfoil side: arc length = 0 side_idx += 1 bl.append({}) for var_name in df.columns: bl[side_idx][var_name] = [getattr(df, var_name)[idx]] else: for var_name in df.columns: bl[side_idx][var_name].append(getattr(df, var_name)[idx]) return bl
[docs] def read_Mach_from_mses_file(mses_file: str) -> float: """ Reads the freestream Mach number from the MSES settings file (usually of the form ``mses.*``). Parameters ========== mses_file: str Settings file for MSES, most likely of the form ``mses.*`` Returns ======= float Freestream Mach number """ with open(mses_file, "r") as f: lines = f.readlines() mach_line = lines[2] mach_split = mach_line.split() mach_float = float(mach_split[0]) return mach_float
[docs] def read_field_from_mses(src_file: str, M_inf: float or None = None, gam: float or None = None): r""" Reads a field dump file from MSES (by default, of the form ``field.*``) and outputs the information to an array. The array has shape :math:`9 \times m \times n` (or :math:`12 \times m \times n` if ``M_inf`` and ``gam`` are specified), where :math:`m` is the number of streamlines and :math:`n` is the number of streamwise cells in a given streamline. The 12 entries in the zeroth axis correspond to the flow variables as follows: * 0: :math:`x` * 1: :math:`y` * 2: :math:`\rho/\rho_\infty` (density) * 3: :math:`p/p_\infty` (pressure) * 4: :math:`u/V_\infty` (:math:`x`-velocity) * 5: :math:`v/V_\infty` (:math:`y`-velocity) * 6: :math:`|V|/V_\infty` (velocity magnitude) * 7: :math:`M` (Mach number) * 8: :math:`C_p` (pressure coefficient) * 9: :math:`C_{p_t}` (total pressure coefficient) * 10: :math:`\Delta C_{p_t}` (change in total pressure coefficient relative to the previous streamwise cell) * 11: :math:`\Delta C_p` (change in pressure coefficient relative to the previous streamwise cell) Note that for :math:`\Delta C_{p_t}` and :math:`\Delta C_p`, the value of the first streamwise cells (the cells along the inlet plane) is defined to be 0. Also note that entries 9, 10, and 11 are only available if ``M_inf`` and ``gam`` are specified. Parameters ========== src_file: str Source file containing the MSES field output M_inf: float or None Freestream Mach number (must be set if entries 9, 10, or 11 are needed). Default: ``None``. gam: float or None Specific heat ratio (must be set if entries 9, 10, or 11 are needed). Default: ``None``. Returns ======= np.ndarray Array of MSES field data, reshaped to :math:`9 \times m \times n`, or :math:`12 \times m \times n` if ``M_inf`` and ``gam`` are specified. """ data = np.loadtxt(src_file, skiprows=2) n_flow_vars = data.shape[1] with open(src_file, "r") as f: lines = f.readlines() n_streamlines = 0 for line in lines: if line == "\n": n_streamlines += 1 n_streamwise_lines = int(data.shape[0] / n_streamlines) field_array = np.array([data[:, i].reshape(n_streamlines, n_streamwise_lines).T for i in range(n_flow_vars)]) if M_inf is not None and gam is not None: Cpt = field_array[3][:, :] * (1 + (gam - 1) / 2 * field_array[7][:, :] ** 2) ** (gam / (gam - 1)) * (2 / gam / M_inf**2) Cp = field_array[flow_var_idx["Cp"]][:, :] Cpt = np.array([Cpt]) Cp = np.array([Cp]) field_array = np.concatenate((field_array, Cpt)) dCpt = np.zeros(Cpt[0].shape) dCp = np.zeros(Cpt[0].shape) for idx in range(1, Cpt[0].shape[0]): dCpt[idx, :] = Cpt[0][idx, :] - Cpt[0][idx - 1, :] dCp[idx, :] = Cp[0][idx, :] - Cp[0][idx - 1, :] dCpt = np.array([dCpt]) dCp = np.array([dCp]) field_array = np.concatenate((field_array, dCpt)) field_array = np.concatenate((field_array, dCp)) return field_array
[docs] def read_field_variables_names_mses(field_file: str) -> typing.List[str]: with open(field_file, "r") as src_file: line = src_file.readline() headers = line.replace("#", "").replace("\n", "").split() return headers
[docs] def export_mses_field_to_tecplot_ascii(output_file: str, field_file: str, grid_stats_file: str, grid_file: str, blade_file: str, **kwargs): blade = convert_blade_file_to_array_list(blade_file) field_array = read_field_from_mses(field_file, **kwargs) grid_stats = read_grid_stats_from_mses(grid_stats_file) x_grid, y_grid = read_streamline_grid_from_mses(grid_file, grid_stats) headers = read_field_variables_names_mses(field_file) with open(output_file, "w") as ascii_file: ascii_file.write(f'VARIABLES = ') for header_idx, header in enumerate(headers): if header_idx < len(headers) - 1: ascii_file.write(f'"{header}", ') else: ascii_file.write(f'"{header}"\n') starting_streamline = 0 for zone_idx in range(len(x_grid)): i_max, j_max = x_grid[zone_idx].shape[0], x_grid[zone_idx].shape[1] ascii_file.write(f'ZONE T="ZONE {zone_idx + 1}", I={x_grid[zone_idx].shape[0]}, J={x_grid[zone_idx].shape[1]}, DATAPACKING=BLOCK, VARLOCATION=(') entry_counter = 5 # Used to limit the line length less than 4000 characters, as required by Tecplot for header_idx in range(2, len(headers)): if header_idx < len(headers) - 1: if entry_counter < 6: ascii_file.write(f"{header_idx + 1}=CELLCENTERED, ") else: ascii_file.write(f"{header_idx + 1}=CELLCENTERED,\n") entry_counter = 0 entry_counter += 1 else: ascii_file.write(f"{header_idx + 1}=CELLCENTERED)\n") # Write the x values (node-centered) entry_counter = 0 # Used to limit the line length less than 4000 characters, as required by Tecplot for j in range(j_max): for i in range(i_max): if entry_counter < 10: ascii_file.write(str(x_grid[zone_idx][i, j]) + " ") entry_counter += 1 else: ascii_file.write(str(x_grid[zone_idx][i, j]) + "\n") entry_counter = 0 ascii_file.write("\n") # Write the y values (node-centered) entry_counter = 0 # Used to limit the line length less than 4000 characters, as required by Tecplot for j in range(j_max): for i in range(i_max): if entry_counter < 10: ascii_file.write(str(y_grid[zone_idx][i, j]) + " ") entry_counter += 1 else: ascii_file.write(str(y_grid[zone_idx][i, j]) + "\n") entry_counter = 0 ascii_file.write("\n") # Write the other variables (cell-centered) entry_counter = 0 # Used to limit the line length less than 4000 characters, as required by Tecplot for header_idx in range(2, len(headers)): for j in range(starting_streamline, j_max + starting_streamline - 1): for i in range(0, i_max - 1): if entry_counter < 10: ascii_file.write(str(field_array[header_idx, i, j]) + " ") entry_counter += 1 else: ascii_file.write(str(field_array[header_idx, i, j]) + "\n") entry_counter = 0 ascii_file.write("\n") starting_streamline += x_grid[zone_idx].shape[1] - 1 # Write geometry to the file ascii_file.write("GEOMETRY X=0, Y=0, T=LINE, F=BLOCK, C=BLACK, FC=CUST2, CS=GRID\n") ascii_file.write(f"{len(blade)}\n") # Number of airfoils (each represented by a unique polyline) for airfoil in blade: X_strings = [str(x) for x in airfoil[:, 0].tolist()] Y_strings = [str(y) for y in airfoil[:, 1].tolist()] ascii_file.write(f"{len(airfoil)}\n") ascii_file.write(f"{' '.join(X_strings)}\n") # X-coords (block format) ascii_file.write(f"{' '.join(Y_strings)}\n") # Y-coords (block format)
[docs] def export_mses_field_to_paraview_xml(analysis_dir: str, x_grid: typing.List[np.ndarray], y_grid: typing.List[np.ndarray], headers: typing.List[str], field_array: np.ndarray) -> typing.List[str]: """ Writes MSES field data to the Paraview XML Structured Grid file type (``.vts``). A separate file is created for each "zone," the cells where the Euler equations are solved on each side of the displacement thickness streamlines. These files get stored in ``<analysis_dir>/Paraviewdata`` Parameters ---------- analysis_dir: str Directory where the ``ParaviewData`` directory will be created x_grid: typing.List[np.ndarray] List of grid :math:`x`-coordinate arrays (one for each zone). Stored in a "meshgrid"-like format y_grid: typing.List[np.ndarray] List of grid :math:`x`-coordinate arrays (one for each zone). Stored in a "meshgrid"-like format headers: typing.List[str] List of variable names ("Cp", "M", etc.) field_array: np.ndarray 3-D ``numpy.ndarray`` where the first dimension represents a given flow variable, and the second and third dimensions represent the value of the flow variable at indices ``i`` and ``j``, respectively Returns ------- typing.List[str] Absolute file path to each of the generated ``.vts`` files """ # Get the folder in which the structured data files will be stored, creating it if it does not yet exist output_vts_dir = os.path.join(analysis_dir, "ParaviewData") if not os.path.exists(output_vts_dir): os.mkdir(output_vts_dir) # Make a storage container for the vts file locations vts_files = [] starting_streamline = 0 for zone_idx in range(len(x_grid)): output_vts_file = os.path.join(output_vts_dir, f"mses_field_zone_{zone_idx}.vts") vts_files.append(output_vts_file) with open(output_vts_file, "w") as vts_file: vts_file.write('<?xml version="1.0"?>\n') vts_file.write('<VTKFile type="StructuredGrid" version="0.1" byte_order="LittleEndian">\n') # Get the grid extents i_max, j_max = x_grid[zone_idx].shape[0], x_grid[zone_idx].shape[1] whole_extent = [0, i_max - 1, 0, j_max - 1, 0, 0] whole_extent_str = ' '.join([str(ext_val) for ext_val in whole_extent]) vts_file.write(f'<StructuredGrid WholeExtent="{whole_extent_str}">\n') vts_file.write(f'<Piece Extent="{whole_extent_str}">\n') # Write the grid values (node-centered) vts_file.write('<Points>\n') vts_file.write('<DataArray type="Float64" NumberOfComponents="3" format="ascii">\n') for j in range(j_max): for i in range(i_max): vts_file.write(str(x_grid[zone_idx][i, j]) + ' ') vts_file.write(str(y_grid[zone_idx][i, j]) + ' ') vts_file.write('0 ') vts_file.write('\n') vts_file.write('</DataArray>\n') vts_file.write('</Points>\n') # Write the other variables (cell-centered) vts_file.write('<CellData Scalars="Cp" Normals="cell_normals">\n') for header_idx in range(2, len(headers)): vts_file.write(f'<DataArray type="Float64" Name="{headers[header_idx]}" format="ascii">\n') for j in range(starting_streamline, j_max + starting_streamline - 1): for i in range(0, i_max - 1): vts_file.write(str(field_array[header_idx, i, j]) + " ") vts_file.write("\n") vts_file.write('</DataArray>\n') # # Write the cell normals to another data array # vts_file.write(f'<DataArray type="Float64" Name="cell_normals" format="ascii" NumberOfComponents="3">\n') # for j in range(starting_streamline, j_max + starting_streamline - 1): # for i in range(0, i_max - 1): # vts_file.write("0 0 1 ") # vts_file.write('\n') # vts_file.write('</DataArray>\n') vts_file.write('</CellData>\n') starting_streamline += x_grid[zone_idx].shape[1] - 1 vts_file.write('</Piece>\n') vts_file.write('</StructuredGrid>\n') vts_file.write('</VTKFile>\n') return vts_files
[docs] def export_blade_to_paraview_xml(analysis_dir: str, blade: typing.List[np.ndarray]) -> str: """ Exports an array of multi-element airfoil coordinates to the Paraview XML PolyData (``.vtp``) format. Parameters ---------- analysis_dir: str Absolute path of the location where the newly created ``.vtp`` file will be stored blade: Each array in the list describes the coordinates of a different airfoil, and each array has shape :math:`M \times 2`, where :math:`M` is the number of discrete airfoil coordinates in a given airfoil Returns ------- str Absolute path to the newly created ``.vtp`` file """ # Get the folder in which the polydata will be stored, creating it if it does not yet exist output_vtp_dir = os.path.join(analysis_dir, "ParaviewData") if not os.path.exists(output_vtp_dir): os.mkdir(output_vtp_dir) output_vtp_file = os.path.join(output_vtp_dir, "mses_geom.vtp") with open(output_vtp_file, "w") as vtp_file: # File header and opening tags vtp_file.write('<?xml version="1.0"?>\n') vtp_file.write('<VTKFile type="PolyData" version="0.1" byte_order="LittleEndian">\n') vtp_file.write('<PolyData>\n') for airfoil in blade: # Perform a Delaunay triangulation (convex hull) segments = np.array([[i, i + 1] for i in range(airfoil.shape[0] - 1)]) tri = triangle.triangulate({"vertices": airfoil, "segments": segments}) vertices = tri['vertices'] triangles = tri['triangles'] # Get a buffered version of a polygon defined by the airfoil points. # This helps avoid floating point precision issues with the shapely `contains` method shapely_poly = shapely.Polygon(airfoil).buffer(1e-11) # Get the triangles outside the airfoil polygon triangles_to_remove = [] for tri_idx, tri_indices in enumerate(triangles): for edge_pair_combo in [[0, 1], [1, 2], [2, 0]]: xy = np.mean((vertices[tri_indices[edge_pair_combo[0]]], vertices[tri_indices[edge_pair_combo[1]]]), axis=0) if not shapely.contains(shapely_poly, shapely.Point(xy[0], xy[1])): triangles_to_remove.append(tri_idx) break # Remove these triangles to obtain a triangulated concave hull for triangles_to_remove in triangles_to_remove[::-1]: triangles = np.delete(triangles, triangles_to_remove, axis=0) vtp_file.write(f'<Piece NumberOfPoints="{len(vertices)}" ' f'NumberOfVerts="0" NumberOfLines="0" NumberOfStrips="0" ' f'NumberOfPolys="{len(triangles)}">\n') # Write the coordinates of each vertex to the file inside a DataArray vtp_file.write('<Points>\n') vtp_file.write('<DataArray type="Float64" NumberOfComponents="3" format="ascii">\n') for point in vertices: vtp_file.write(f"{point[0]} {point[1]} 0 ") vtp_file.write('\n') vtp_file.write('</DataArray>\n') vtp_file.write('</Points>\n') # Write the connectivity of each triangle to the file as well as an offset array that describes # how many connectivity points should be extracted from the array for each triangle vtp_file.write('<Polys>\n') vtp_file.write('<DataArray type="Int32" Name="connectivity" format="ascii">\n') for tri_indices in triangles: vtp_file.write(f"{tri_indices[0]} {tri_indices[1]} {tri_indices[2]} ") vtp_file.write('\n') vtp_file.write('</DataArray>\n') vtp_file.write('<DataArray type="Int32" Name="offsets" format="ascii">\n') offsets = 3 * np.arange(1, len(triangles) + 1) vtp_file.write(f'{" ".join([str(offset) for offset in offsets])}\n') vtp_file.write('</DataArray>\n') vtp_file.write('</Polys>\n') # Finish up by writing the closing tags vtp_file.write('</Piece>\n') vtp_file.write('</PolyData>\n') vtp_file.write('</VTKFile>\n') return output_vtp_file
[docs] def export_geom_and_mses_field_to_paraview(analysis_dir: str, field_file: str, grid_stats_file: str, grid_file: str, blade_file: str, **kwargs) -> (typing.List[str], str): """ Exports both airfoil geometry and MSES field data to Paraview XML files. The airfoil geometry is exported to a PolyData (``.vtp``) file, and each zone of the MSES data (number of airfoils plus one) is exported to its own Structured Grid (``.vts``) file. These files get exported to ``<analysis_dir>/ParaviewData``. Parameters ---------- analysis_dir: str Directory where the ``ParaviewData`` directory will be created field_file: str Path to the ``field.<airfoil-name>`` file created by MSES grid_stats_file: str Path to the ``mplot_grid_stats.log`` file created by running MSES through ``pymead`` grid_file: str Path to the ``grid.<airfoil-name>`` file created by MSES blade_file: str Path to the ``blade.<airfoil-name>`` file generated by ``pymead`` kwargs Additional keyword arguments that are passed to ``read_field_from_mses`` Returns ------- typing.List[str], str List of absolute file paths to each of the ``.vts`` files and the absolute file path to the ``.vtp`` file """ blade = convert_blade_file_to_array_list(blade_file) field_array = read_field_from_mses(field_file, **kwargs) grid_stats = read_grid_stats_from_mses(grid_stats_file) x_grid, y_grid = read_streamline_grid_from_mses(grid_file, grid_stats) headers = read_field_variables_names_mses(field_file) # Export the field data to the Paraview XML-style format (.vts files) vts_files = export_mses_field_to_paraview_xml(analysis_dir, x_grid, y_grid, headers, field_array) # Export the airfoil geometry to the Paraview XML-style format (.vtp file) vtp_file = export_blade_to_paraview_xml(analysis_dir, blade) return vts_files, vtp_file
[docs] def read_streamline_grid_from_mses(src_file: str, grid_stats: dict): r""" Reads the grid of streamlines from an MSES ``grid.*`` file Parameters ========== src_file: str File containing the grid output from MPLOT (usually of the form ``grid.*``) grid_stats: dict Output from the ``read_grid_stats_from_mses()`` function Returns ======= typing.Tuple(np.ndarray, np.ndarray) Grid x-coordinates and grid y-coordinates split by the stagnation streamlines """ data = np.loadtxt(src_file, skiprows=2) with open(src_file, "r") as f: lines = f.readlines() n_streamlines = 0 for idx, line in enumerate(lines): if line == "\n": n_streamlines += 1 n_streamwise_lines = int(data.shape[0] / n_streamlines) x_grid = data[:, 0].reshape(n_streamlines, n_streamwise_lines).T y_grid = data[:, 1].reshape(n_streamlines, n_streamwise_lines).T streamline_borders = np.sort(grid_stats["Jside2"]) split_x_grid = np.split(x_grid, streamline_borders, axis=1) split_y_grid = np.split(y_grid, streamline_borders, axis=1) return split_x_grid, split_y_grid
[docs] def convert_blade_file_to_array_list(src_file: str) -> typing.List[np.ndarray]: r""" Converts an MSES blade file (by default of the form ``blade.*``) to a list of arrays describing the airfoil coordinates. Parameters ---------- src_file: str Source file containing the MSES blade information (usually ``blade.*``) Returns ------- typing.List[np.ndarray] Each array represents a different airfoil, and each array has shape :math:`M \times 2`, where :math:`M` is the number of discrete airfoil coordinates in a given airfoil. """ # Load the text file as a single 2-D array containing all the airfoil coordinates blade = np.loadtxt(src_file, skiprows=2) # Get each row of "999s" that divide the blade file into separate airfoils airfoil_delimiter_rows = np.argwhere(blade == 999.0) # Split the array at these rows array_3d = np.split(blade, np.unique(airfoil_delimiter_rows[:, 0])) # Because the first split array is guaranteed not to contain a 999 row, we can just set it as the first list element new_airfoils = [array_3d[0]] # Remove the "999" rows and append the airfoil coordinate array to the list for airfoil in array_3d[1:]: new_airfoil = np.delete(airfoil, 0, axis=0) new_airfoils.append(new_airfoil) return new_airfoils
[docs] def read_polar(airfoil_name: str, base_dir: str) -> typing.Dict[str, typing.List[float]]: """ Reads the ``polar.xxx`` file produced by MPOLAR and converts the data to a dictionary of lists Parameters ---------- airfoil_name: str Name of the airfoil provided to MSES (also the name of the sub-folder inside ``base_dir`` containing the analysis files) base_dir: str Base directory of the analysis. The file being read should have the form ``base_dir/airfoil_name/polar.airfoil_name`` Returns ------- typing.Dict[str, typing.List[float]] Dictionary where each key is a string corresponding to an aerodynamic performance variable and each value is a list of the evaluation of that performance variable at every point along the polar. For example, the dictionary might look something like ``{"alf": [-1.0, 0.0, 1.0], "Cl": [0.2, 0.3, 0.4], ...}`` """ data = np.loadtxt(os.path.join(base_dir, airfoil_name, f"polar.{airfoil_name}"), skiprows=13) keys = ["alf", "Cl", "Cd", "Cm", "Cdw", "Cdv", "Cdp", "Ma", "Re", "top_xtr", "bot_xtr"] return {k: v.tolist() for (k, v) in zip(keys, data.T)}
if __name__ == "__main__": # name = "OptUnderwingFreeTransition" # my_folder = os.path.join(r"C:\Users\mlauer2\Documents\dissertation\data\MSESRuns", name) # export_mses_field_to_tecplot_ascii( # os.path.join(my_folder, f"{name}.dat"), # os.path.join(my_folder, f"field.{name}"), # os.path.join(my_folder, "mplot_grid_stats.log"), # os.path.join(my_folder, f"grid.{name}"), # os.path.join(my_folder, f"blade.{name}") # ) blade_name = "default_airfoil" my_folder = os.path.join(r"C:\Users\mlauer2\AppData\Local\Temp", blade_name) export_geom_and_mses_field_to_paraview( my_folder, os.path.join(my_folder, f"field.{blade_name}"), os.path.join(my_folder, "mplot_grid_stats.log"), os.path.join(my_folder, f"grid.{blade_name}"), os.path.join(my_folder, f"blade.{blade_name}") )