simulate_blasts.py

#!/usr/bin/env python
# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025 SWGY, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
import os
# For OpenBSD systems with more than 10 hardware threads, the OpenBLAS library
# will experience freezes. See https://englelab.gatech.edu/squaredtasks.html
# This must be set prior to importing numpy
os.environ["OPENBLAS_NUM_THREADS"] = "10"

import argparse
import controller as c
import csv
import geometry
import logging
import model as m
import numpy as np
import re
from datetime import datetime


def validate_blast_cube_center(value):
    # Validate that the value is in the form of "x,y,z", where x, y, and z are floats
    pattern = re.compile(r"^(-?\d+(\.\d+)?),(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)$")
    if not pattern.match(value):
        raise argparse.ArgumentTypeError("Invalid format for blast-cube-center. It should be 'x,y,z' where x, y, and z are numbers.")
    x, y, z = map(float, value.split(","))
    return (x, y, z)

def calculate_offsets(c, l, num_segments):
    """
    Calculate the offsets required to cover 'l' length centered at 'c' across
    'num_segments' segments.
    Args:
            c: Center location of the span.
            l: Length of the span to cover.
            num_segments: Number of segments to cover.

    Returns:
        Array of offsets
    """
    if num_segments <= 1:
        raise ValueError("num_segments must be > 1")

    step_size = l / (num_segments - 1)
    offsets = []
    for i in range(num_segments):
        offset = c - (l / 2) + (i * step_size)
        offsets.append(offset)
    return offsets

def parse_arguments():
    parser = argparse.ArgumentParser(description="Simulate blast point tracing.")

    # Add all the long options with default values
    parser.add_argument('--blast-cube-center', type=validate_blast_cube_center, default="0,-4,0",
                        help='The center of the cube of simulated blast points (x,y,z in meters). Default is "0,-4,0".')
    parser.add_argument('--blast-cube-extents-meters', type=float, default=8,
                        help='The length of each side of the cube of simulated blast points (in meters). Default is 8 meters.')
    parser.add_argument('--blast-cube-segments', type=int, default=10,
                        help='The number of segments to explore in each dimension. Default is 10.')
    parser.add_argument('--blast-min-dist-meters', type=float, default=0.7,
                        help='Any blast closer to the sensor than this value will not be simulated. Default value is 0.7')
    parser.add_argument('--initial-peak-pressure-kpa', type=float, default=50_000,
                        help='The initial peak pressure in kilopascals. Default is 50,000 kPa.')
    parser.add_argument('--ray-resolution-arcmin', type=int, default=20,
                        help='The resolution of the blast ray tracing in arc-minutes. Default is 20.')
    parser.add_argument('--ray-max-bounces', type=int, default=4,
                        help='Max bounces to follow a ray in the blast ray tracing. Default is 4.')
    parser.add_argument('--ray-max-length-meters', type=float, default=35,
                        help='Maximum distance to follow a blast ray in meters. Default is 35 meters.')
    parser.add_argument('--sensor-diameter-meters', type=float, default=0.111,
                        help='Sensor diameter in meters. Default is 0.111 meters.')
    parser.add_argument('--helmet-geometry', type=str, required=True,
                        help='Path to the MICH helmet geometry file. Required.')
    parser.add_argument('--vest-geometry', type=str, required=True,
                        help='Path to the SAPI plate geometry file. Required.')
    parser.add_argument('--output-file', type=str, default=f"./blast-results-{datetime.now():%Y-%m-%d-%H%M}.csv",
                        help='Output CSV file to store the results. Default is a timestamped file.')
    parser.add_argument('--max-workers', type=int, required=True,
                        help='Maximum number of parallel workers to use in the calculation. "0" means single-threaded.')
    parser.add_argument('--verbose', action='store_true',
                        help='Enable verbose output (DEBUG level logging). Default is off.')

    # Parse the arguments
    return parser.parse_args()

# Example function to adjust logging based on the verbosity flag
def setup_logging(verbose=False):
    # Set logging level based on verbosity
    if verbose:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    # Parse the arguments
    args = parse_arguments()

    setup_logging(args.verbose)

    # Just a quick printout of parsed values (for illustration)
    logging.info(f"Cube Center: {args.blast_cube_center}")
    logging.info(f"Cube Extents: {args.blast_cube_extents_meters} meters")
    logging.info(f"Cube Segments: {args.blast_cube_segments}")
    logging.info(f"Min Blast Distance: {args.blast_min_dist_meters}")
    logging.info(f"Initial Peak Pressure: {args.initial_peak_pressure_kpa} kPa")
    logging.info(f"Ray Resolution: {args.ray_resolution_arcmin} arc-min")
    logging.info(f"Ray Max Bounces: {args.ray_max_bounces}")
    logging.info(f"Ray Max Length: {args.ray_max_length_meters} meters")
    logging.info(f"Sensor Diameter: {args.sensor_diameter_meters} meters")
    logging.info(f"Helmet Geometry: {args.helmet_geometry}")
    logging.info(f"Vest Geometry: {args.vest_geometry}")
    logging.info(f"Output File: {args.output_file}")
    logging.info(f"Max Workers: {args.max_workers}")

    try:
        helmet = geometry.load_helmet(args.helmet_geometry)
        vest = geometry.load_vest(args.vest_geometry)
    except (FileNotFoundError, ValueError) as e:
        logging.error(f"Error loading geometry: {e}")
        sys.exit(1)

    blast_cube_center = np.array([args.blast_cube_center[0],
                                 args.blast_cube_center[1],
                                 args.blast_cube_center[2]])
    sensor = m.define_sensor(origin=np.array([0,0,0]),
                                 radius=args.sensor_diameter_meters)

    x_offs = calculate_offsets(blast_cube_center[0],
                               args.blast_cube_extents_meters,
                               args.blast_cube_segments)
    y_offs = calculate_offsets(blast_cube_center[1],
                               args.blast_cube_extents_meters,
                               args.blast_cube_segments)
    z_offs = calculate_offsets(blast_cube_center[2],
                               args.blast_cube_extents_meters,
                               args.blast_cube_segments)

    c_params = c.ControllerParams(
            p0=args.initial_peak_pressure_kpa,
            min_blast_dist=args.blast_min_dist_meters,
            resolution_arcmin=args.ray_resolution_arcmin,
            max_bounces=args.ray_max_bounces,
            max_length=args.ray_max_length_meters,
            max_workers=args.max_workers)

    if args.max_workers > 0:
        results = c.parallel_calculation(c_params, blast_cube_center,
                                         x_offs, y_offs, z_offs,
                                         helmet, vest, sensor)
    else: # Run sequentially
        results = c.sequential_calculation(c_params, blast_cube_center,
                                         x_offs, y_offs, z_offs,
                                         helmet, vest, sensor)

    # Write the results to the CSV
    with open(args.output_file, mode='w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["bp_x", "bp_y", "bp_z",
                             "full_armor", "helmet_only",
                             "vest_only", "no_armor"])
        for r in results:
            csv_writer.writerow([
                f"{r.x:.2f}",
                f"{r.y:.2f}",
                f"{r.z:.2f}",
                f"{r.full_armor:.6f}",
                f"{r.helmet_only:.6f}",
                f"{r.vest_only:.6f}",
                f"{r.no_armor:.6f}",
            ])


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    main()