#!/usr/bin/env python
# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025 SWGY, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
# For OpenBSD systems with more than 10 hardware threads, the OpenBLAS library
# will experience freezes. See https://englelab.gatech.edu/squaredtasks.html
# This must be set prior to importing numpy
os.environ["OPENBLAS_NUM_THREADS"] = "10"
import argparse
import controller as c
import csv
import geometry
import logging
import model as m
import numpy as np
import re
from datetime import datetime
def validate_blast_cube_center(value):
# Validate that the value is in the form of "x,y,z", where x, y, and z are floats
pattern = re.compile(r"^(-?\d+(\.\d+)?),(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)$")
if not pattern.match(value):
raise argparse.ArgumentTypeError("Invalid format for blast-cube-center. It should be 'x,y,z' where x, y, and z are numbers.")
x, y, z = map(float, value.split(","))
return (x, y, z)
def calculate_offsets(c, l, num_segments):
"""
Calculate the offsets required to cover 'l' length centered at 'c' across
'num_segments' segments.
Args:
c: Center location of the span.
l: Length of the span to cover.
num_segments: Number of segments to cover.
Returns:
Array of offsets
"""
if num_segments <= 1:
raise ValueError("num_segments must be > 1")
step_size = l / (num_segments - 1)
offsets = []
for i in range(num_segments):
offset = c - (l / 2) + (i * step_size)
offsets.append(offset)
return offsets
def parse_arguments():
parser = argparse.ArgumentParser(description="Simulate blast point tracing.")
# Add all the long options with default values
parser.add_argument('--blast-cube-center', type=validate_blast_cube_center, default="0,-4,0",
help='The center of the cube of simulated blast points (x,y,z in meters). Default is "0,-4,0".')
parser.add_argument('--blast-cube-extents-meters', type=float, default=8,
help='The length of each side of the cube of simulated blast points (in meters). Default is 8 meters.')
parser.add_argument('--blast-cube-segments', type=int, default=10,
help='The number of segments to explore in each dimension. Default is 10.')
parser.add_argument('--blast-min-dist-meters', type=float, default=0.7,
help='Any blast closer to the sensor than this value will not be simulated. Default value is 0.7')
parser.add_argument('--initial-peak-pressure-kpa', type=float, default=50_000,
help='The initial peak pressure in kilopascals. Default is 50,000 kPa.')
parser.add_argument('--ray-resolution-arcmin', type=int, default=20,
help='The resolution of the blast ray tracing in arc-minutes. Default is 20.')
parser.add_argument('--ray-max-bounces', type=int, default=4,
help='Max bounces to follow a ray in the blast ray tracing. Default is 4.')
parser.add_argument('--ray-max-length-meters', type=float, default=35,
help='Maximum distance to follow a blast ray in meters. Default is 35 meters.')
parser.add_argument('--sensor-diameter-meters', type=float, default=0.111,
help='Sensor diameter in meters. Default is 0.111 meters.')
parser.add_argument('--helmet-geometry', type=str, required=True,
help='Path to the MICH helmet geometry file. Required.')
parser.add_argument('--vest-geometry', type=str, required=True,
help='Path to the SAPI plate geometry file. Required.')
parser.add_argument('--output-file', type=str, default=f"./blast-results-{datetime.now():%Y-%m-%d-%H%M}.csv",
help='Output CSV file to store the results. Default is a timestamped file.')
parser.add_argument('--max-workers', type=int, required=True,
help='Maximum number of parallel workers to use in the calculation. "0" means single-threaded.')
parser.add_argument('--verbose', action='store_true',
help='Enable verbose output (DEBUG level logging). Default is off.')
# Parse the arguments
return parser.parse_args()
# Example function to adjust logging based on the verbosity flag
def setup_logging(verbose=False):
# Set logging level based on verbosity
if verbose:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
else:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
# Parse the arguments
args = parse_arguments()
setup_logging(args.verbose)
# Just a quick printout of parsed values (for illustration)
logging.info(f"Cube Center: {args.blast_cube_center}")
logging.info(f"Cube Extents: {args.blast_cube_extents_meters} meters")
logging.info(f"Cube Segments: {args.blast_cube_segments}")
logging.info(f"Min Blast Distance: {args.blast_min_dist_meters}")
logging.info(f"Initial Peak Pressure: {args.initial_peak_pressure_kpa} kPa")
logging.info(f"Ray Resolution: {args.ray_resolution_arcmin} arc-min")
logging.info(f"Ray Max Bounces: {args.ray_max_bounces}")
logging.info(f"Ray Max Length: {args.ray_max_length_meters} meters")
logging.info(f"Sensor Diameter: {args.sensor_diameter_meters} meters")
logging.info(f"Helmet Geometry: {args.helmet_geometry}")
logging.info(f"Vest Geometry: {args.vest_geometry}")
logging.info(f"Output File: {args.output_file}")
logging.info(f"Max Workers: {args.max_workers}")
try:
helmet = geometry.load_helmet(args.helmet_geometry)
vest = geometry.load_vest(args.vest_geometry)
except (FileNotFoundError, ValueError) as e:
logging.error(f"Error loading geometry: {e}")
sys.exit(1)
blast_cube_center = np.array([args.blast_cube_center[0],
args.blast_cube_center[1],
args.blast_cube_center[2]])
sensor = m.define_sensor(origin=np.array([0,0,0]),
radius=args.sensor_diameter_meters)
x_offs = calculate_offsets(blast_cube_center[0],
args.blast_cube_extents_meters,
args.blast_cube_segments)
y_offs = calculate_offsets(blast_cube_center[1],
args.blast_cube_extents_meters,
args.blast_cube_segments)
z_offs = calculate_offsets(blast_cube_center[2],
args.blast_cube_extents_meters,
args.blast_cube_segments)
c_params = c.ControllerParams(
p0=args.initial_peak_pressure_kpa,
min_blast_dist=args.blast_min_dist_meters,
resolution_arcmin=args.ray_resolution_arcmin,
max_bounces=args.ray_max_bounces,
max_length=args.ray_max_length_meters,
max_workers=args.max_workers)
if args.max_workers > 0:
results = c.parallel_calculation(c_params, blast_cube_center,
x_offs, y_offs, z_offs,
helmet, vest, sensor)
else: # Run sequentially
results = c.sequential_calculation(c_params, blast_cube_center,
x_offs, y_offs, z_offs,
helmet, vest, sensor)
# Write the results to the CSV
with open(args.output_file, mode='w', newline='') as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow(["bp_x", "bp_y", "bp_z",
"full_armor", "helmet_only",
"vest_only", "no_armor"])
for r in results:
csv_writer.writerow([
f"{r.x:.2f}",
f"{r.y:.2f}",
f"{r.z:.2f}",
f"{r.full_armor:.6f}",
f"{r.helmet_only:.6f}",
f"{r.vest_only:.6f}",
f"{r.no_armor:.6f}",
])
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
main()