# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025 SWGY, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import csv
import logging
import model
import numpy as np
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
@dataclass
class BlastResult:
    """Result record for one simulated blast point.

    Holds the blast parameters (peak pressure and position) together with
    one scalar per armor configuration. As populated by
    ``compute_blast_result`` in this module, each of the four armor fields
    is the ``total_impulse`` attribute of the corresponding
    ``model.simulate_pressure`` run (not a raw pressure reading).

    Field order matters: instances are constructed positionally.
    """
    # Initial peak pressure the simulation was run with
    # (kilopascals, per compute_blast_result's documentation).
    p0: float
    # Blast point coordinates (meters, per compute_blast_result's
    # documentation); results are sorted on (x, y, z) by the callers.
    x: float
    y: float
    z: float
    # Value obtained with both helmet and vest in the geometry list.
    full_armor: float
    # Value obtained with only the helmet in the geometry list.
    helmet_only: float
    # Value obtained with only the vest in the geometry list.
    vest_only: float
    # Value obtained with an empty geometry list.
    no_armor: float
@dataclass
class ControllerParams:
    """Global settings for this controller.

    The calculation drivers in this module read these fields and forward
    most of them unchanged to ``compute_blast_result``.
    """
    # Initial peak blast pressure, forwarded as ``P0_KPA``
    # (kilopascals, per compute_blast_result's documentation).
    p0: float
    # Ray-tracing resolution in arc-minutes, forwarded as ``res``.
    resolution_arcmin: int
    # Maximum number of ray bounces, forwarded as ``mb``.
    max_bounces: int
    # Maximum traced ray length, forwarded as ``ml``
    # (meters, per compute_blast_result's documentation).
    max_length: float
    # Passed as ``max_workers`` to ProcessPoolExecutor in the parallel driver.
    max_workers: int
    # Threshold compared against the distance between a blast point and
    # ``sensor.origin`` when deciding whether a point is simulated.
    min_blast_dist: float
def compute_blast_result(blast_x: float, blast_y: float, blast_z: float,
                         res: int, mb: int, ml: float, P0_KPA: float,
                         helmet, vest, sensor):
    """
    Run the blast simulation at one point for four armor configurations.

    The same blast is simulated four times through
    ``model.simulate_pressure`` (using ``model.trace_ray`` as the ray
    callback), varying only the geometry list, in this order:

        1. Full armor (helmet and vest)
        2. Helmet only
        3. Vest only
        4. No armor

    The ``total_impulse`` attribute of each simulation is what gets stored.

    Args:
        blast_x (float): The x-coordinate of the blast point (in meters).
        blast_y (float): The y-coordinate of the blast point (in meters).
        blast_z (float): The z-coordinate of the blast point (in meters).
        res (int): The ray-tracing resolution (in arc-minutes).
        mb (int): The maximum number of ray bounces.
        ml (float): The maximum length to trace a ray (in meters).
        P0_KPA (float): The initial peak blast pressure (in kilopascals).
        helmet (object): The helmet geometry object.
        vest (object): The vest geometry object.
        sensor (object): The sensor object handed to the model.

    Returns:
        BlastResult: ``P0_KPA``, the blast coordinates (taken from the NumPy
        position array), and the total impulse for each configuration in
        ``full_armor``, ``helmet_only``, ``vest_only`` and ``no_armor``.
    """
    origin = np.array([blast_x, blast_y, blast_z])

    def total_impulse(armor):
        # One simulation run; only the geometry list differs between runs.
        # NOTE(review): extents are computed from an empty geometry list for
        # every configuration -- presumably so that all four runs share the
        # same bounds; confirm this is intentional.
        outcome = model.simulate_pressure(
            geometry=armor,
            blast_point=origin,
            sensor=sensor,
            extents=model.compute_extents([], origin, sensor),
            resolution_arcmin=res,
            ray_callback=model.trace_ray,
            p0=P0_KPA,
            max_length=ml,
            max_bounces=mb,
        )
        return outcome.total_impulse

    # Keep the original evaluation order: full, helmet, vest, none.
    with_both = total_impulse([helmet, vest])
    with_helmet = total_impulse([helmet])
    with_vest = total_impulse([vest])
    unprotected = total_impulse([])

    return BlastResult(
        p0=P0_KPA,
        x=origin[0],
        y=origin[1],
        z=origin[2],
        full_armor=with_both,
        helmet_only=with_helmet,
        vest_only=with_vest,
        no_armor=unprotected,
    )
