"""
Interface for easily parsing HYDRAD results
"""
import glob
import os
import pathlib
import astropy.units as u
import h5py
import matplotlib.pyplot as plt
import numpy as np
import plasmapy.particles
from scipy.interpolate import splev, splrep
from pydrad import log
from pydrad.configure import Configure
from pydrad.visualize import (animate_strand, plot_histogram, plot_profile,
plot_strand, plot_time_distance, plot_time_mesh)
__all__ = ['Strand', 'Profile', 'InitialProfile']
def get_master_time(hydrad_root, read_from_cfg=False):
"""
Get array of times that correspond to each timestep for the entire simulation.
Parameters
----------
hydrad_root : `str` or path-like
read_from_cfg : `bool`, optional
If True, create the time array from the cadence as specified in
`HYDRAD/config/hydrad.cfg` and the start time as given in the first
AMR file. Note that this is substantially faster than reading the time
from every AMR file, but there may be small differences between these
approximate time steps and the exact time steps listed in the AMR files.
Returns
-------
: `~astropy.units.Quantity`
"""
hydrad_root = pathlib.Path(hydrad_root)
amr_files = sorted((hydrad_root / 'Results').glob('profile*.amr'))
if read_from_cfg:
log.debug('Creating master time array from config files')
# NOTE: Sometimes this file is capitalized and some OSes are sensitive to this
cfg_file = hydrad_root / 'HYDRAD' / 'config' / 'hydrad.cfg'
if not cfg_file.is_file():
log.debug('hydrad.cfg not found; trying HYDRAD.cfg')
cfg_file = hydrad_root / 'HYDRAD' / 'config' / 'HYDRAD.cfg'
with cfg_file.open() as f:
lines = f.readlines()
cadence = float(lines[3])
with amr_files[0].open() as f:
start_time = float(f.readline())
time = start_time + np.arange(len(amr_files)) * cadence
else:
log.debug('Reading master time array from all AMR files')
time = np.zeros((len(amr_files),))
for i, af in enumerate(amr_files):
with af.open() as f:
time[i] = f.readline()
return sorted(time) * u.s
[docs]
class Strand(object):
"""
Container for parsing HYDRAD results
Parameters
----------
hydrad_root : path-like
Path to HYDRAD simulation directory
read_from_cfg : `bool`, optional
If True, create the time array from the cadence as specified in
`HYDRAD/config/hydrad.cfg` and the start time as given in the first
AMR file. Note that this is substantially faster than reading the time
from every AMR file, but there may be small differences between these
approximate time steps and the exact time steps listed in the AMR files.
"""
def __init__(self, hydrad_root, **kwargs):
self.hydrad_root = pathlib.Path(hydrad_root)
# This is different than time depending on the slicing. We allow this
# to be passed as a kwarg to avoid repeatedly reading multiple files
# when slicing.
self._master_time = kwargs.pop('master_time', None)
if self._master_time is None:
self._master_time = get_master_time(self.hydrad_root,
read_from_cfg=kwargs.get('read_from_cfg', False))
# NOTE: time is only specified when slicing a Strand. When not
# slicing, it should be read from the results.
self._time = kwargs.pop('time', self._master_time)
self._profile_kwargs = kwargs
def __repr__(self):
return f"""HYDrodynamics and RADiation (HYDRAD) Code
-----------------------------------------
Results path: {self.hydrad_root}
Time interval: [{self.time[0]}, {self.time[-1]}]
Number of profiles: {len(self)}
Loop length: {self.loop_length.to(u.Mm):.3f}"""
def __len__(self):
return self.time.shape[0]
def __getitem__(self, index):
# NOTE: This will throw an index error to stop iteration
_ = self.time[index]
if self.time[index].shape: # empty if time[index] is a scalar
return Strand(self.hydrad_root,
time=self.time[index],
master_time=self._master_time,
**self._profile_kwargs)
else:
return Profile(self.hydrad_root,
self.time[index],
master_time=self._master_time,
**self._profile_kwargs)
[docs]
def to_hdf5(self, filename, *variables):
"""
Save variables to an HDF5 file
Parameters
----------
filename : `str` or path-like
Path to HDF file
variables : `list`
Names of variables to save to file
"""
with h5py.File(filename, 'w') as hf:
ds = hf.create_dataset('time', data=self.time.value)
ds.attrs['unit'] = self.time.unit.to_string()
for p in self:
grp = hf.create_group(f'index{p._index}')
for v in variables:
data = getattr(p, v)
ds = grp.create_dataset(v, data=data.value)
ds.attrs['unit'] = data.unit.to_string()
@property
def config(self):
"""
Configuration options. This will only work if the simuation was also
configured by pydrad.
"""
return Configure.load_config(self.hydrad_root / 'pydrad_config.asdf')
@property
@u.quantity_input
def time(self) -> u.s:
"""
Simulation time
"""
return self._time
@property
@u.quantity_input
def loop_length(self) -> u.cm:
"""
Footpoint-to-footpoint loop length
"""
with (self.hydrad_root / 'Results' / 'profile0.amr').open() as f:
tmp = f.readlines()
loop_length = float(tmp[2])
return loop_length * u.cm
@property
def initial_conditions(self):
"""
`~pydrad.parse.Profile` for the solutions to the hydrostatic
equations used as initial conditions.
"""
return InitialProfile(self.hydrad_root,
0*u.s,
master_time=[0]*u.s,
**self._profile_kwargs)
[docs]
def peek(self, **kwargs):
"""
Quick look at all profiles for the run on a single plot. Takes
the same keyword arguments as `~pydrad.visualize.plot_strand`
"""
plot_strand(self, **kwargs)
plt.show()
[docs]
@u.quantity_input
def peek_time_distance(self, quantities=None, delta_s: u.cm = 1*u.Mm, **kwargs):
"""
Quick look at time-distance plots of various quantities. Takes
the same keyword arguments as `~pydrad.visualize.plot_time_distance`
"""
if quantities is None:
quantities = ['electron_temperature', 'electron_density', 'velocity']
_ = plot_time_distance(self, quantities, delta_s, **kwargs)
plt.show()
[docs]
def animate(self, **kwargs):
"""
Simple animation of time-dependent loop profiles. Takes the same
keyword arguments as `~pydrad.visualize.animate_strand`
"""
return animate_strand(self, **kwargs)
[docs]
@u.quantity_input
def get_unique_grid(self) -> u.cm:
all_coordinates = [p.coordinate.to(u.cm).value for p in self]
return np.unique(np.concatenate(all_coordinates).ravel()) * u.cm
[docs]
@u.quantity_input
def to_constant_grid(self, name, grid: u.cm, order=1):
"""
Interpolate a given quantity onto a spatial grid that is the same at
each time step.
Parameters
----------
name : `str`
grid : `~astropy.units.Quantity`
Spatial grid to interpolate onto
order : `int`
Order of the spline interpolation. Default is 1.
"""
q_uniform = np.zeros(self.time.shape+grid.shape)
grid_cm = grid.to(u.cm).value
for i, p in enumerate(self):
q = getattr(p, name)
tsk = splrep(p.coordinate.to(u.cm).value, q.value, k=order)
q_uniform[i, :] = splev(grid_cm, tsk, ext=0)
return q_uniform * q.unit
[docs]
def spatial_average(self, quantity, bounds=None):
"""
Compute a spatial average of a specific quantity or quantities
"""
return u.Quantity([p.spatial_average(quantity, bounds=bounds) for p in self])
[docs]
def column_emission_measure(self, bins=None, bounds=None):
"""
Column emission measure as a function of time
See Also
--------
Profile.column_emission_measure
"""
_, bins = self[0].column_emission_measure(bins=bins, bounds=bounds)
em = np.stack([p.column_emission_measure(bins=bins, bounds=bounds)[0] for p in self])
return em, bins
[docs]
def peek_emission_measure(self, bins=None, bounds=None, **kwargs):
em, bins = self.column_emission_measure(bins=bins, bounds=bounds)
bin_centers = (bins[1:] + bins[:-1])/2
# Make API consistent with plot_time_mesh
for k in ['cmap', 'norm', 'units', 'labels']:
if k in kwargs:
kwargs[k] = {'EM': kwargs[k]}
_ = plot_time_mesh(self, [('EM', em)], bin_centers, r'$T$', yscale='log', **kwargs)
plt.show()
[docs]
class Profile(object):
"""
Container for HYDRAD results at a given timestep. Typically accessed
through `pydrad.parse.Strand`
Parameters
----------
hydrad_root : `str`
Path to HYDRAD directory
time : `~astropy.units.Quantity`
Timestep corresponding to the profile of interest
"""
@u.quantity_input
def __init__(self, hydrad_root, time: u.s, **kwargs):
self.hydrad_root = pathlib.Path(hydrad_root)
if time.shape:
raise ValueError('time must be a scalar')
self.time = time
self._master_time = kwargs.get('master_time')
if self._master_time is None:
self._master_time = get_master_time(self.hydrad_root,
read_from_cfg=kwargs.get('read_from_cfg', False))
# Read results files
self._read_phy()
self._read_amr()
if kwargs.get('read_hstate', True):
self._read_hstate()
if kwargs.get('read_ine', True):
self._read_ine()
if kwargs.get('read_trm', True):
self._read_trm()
@property
def _amr_filename(self):
return self.hydrad_root / 'Results' / f'profile{self._index:d}.amr'
@property
def _phy_filename(self):
return self.hydrad_root / 'Results' / f'profile{self._index:d}.phy'
@property
def _trm_filename(self):
return self.hydrad_root / 'Results' / f'profile{self._index:d}.trm'
@property
def _ine_filename(self):
return self.hydrad_root / 'Results' / f'profile{self._index:d}.ine'
@property
def _hstate_filename(self):
return self.hydrad_root / 'Results' / f'profile{self._index:d}.Hstate'
@property
def _index(self):
return np.where(self.time == self._master_time)[0][0]
def __repr__(self):
return f"""HYDRAD Timestep Profile
-----------------------
Filename: {self._phy_filename}
Timestep #: {self._index}"""
def _read_phy(self):
"""
Parse the hydrodynamics results file
"""
self._phy_data = np.loadtxt(self._phy_filename)
def _read_amr(self):
"""
Parse the adaptive mesh grid file
"""
# TODO: Read other quantities from .amr file
with self._amr_filename.open() as f:
lines = f.readlines()
self._grid_centers = np.zeros((int(lines[3]),))
self._grid_widths = np.zeros((int(lines[3]),))
for i, l in enumerate(lines[4:]):
tmp = np.array(l.split(), dtype=float)
self._grid_centers[i] = tmp[0]
self._grid_widths[i] = tmp[1]
def _read_trm(self):
"""
Parse the equation terms file
"""
try:
with self._trm_filename.open() as f:
lines = f.readlines()
except FileNotFoundError:
log.debug(f'{self._trm_filename} not found')
return
n_elements = int(len(lines)/5)
self._trm_data = np.zeros([n_elements, 3])
# The files come in sets of 5 rows
# -- Loop coordinate, and at each one:
# -- Terms of mass equation
# -- Terms of momentum equation
# -- Terms of electron energy equation
# -- Terms of hydrogen energy equation
# Right now, we only read 3 values from this:
# the electron heating, hydrogen heating,
# and bolometric radiative losses
for i in range(len(lines)):
j = int(i/5)
line = lines[i].strip().split()
# Electron heating and radiative loss terms from the
# electron energy equation
if i % 5 == 3:
self._trm_data[j, 0] = float(line[5])
self._trm_data[j, 1] = float(line[6])
# Hydrogen heating term from the hydrogen energy
# equation
if i % 5 == 4:
self._trm_data[j, 2] = float(line[5])
properties = [('electron_heating_term', '_trm_data', 0, 'erg cm-3 s-1'),
('hydrogen_heating_term', '_trm_data', 2, 'erg cm-3 s-1'),
('radiative_loss_term', '_trm_data', 1, 'erg cm-3 s-1')]
for p in properties:
add_property(*p)
def _read_ine(self):
"""
Parse non-equilibrium ionization population fraction files
and set attributes for relevant quantities
"""
# TODO: clean this up somehow? I've purposefully included
# a lot of comments because the format of this file makes
# the parsing code quite opaque
try:
with self._ine_filename.open() as f:
lines = f.readlines()
except FileNotFoundError:
log.debug(f'{self._ine_filename} not found')
return
# First parse all of the population fraction arrays
n_s = self.coordinate.shape[0]
# NOTE: Have to calculate the number of elements we have
# computed population fractions for as we do not necessarily
# know this ahead of time
n_e = int(len(lines)/n_s - 1)
# The file is arranged in n_s groups of n_e+1 lines each where the first
# line is the coordinate and the subsequent lines are the population fraction
# for each element, with each column corresponding to an ion of that element
# First, separate by coordinate
pop_lists = [lines[i*(n_e+1)+1:(i+1)*(n_e+1)] for i in range(n_s)]
# Convert each row of each group into a floating point array
pop_lists = [[np.array(l.split(), dtype=float) for l in p] for p in pop_lists]
# NOTE: each row has Z+2 entries as the first entry is the atomic number Z
# Get these from just the first group as the number of elements is the same
# for each
Z = np.array([p[0] for p in pop_lists[0]], dtype=int)
pop_arrays = [np.zeros((n_s, z+1))for z in Z]
for i, p in enumerate(pop_lists):
for j, l in enumerate(p):
pop_arrays[j][i, :] = l[1:] # Skip first entry, it is the atomic number
# Then set attributes for each ion of each element
for z, p in zip(Z, pop_arrays):
name = plasmapy.particles.element_name(z)
attr = f'_population_fraction_{name}'
setattr(self, attr, p)
for p in [(f'{attr[1:]}_{i+1}', attr, i, '') for i in range(z+1)]:
add_property(*p)
def _read_hstate(self):
"""
Parse the hydrogen energy level populations file
"""
try:
self._hstate_data = np.loadtxt(self._hstate_filename)
except OSError:
log.debug(f'{self._hstate_filename} not found')
self._hstate_data = None
@property
@u.quantity_input
def grid_centers(self) -> u.cm:
"""
Spatial location of the grid centers
"""
return u.Quantity(self._grid_centers, 'cm')
@property
@u.quantity_input
def grid_widths(self) -> u.cm:
"""
Spatial width of each grid cell
"""
return u.Quantity(self._grid_widths, 'cm')
@property
@u.quantity_input
def grid_edges(self) -> u.cm:
"""
Spatial location of the edges of each grid cell,
including the rightmost edge
"""
left = self.grid_centers - self.grid_widths/2.
return np.append(left, left[-1:] + self.grid_widths[-1])
@property
@u.quantity_input
def grid_edges_left(self) -> u.cm:
"""
Spatial location of the left edge of each grid cell
"""
return self.grid_edges[:-1]
@property
@u.quantity_input
def grid_edges_right(self) -> u.cm:
"""
Spatial location of the right edge of each grid cell
"""
return self.grid_edges[1:]
[docs]
def spatial_average(self, quantity, bounds=None):
"""
Compute a spatial average of a specific quantity
Parameters
----------
quantity : `str`
Name of the desired quantity to average
bounds : `~astropy.units.Quantity`, optional
Array of length 2 specifying the range over which
to take the spatial average.
"""
if bounds is None:
bounds = self.coordinate[[0, -1]]
i_bounds = np.where(np.logical_and(self.coordinate >= bounds[0],
self.coordinate <= bounds[1]))
quantity_bounds = getattr(self, quantity)[i_bounds]
grid_widths_bounds = self.grid_widths[i_bounds]
return np.average(quantity_bounds, weights=grid_widths_bounds)
[docs]
@u.quantity_input
def column_emission_measure(self, bins: u.K = None, bounds: u.cm = None):
r"""
Computes the column emission measure, where it is assumed that the loop
is confined to a single pixel and oriented along the line of sight.
Parameters
----------
bins : `~astropy.units.Quantity`, optional
Temperature bin edges, including rightmost edge. If None (default),
the bins will be equally-spaced in :math:`\log{T}`, with a left
edge at :math:`\log{T}=3`, a right edge at :math:`\log{T}=8`, and a
bin width of :math:`0.05`.
Returns
-------
em : `~astropy.units.Quantity`
The column emission measure in each bin
bins : `~astropy.units.Quantity`
Temperature bin edges. Note that ``len(bins)=len(em)+1``.
"""
if bins is None:
bins = 10.0**(np.arange(3.0, 8.0, 0.05)) * u.K
if bounds is None:
bounds = self.grid_edges[[0, -1]]
weights = self.electron_density * self.ion_density * self.grid_widths
H, _, _ = np.histogram2d(self.grid_centers, self.electron_temperature,
bins=(bounds, bins), weights=weights)
return H.squeeze(), bins
[docs]
def peek(self, **kwargs):
"""
Quick look at profiles at a given timestep.
Takes the same keyword arguments as `~pydrad.visualize.plot_profile`.
"""
plot_profile(self, **kwargs)
plt.show()
[docs]
def peek_emission_measure(self, **kwargs):
"""
Quick look at the column emission measure.
Takes the same keyword arguments as `column_emission_measure` and
`~pydrad.visualize.plot_histogram`.
"""
bins = kwargs.pop('bins', None)
if 'color' not in kwargs:
kwargs['color'] = 'C0'
em, bins = self.column_emission_measure(bins=bins)
ax = plot_histogram(em.to('cm-5').value, bins.to('K').value, **kwargs)
ax.set_yscale('log')
ax.set_xscale('log')
ax.set_xlim(kwargs.get('xlim'))
ax.set_ylim(kwargs.get('ylim'))
ax.set_xlabel(r'$T$ [K]')
ax.set_ylabel(r'EM [cm$^{-5}$]')
plt.show()
[docs]
class InitialProfile(Profile):
@property
def _amr_filename(self):
return self.hydrad_root / 'Initial_Conditions' / 'profiles' / 'initial.amr'
@property
def _phy_filename(self):
return self.hydrad_root / 'Initial_Conditions' / 'profiles' / 'initial.amr.phy'
def add_property(name, attr, index, unit):
"""
Auto-generate properties for various pieces of data
"""
def property_template(self):
data = getattr(self, attr)
if data is None:
raise ValueError(f'No data available for {name}')
return u.Quantity(data[:, index], unit)
property_template.__doc__ = f'{" ".join(name.split("_")).capitalize()} as a function of :math:`s`'
property_template.__name__ = name
setattr(Profile, property_template.__name__, property(property_template))
properties = [
('coordinate', '_phy_data', 0, 'cm'),
('velocity', '_phy_data', 1, 'cm / s'),
('sound_speed', '_phy_data', 2, 'cm / s'),
('electron_density', '_phy_data', 3, 'cm-3'),
('ion_density', '_phy_data', 4, 'cm-3'),
('electron_pressure', '_phy_data', 5, 'dyne cm-2'),
('ion_pressure', '_phy_data', 6, 'dyne cm-2'),
('electron_temperature', '_phy_data', 7, 'K'),
('ion_temperature', '_phy_data', 8, 'K'),
('electron_conduction', '_phy_data', 9, 'erg s-1 cm-2'),
('ion_conduction', '_phy_data', 10, 'erg s-1 cm-2'),
]
properties += [(f'level_population_hydrogen_{i}', '_hstate_data', i, '') for i in range(1, 7)]
for p in properties:
add_property(*p)