def parallel_calculation(params: ControllerParams, blast_cube_center,
                         x_offsets, y_offsets, z_offsets,
                         helmet, vest, sensor):
    """
    Simulate every eligible blast point of a grid using worker processes.

    The grid is the Cartesian product of the three offset lists, shifted by
    ``blast_cube_center``. Points whose distance to ``sensor.origin`` (as
    returned by ``model.calculate_distance``) is below
    ``params.min_blast_dist`` are dropped. Each remaining point is submitted
    to a ``ProcessPoolExecutor`` as one ``compute_blast_result`` job.

    Args:
        params (ControllerParams): Simulation settings: resolution, maximum
            bounces, maximum ray length, initial peak pressure, worker count
            and minimum blast distance.
        blast_cube_center (tuple): Center (x, y, z) of the cube of blast
            points; added to every offset.
        x_offsets (list of float): x-offset values for the cube grid.
        y_offsets (list of float): y-offset values for the cube grid.
        z_offsets (list of float): z-offset values for the cube grid.
        helmet (object): The MICH helmet geometry object for simulation.
        vest (object): The SAPI plate geometry object for simulation.
        sensor (object): The sensor object; its ``origin`` is used for the
            distance filter and the object is forwarded to every job.

    Returns:
        list: ``BlastResult`` objects, one per simulated point, sorted by
        (x, y, z). Empty when no point passes the distance filter.

    Raises:
        Any exception raised inside a worker is re-raised here by
        ``future.result()``.

    Notes:
        - The logged total and the progress percentage refer to the points
          that are actually simulated, i.e. after the distance filter.
        - Progress is logged roughly every 2% (every ``total // 50``
          completed jobs, at least every job).
        - Arguments are sent to worker processes, so ``helmet``, ``vest``
          and ``sensor`` presumably need to be picklable.

    Example:
        results = parallel_calculation(params, (0, 0, 0), x_offsets, y_offsets,
                                       z_offsets, helmet, vest, sensor)
    """
    cx, cy, cz = blast_cube_center[0], blast_cube_center[1], blast_cube_center[2]
    blast_locs = [(x + cx, y + cy, z + cz)
                  for x in x_offsets
                  for y in y_offsets
                  for z in z_offsets]

    # Filter before counting so the totals below describe real work; the
    # percentage would otherwise never reach 100% when points are dropped.
    eligible = [
        loc for loc in blast_locs
        if model.calculate_distance(loc, sensor.origin) >= params.min_blast_dist
    ]

    results = []
    total_blasts = len(eligible)
    # Emit an INFO update roughly every 2% of completed jobs
    progress_interval = max(1, total_blasts // 50)
    logging.info(f"Running {total_blasts} blast simulations.")

    if not eligible:
        # Nothing to simulate: do not create a process pool for no jobs.
        logging.info("Finished parallel simulation")
        return results

    # Fan the jobs out over at most params.max_workers processes
    with ProcessPoolExecutor(max_workers=params.max_workers) as executor:
        futures = [
            executor.submit(
                compute_blast_result,
                x, y, z,
                params.resolution_arcmin,
                params.max_bounces,
                params.max_length,
                params.p0,
                helmet, vest, sensor,
            )
            for (x, y, z) in eligible
        ]
        # Results arrive in completion order; they are sorted afterwards.
        for completed_tasks, future in enumerate(as_completed(futures), 1):
            results.append(future.result())
            if completed_tasks % progress_interval == 0:
                percent_complete = (completed_tasks / total_blasts) * 100
                logging.info(f"Progress: {percent_complete:.1f}% complete")

    # Sort the results by (x, y, z) so they proceed spatially
    # in a consistent order
    results.sort(key=lambda r: (r.x, r.y, r.z))
    logging.info("Finished parallel simulation")
    return results
def sequential_calculation(params: ControllerParams, blast_cube_center,
                           x_offsets, y_offsets, z_offsets,
                           helmet, vest, sensor):
    """
    Simulate every eligible blast point of a grid, one after another.

    The grid is the Cartesian product of the three offset lists, shifted by
    ``blast_cube_center``. Points whose distance to ``sensor.origin`` (as
    returned by ``model.calculate_distance``) is below
    ``params.min_blast_dist`` are skipped; every remaining point is passed
    to ``compute_blast_result`` in the current process.

    Args:
        params (ControllerParams): Simulation settings: resolution, maximum
            bounces, maximum ray length, initial peak pressure and minimum
            blast distance.
        blast_cube_center (tuple): Center (x, y, z) of the cube of blast
            points; added to every offset.
        x_offsets (list of float): x-offset values for the cube grid.
        y_offsets (list of float): y-offset values for the cube grid.
        z_offsets (list of float): z-offset values for the cube grid.
        helmet (object): The MICH helmet geometry object for simulation.
        vest (object): The SAPI plate geometry object for simulation.
        sensor (object): The sensor object; its ``origin`` is used for the
            distance filter and the object is forwarded to every simulation.

    Returns:
        list: ``BlastResult`` objects, one per simulated point, sorted by
        (x, y, z). Empty when no point passes the distance filter.

    Notes:
        - Only points at least ``params.min_blast_dist`` away from the
          sensor are simulated, the same rule the parallel driver applies.
        - The logged total and the progress percentage refer to the points
          that are actually simulated, i.e. after the distance filter.
        - Progress is logged roughly every 2% (every ``total // 50``
          completed simulations, at least every simulation).

    Example:
        results = sequential_calculation(params, (0, 0, 0),
                                         x_offsets, y_offsets, z_offsets,
                                         helmet, vest, sensor)
    """
    cx, cy, cz = blast_cube_center[0], blast_cube_center[1], blast_cube_center[2]
    blast_locs = [(x + cx, y + cy, z + cz)
                  for x in x_offsets
                  for y in y_offsets
                  for z in z_offsets]

    # Keep points that are far enough from the sensor. The previous code
    # skipped exactly these points (inverted comparison) and simulated the
    # too-close ones instead.
    eligible = [
        loc for loc in blast_locs
        if model.calculate_distance(loc, sensor.origin) >= params.min_blast_dist
    ]

    results = []
    total_blasts = len(eligible)
    # Emit an INFO update roughly every 2% of completed simulations
    progress_interval = max(1, total_blasts // 50)
    logging.info(f"Running {total_blasts} blast simulations.")

    for idx, (x, y, z) in enumerate(eligible, 1):
        results.append(
            compute_blast_result(
                x, y, z,
                params.resolution_arcmin,
                params.max_bounces,
                params.max_length,
                params.p0,
                helmet, vest, sensor,
            )
        )
        if idx % progress_interval == 0:
            percent_complete = (idx / total_blasts) * 100
            logging.info(f"Progress: {percent_complete:.1f}% complete")

    # Sort the results by (x, y, z) so they proceed spatially in a consistent order
    results.sort(key=lambda r: (r.x, r.y, r.z))
    logging.info("Finished sequential simulation")
    return results