Skip to content

Python API Reference

The public API of amorphouspy is defined by what is importable directly from the top-level package:

from amorphouspy import *

The rest of the package is considered internal implementation and can be subject to breaking changes even in minor releases.


amorphouspy

amorphouspy - workflows for atomistic modeling of oxide glasses.

Functions

analyze_structure(atoms: Atoms) -> StructureData

Perform a comprehensive structural analysis of an atomic configuration.

Parameters:

Name Type Description Default
atoms Atoms

Atomic configuration to analyze.

required

Returns:

Name Type Description
StructureData StructureData

Object containing structured analysis results.

Example

structure_data = analyze_structure(my_atoms)
print(structure_data.density)

Source code in amorphouspy/src/amorphouspy/workflows/structural_analysis.py
def analyze_structure(atoms: Atoms) -> StructureData:  # noqa: C901, PLR0912, PLR0915
    """Perform a comprehensive structural analysis of an atomic configuration.

    The analysis covers: mass density, per-species coordination numbers,
    Qn distribution and network connectivity, oxygen classification
    (BO/NBO/free/tri), bond-angle distributions, Guttman ring statistics,
    partial RDFs with cumulative coordination numbers, and neutron/X-ray
    structure factors.

    Args:
        atoms: Atomic configuration to analyze.

    Returns:
        StructureData: Object containing structured analysis results.

    Example:
        >>> structure_data = analyze_structure(my_atoms)
        >>> print(structure_data.density)

    """
    atomic_numbers = atoms.get_atomic_numbers()
    unique_z = np.unique(atomic_numbers)

    # Calculate density: mass in grams over volume in cm³ gives g/cm³.
    volume_cm3 = atoms.get_volume() * 1e-24  # Convert ų to cm³
    avogadro_number = 6.022140857e23  # Avogadro's number (mol⁻¹)
    total_mass_g = atoms.get_masses().sum() / avogadro_number  # Convert amu to g
    density = total_mass_g / volume_cm3

    # Split the species present into network formers, modifiers, and oxygen.
    type_map, network_formers, modifiers, _oxygen_present = _classify_elements(unique_z)
    former_types = [z for z, sym in type_map.items() if sym in network_formers]
    modifier_types = [z for z, sym in type_map.items() if sym in modifiers]
    O_type = [z for z, sym in type_map.items() if sym == "O"]

    r, rdfs, cumcn = compute_rdf(atoms)

    def _canonical(a: int, b: int) -> tuple[int, int]:
        # RDF pair keys are stored with the smaller atomic number first.
        return (min(a, b), max(a, b))

    # Per-element bonding cutoffs (Å), taken from the first minimum of the
    # corresponding X-O partial RDF, with tabulated fallbacks otherwise.
    cutoff_map: dict[str, float] = {}
    for z in unique_z:
        symbol = type_map[z]
        if symbol == "O":
            # Oxygen cutoff is derived from the O-(first former) partial RDF.
            # NOTE(review): if that pair is absent from `rdfs`, no "O" entry is
            # stored; the Qn block below reads cutoff_map["O"] without a default
            # — confirm compute_rdf always yields all pairs.
            if former_types:
                pair = _canonical(O_type[0], former_types[0])
                if pair in rdfs:
                    r_min = find_rdf_minimum(r, rdfs[pair])
                    cutoff_map[symbol] = r_min if r_min is not None else 2.0
        else:
            pair = _canonical(z, O_type[0]) if O_type else None
            if pair and pair in rdfs:
                r_min = find_rdf_minimum(r, rdfs[pair])
                if r_min is not None:
                    cutoff_map[symbol] = r_min
                else:
                    # Tabulated per-element fallbacks (Å) used when no RDF
                    # minimum can be located; 3.0 Å for unlisted elements.
                    element_fallbacks = {
                        "Si": 2.0,
                        "B": 1.8,
                        "P": 1.8,
                        "Ge": 2.0,
                        "Al": 2.0,
                        "Ti": 2.0,
                        "Zr": 2.2,
                        "Na": 3.52,
                        "K": 3.8,
                        "Ca": 3.0,
                        "Mg": 2.8,
                        "Li": 2.5,
                        "Ba": 3.2,
                        "Zn": 2.5,
                    }
                    cutoff_map[symbol] = element_fallbacks.get(symbol, 3.0)

    # Coordination calculations: O coordinated by formers+modifiers,
    # formers/modifiers each coordinated by oxygen only.
    O_coord = {}
    if O_type:
        O_cutoff = cutoff_map.get("O", 2.0)
        O_coord_raw, _ = compute_coordination(atoms, O_type[0], O_cutoff, former_types + modifier_types)
        # Convert integer keys to strings for HDF5 compatibility
        O_coord = {str(k): v for k, v in O_coord_raw.items()}

    former_coords = {}
    for former in network_formers:
        # Recover the atomic number for this former symbol.
        z = next(k for k, v in type_map.items() if v == former)
        if O_type:
            coord_raw, _ = compute_coordination(atoms, z, cutoff_map[former], O_type)
            # Convert integer keys to strings for HDF5 compatibility
            former_coords[former] = {str(k): v for k, v in coord_raw.items()}

    modifier_coords = {}
    for mod in modifiers:
        z = next(k for k, v in type_map.items() if v == mod)
        if O_type:
            coord_raw, _ = compute_coordination(atoms, z, cutoff_map[mod], O_type)
            # Convert integer keys to strings for HDF5 compatibility
            modifier_coords[mod] = {str(k): v for k, v in coord_raw.items()}

    # Qn distribution, network connectivity, and oxygen classification —
    # only meaningful when both formers and oxygen are present.
    Qn_dist = {}
    Qn_dist_partial = {}
    network_connectivity = 0.0
    oxygen_class_counts: dict[str, int] = {}
    oxygen_class_ids: dict[str, list[int]] = {}
    if network_formers and O_type:
        Qn_dist_raw, Qn_dist_partial_raw, o_classes = compute_qn_and_classify(
            atoms, cutoff_map["O"], former_types, O_type[0]
        )

        # Aggregate oxygen classification
        for aid, cls in o_classes.items():
            oxygen_class_counts[cls] = oxygen_class_counts.get(cls, 0) + 1
            oxygen_class_ids.setdefault(cls, []).append(aid)

        # Convert integer keys to strings for Pydantic compatibility
        Qn_dist = {str(k): v for k, v in Qn_dist_raw.items()} if Qn_dist_raw else {}
        Qn_dist_partial = {}
        if Qn_dist_partial_raw:
            for element_key, qn_dict in Qn_dist_partial_raw.items():
                # Convert both outer and inner keys to strings
                Qn_dist_partial[str(element_key)] = {str(k): v for k, v in qn_dict.items()}

        if Qn_dist_raw and sum(Qn_dist_raw.values()) > 0:
            network_connectivity = compute_network_connectivity(Qn_dist_raw)

    # Bond angles and rings
    bond_angle_distributions = {}
    for former in network_formers:
        z = next(k for k, v in type_map.items() if v == former)
        if O_type:
            # O-former-O angle histogram (180 one-degree bins).
            bond_angle_distributions[former] = compute_angles(
                atoms, center_type=z, neighbor_type=O_type[0], cutoff=cutoff_map[former], bins=180
            )

    ring_statistics_data = {}
    if network_formers and O_type:
        # Ring search bonds are restricted to former-O pairs at the RDF cutoffs.
        specific_cutoffs = {(former, "O"): cutoff_map[former] for former in network_formers}
        bond_lengths = generate_bond_length_dict(atoms, specific_cutoffs=specific_cutoffs)
        rings_dist, mean_ring_size = compute_guttmann_rings(atoms, bond_lengths=bond_lengths, max_size=40, n_cpus=1)
        # Convert integer keys to strings for HDF5 compatibility
        rings_dist_str = {str(k): v for k, v in rings_dist.items()}
        ring_statistics_data = {"distribution": rings_dist_str, "mean_size": mean_ring_size}

    # Convert data for serialization
    def to_list(data: np.ndarray | list[float]) -> list[float]:
        # Normalize arrays and sequences to plain Python lists.
        if isinstance(data, np.ndarray):
            return data.tolist()
        return list(data)

    bond_angle_distributions_serializable = {
        former: (to_list(angles), to_list(counts)) for former, (angles, counts) in bond_angle_distributions.items()
    }

    # Re-key partial RDFs from atomic-number pairs to "El1-El2" strings.
    rdfs_serializable = {}
    cumcn_serializable = {}
    for pair, rdf_data in rdfs.items():
        elem1, elem2 = type_map[pair[0]], type_map[pair[1]]
        key = f"{elem1}-{elem2}"
        rdfs_serializable[key] = to_list(rdf_data)
        if pair in cumcn:
            cumcn_serializable[key] = to_list(cumcn[pair])

    # Structure factor S(q) — neutron and X-ray
    q_sf, sq_neutron, sq_partials_neutron = compute_structure_factor(atoms, radiation="neutron")
    _, sq_xray, _ = compute_structure_factor(atoms, radiation="xray")
    sq_partials_serializable = {}
    for pair, sq_data in sq_partials_neutron.items():
        elem1, elem2 = type_map[pair[0]], type_map[pair[1]]
        sq_partials_serializable[f"{elem1}-{elem2}"] = to_list(sq_data)
    structure_factor_data = StructureFactorData(
        q=to_list(q_sf),
        sq_neutron=to_list(sq_neutron),
        sq_xray=to_list(sq_xray),
        sq_partials=sq_partials_serializable,
    )

    # Assemble the typed result object from the per-section intermediates.
    return StructureData(
        density=density,
        coordination=CoordinationData(oxygen=O_coord, formers=former_coords, modifiers=modifier_coords),
        network=NetworkData(
            Qn_distribution={k: float(v) for k, v in Qn_dist.items()},
            Qn_distribution_partial={
                outer: {inner: float(v) for inner, v in inner_d.items()} for outer, inner_d in Qn_dist_partial.items()
            },
            connectivity=network_connectivity,
        ),
        distributions=StructuralDistributions(
            bond_angles=bond_angle_distributions_serializable, rings=ring_statistics_data
        ),
        rdfs=RadialDistributionData(r=to_list(r), rdfs=rdfs_serializable, cumulative_coordination=cumcn_serializable),
        structure_factor=structure_factor_data,
        elements=ElementInfo(
            formers=list(network_formers),
            modifiers=list(modifiers),
            cutoffs=cutoff_map,
            oxygen_classes=oxygen_class_counts,
            oxygen_ids=oxygen_class_ids,
        ),
    )

check_neutral_oxide(oxide: str) -> None

Check if an oxide formula is charge neutral based on standard oxidation states.

Parameters:

Name Type Description Default
oxide str

The chemical formula of the oxide (e.g., "Al2O3").

required

Raises:

Type Description
ValueError

If the oxide is invalid, oxidation states cannot be determined, or the net charge is not zero.

Source code in amorphouspy/src/amorphouspy/structure/composition.py
def check_neutral_oxide(oxide: str) -> None:
    """Verify that an oxide formula carries zero net charge.

    The formula is parsed into a ``Composition``; the first oxidation-state
    guess is then used to total the charge contributed by every element.

    Args:
        oxide: The chemical formula of the oxide (e.g., "Al2O3").

    Raises:
        ValueError: If the oxide is invalid, oxidation states cannot be
            determined, or the net charge is not zero.

    """
    try:
        comp = Composition(oxide)
    except Exception as parse_error:
        error_msg = f"Invalid oxide formula: '{oxide}'"
        raise ValueError(error_msg) from parse_error

    guesses = comp.oxi_state_guesses()
    if not guesses:
        error_msg = f"Cannot determine oxidation states for '{oxide}'"
        raise ValueError(error_msg)

    # Charge from the first (most plausible) oxidation-state assignment.
    total_charge = sum(state * comp[element] for element, state in guesses[0].items())

    if total_charge != 0:
        error_msg = f"Oxide '{oxide}' is not charge neutral (net charge {total_charge})"
        raise ValueError(error_msg)

classify_oxygens(structure: Atoms, cutoff: float, former_types: list[int], o_type: int) -> dict[int, str]

Classify each oxygen atom as bridging (BO), non-bridging (NBO), free, or triclustered.

Convenience wrapper around :func:compute_qn_and_classify that returns only the oxygen classification.

Parameters:

Name Type Description Default
structure Atoms

The atomic structure as ASE object.

required
cutoff float

Cutoff radius for former-O neighbor search (Å).

required
former_types list[int]

Atom types (atomic numbers) considered as formers.

required
o_type int

Atom type (atomic number) considered as oxygen.

required

Returns:

Type Description
dict[int, str]

A mapping from real atom ID to oxygen class string:
"BO" (bridging, 2 former neighbours), "NBO" (non-bridging, 1),
"free" (0), or "tri" (≥ 3, triclustered).

Example

o_classes = classify_oxygens(atoms, cutoff=2.0, former_types=[14], o_type=8)

Source code in amorphouspy/src/amorphouspy/analysis/qn_network_connectivity.py
def classify_oxygens(
    structure: Atoms,
    cutoff: float,
    former_types: list[int],
    o_type: int,
) -> dict[int, str]:
    """Return the oxygen-class label for every oxygen atom.

    Thin wrapper over :func:`compute_qn_and_classify`: the Qn histograms
    are computed as a side effect but discarded, and only the per-oxygen
    classification is returned.

    Args:
        structure: The atomic structure as ASE object.
        cutoff: Cutoff radius for former-O neighbor search (Å).
        former_types: Atom types (atomic numbers) considered as formers.
        o_type: Atom type (atomic number) considered as oxygen.

    Returns:
        A mapping from real atom ID to oxygen class string:
        ``"BO"`` (bridging, 2 former neighbours), ``"NBO"`` (non-bridging, 1),
        ``"free"`` (0), or ``"tri"`` (≥ 3, triclustered).

    Example:
        >>> o_classes = classify_oxygens(atoms, cutoff=2.0, former_types=[14], o_type=8)

    """
    # The classification is the last element of the 3-tuple result.
    return compute_qn_and_classify(structure, cutoff, former_types, o_type)[-1]

compute_angles(structure: Atoms, center_type: int, neighbor_type: int, cutoff: float, bins: int = 180) -> tuple[np.ndarray, np.ndarray]

Compute bond angle distribution between triplets of neighbor_type-center-neighbor_type.

Parameters:

Name Type Description Default
structure Atoms

Atomic structure.

required
center_type int

Atom type at the angle center.

required
neighbor_type int

Atom type forming the angle with center.

required
cutoff float

Neighbor search cutoff.

required
bins int

Number of histogram bins. Defaults to 180.

180

Returns:

Type Description
tuple[ndarray, ndarray]

A tuple containing: bin_centers: Bin centers (degrees). angle_hist: Normalized angle histogram.

Example

bins, hist = compute_angles(structure, center_type=1, neighbor_type=2, cutoff=3.0)

Source code in amorphouspy/src/amorphouspy/analysis/bond_angle_distribution.py
def compute_angles(
    structure: Atoms,
    center_type: int,
    neighbor_type: int,
    cutoff: float,
    bins: int = 180,
) -> tuple[np.ndarray, np.ndarray]:
    """Compute bond angle distribution between triplets of neighbor_type-center-neighbor_type.

    Args:
        structure: Atomic structure.
        center_type: Atom type at the angle center.
        neighbor_type: Atom type forming the angle with center.
        cutoff: Neighbor search cutoff.
        bins: Number of histogram bins. Defaults to 180.

    Returns:
        A tuple containing:
            bin_centers: Bin centers (degrees).
            angle_hist: Normalized angle histogram.

    Example:
        >>> bins, hist = compute_angles(structure, center_type=1, neighbor_type=2, cutoff=3.0)

    """
    # Wrap and extract positions/cell once — needed for minimum-image vectors
    structure_wrapped = structure.copy()
    structure_wrapped.wrap()
    coords = structure_wrapped.get_positions()
    cell = structure_wrapped.get_cell().array
    is_orthogonal = np.allclose(cell - np.diag(np.diag(cell)), 0.0, atol=1e-10)

    # Hoist loop-invariant minimum-image quantities out of the per-atom loop.
    box = np.diag(cell) if is_orthogonal else None
    inv_cell = None if is_orthogonal else np.linalg.inv(cell)

    # Build ID → array index map to look up coordinates by real atom ID
    if "id" in structure_wrapped.arrays:
        raw_ids = structure_wrapped.arrays["id"]
    else:
        raw_ids = np.arange(1, len(structure_wrapped) + 1)
    id_to_idx = {int(aid): i for i, aid in enumerate(raw_ids)}

    # get_neighbors returns List[Tuple[central_id, List[neighbor_ids]]]
    neighbors = get_neighbors(
        structure,
        cutoff=cutoff,
        target_types=[center_type],
        neighbor_types=[neighbor_type],
    )

    angles = []

    for central_id, nn_ids in neighbors:
        if len(nn_ids) < MIN_NEIGHBORS_FOR_ANGLE:
            continue

        ci = id_to_idx[central_id]
        center_coord = coords[ci]

        # Resolve neighbor coordinates by real ID — shape (k, 3)
        nn_indices = np.array([id_to_idx[nid] for nid in nn_ids], dtype=np.int32)
        vecs = coords[nn_indices] - center_coord  # (k, 3)

        # Minimum-image correction — vectorized for all neighbor vectors at once
        if is_orthogonal:
            vecs -= box * np.round(vecs / box)
        else:
            # ASE stores lattice vectors as the ROWS of `cell`, so
            # cartesian = fractional @ cell and fractional = cartesian @ inv(cell).
            # Forward and backward transforms must use the same convention:
            # the previous mixed form ((inv_cell @ v.T).T then (cell.T @ f.T).T)
            # only round-trips for symmetric cell matrices and corrupts
            # vectors in general triclinic cells.
            delta_frac = vecs @ inv_cell
            delta_frac -= np.round(delta_frac)
            vecs = delta_frac @ cell

        # Normalise all vectors at once
        norms = np.linalg.norm(vecs, axis=1)  # (k,)
        valid = norms > 0
        if valid.sum() < MIN_NEIGHBORS_FOR_ANGLE:
            continue
        vecs = vecs[valid]
        norms = norms[valid]
        unit_vecs = vecs / norms[:, np.newaxis]  # (k, 3)

        # Compute all unique pair cosines via matrix multiply: (k, k)
        # Upper triangle gives each unique i<j pair
        cos_mat = np.clip(unit_vecs @ unit_vecs.T, -1.0, 1.0)
        i_idx, j_idx = np.triu_indices(len(unit_vecs), k=1)
        cos_angles = cos_mat[i_idx, j_idx]
        angles.extend(np.degrees(np.arccos(cos_angles)).tolist())

    angle_hist, bin_edges = np.histogram(
        angles,
        bins=bins,
        range=(0, 180),
        density=True,
    )
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    return bin_centers, angle_hist

compute_cavities(structure: Atoms, resolution: int = 64, cutoff_radii: dict[str, float] | float | None = None) -> dict[str, np.ndarray]

Compute cavity distribution and shape properties for a glass structure.

Replicates sovapy's domain-based cavity detection algorithm. The simulation box is discretised into a 3D voxel grid; voxels within the discrete (integer-rounded) atomic radius of any atom are marked as occupied. Connected regions of empty voxels -- detected with full 26-connectivity and periodic-boundary-condition awareness -- constitute cavities (domains).

Atomic radii are rounded to the nearest integer number of voxels before marking, matching sovapy's discrete_radius = round(radius / s_step) scheme. This produces higher occupancy than using raw float radii and is the key factor that reproduces sovapy's cavity count and volumes.

Periodic boundary conditions are handled via a union-find merge of cavity labels that connect across opposite box faces.

Parameters:

Name Type Description Default
structure Atoms

ASE Atoms object with atomic coordinates and cell.

required
resolution int

Number of voxels along the longest box dimension before padding (default 64). Higher values give more accurate surface areas and shapes at the cost of memory and runtime.

64
cutoff_radii dict[str, float] | float | None

Atomic radius specification. Three forms are accepted:

  • None (default): element-specific ASE Van der Waals radii, with ASE covalent radii as fallback.
  • float: uniform radius in A applied to every atom (e.g. cutoff_radii=2.8 reproduces sovapy's default).
  • dict[str, float]: per-element overrides in A (e.g. {'Si': 2.1, 'O': 1.52}); elements not in the dict fall back to ASE covalent radii.
None

Returns:

Type Description
dict[str, ndarray]

dict[str, ndarray]

Dictionary with the following keys, each mapping to a 1D float64
array with one entry per detected cavity:

  • 'volumes': cavity volumes in A^3.
  • 'surface_areas': cavity surface areas in A^2.
  • 'asphericities': asphericity eta in [0, 1].
  • 'acylindricities': acylindricity c in [0, 1].
  • 'anisotropies': relative shape anisotropy kappa in [0, 1].

Examples:

>>> from ase.io import read
>>> structure = read('glass.xyz')
>>> cavities = compute_cavities(structure, resolution=64)
>>> print(cavities['volumes'])
Source code in amorphouspy/src/amorphouspy/analysis/cavities.py
def compute_cavities(
    structure: Atoms,
    resolution: int = 64,
    cutoff_radii: dict[str, float] | float | None = None,
) -> dict[str, np.ndarray]:
    """Compute cavity distribution and shape properties for a glass structure.

    Replicates sovapy's domain-based cavity detection algorithm. The simulation
    box is discretised into a 3D voxel grid; voxels within the discrete
    (integer-rounded) atomic radius of any atom are marked as occupied.
    Connected regions of empty voxels -- detected with full 26-connectivity
    and periodic-boundary-condition awareness -- constitute cavities (domains).

    Atomic radii are rounded to the nearest integer number of voxels before
    marking, matching sovapy's ``discrete_radius = round(radius / s_step)``
    scheme. This produces higher occupancy than using raw float radii and
    is the key factor that reproduces sovapy's cavity count and volumes.

    Periodic boundary conditions are handled via a union-find merge of cavity
    labels that connect across opposite box faces.

    Args:
        structure: ASE Atoms object with atomic coordinates and cell.
        resolution: Number of voxels along the longest box dimension before
            padding (default 64). Higher values give more accurate surface
            areas and shapes at the cost of memory and runtime.
        cutoff_radii: Atomic radius specification. Three forms are accepted:

            - ``None`` (default): element-specific ASE Van der Waals radii,
              with ASE covalent radii as fallback.
            - ``float``: uniform radius in A applied to every atom
              (e.g. ``cutoff_radii=2.8`` reproduces sovapy's default).
            - ``dict[str, float]``: per-element overrides in A
              (e.g. ``{'Si': 2.1, 'O': 1.52}``); elements not in the dict
              fall back to ASE covalent radii.

    Returns:
        Dictionary with the following keys, each mapping to a 1D float64
        array with one entry per detected cavity:

        - ``'volumes'``: cavity volumes in A^3.
        - ``'surface_areas'``: cavity surface areas in A^2.
        - ``'asphericities'``: asphericity eta in [0, 1].
        - ``'acylindricities'``: acylindricity c in [0, 1].
        - ``'anisotropies'``: relative shape anisotropy kappa in [0, 1].

    Examples:
        >>> from ase.io import read
        >>> structure = read('glass.xyz')
        >>> cavities = compute_cavities(structure, resolution=64)
        >>> print(cavities['volumes'])
    """
    void_labels, _occupied_grid, _voxel_step, cell_array = _build_cavity_grid(structure, resolution, cutoff_radii)
    # Cavity labels run 1..N; label 0 marks occupied/background voxels.
    number_of_voids = int(void_labels.max())
    grid_shape = np.array(void_labels.shape, dtype=float)

    # No empty region detected: return empty result arrays for every key.
    if number_of_voids == 0:
        empty: np.ndarray = np.array([], dtype=np.float64)
        return {
            "volumes": empty,
            "surface_areas": empty,
            "asphericities": empty,
            "acylindricities": empty,
            "anisotropies": empty,
        }

    # Voxel volume: exact cell volume divided by total voxel count.
    # For orthogonal cells this equals voxel_step^3 up to padding rounding.
    voxel_volume = abs(float(np.linalg.det(cell_array))) / float(np.prod(grid_shape))

    # Transformation from voxel-index space to Cartesian:
    # voxel index i along axis a → fractional i/na → Cartesian (i/na)*cell[0]
    # matrix form: v_cart = v_voxel @ mc_to_cart  where mc_to_cart = diag(1/grid_shape) @ cell
    mc_to_cart = np.diag(1.0 / grid_shape) @ cell_array

    # Cavities that span the whole box have ill-defined volume/shape and
    # are excluded from the output (warned about below).
    percolating_labels = _find_percolating_labels(void_labels)
    if percolating_labels:
        import warnings  # noqa: PLC0415

        warnings.warn(
            f"{len(percolating_labels)} percolating cavity/cavities span the simulation cell "
            "and will be excluded from the output. Consider using larger cutoff radii.",
            UserWarning,
            stacklevel=2,
        )

    volumes = np.zeros(number_of_voids, dtype=np.float64)
    surface_areas = np.zeros(number_of_voids, dtype=np.float64)
    asphericities = np.zeros(number_of_voids, dtype=np.float64)
    acylindricities = np.zeros(number_of_voids, dtype=np.float64)
    anisotropies = np.zeros(number_of_voids, dtype=np.float64)

    for void_index in range(1, number_of_voids + 1):
        # Skipped cavities keep volume 0 and are dropped by the filter below.
        if void_index in percolating_labels:
            continue

        void_mask = void_labels == void_index
        void_voxel_count = int(np.sum(void_mask))

        # Single-voxel regions are too small for any meaningful geometry.
        if void_voxel_count < 2:  # noqa: PLR2004
            continue

        volumes[void_index - 1] = void_voxel_count * voxel_volume

        # Surface area from a marching-cubes mesh at the 0.5 isolevel of the
        # binary mask, mapped into Cartesian space before triangle summation.
        void_float_mask = void_mask.astype(np.float32)
        try:
            vertices, faces = mcubes.marching_cubes(void_float_mask, 0.5)
            vertices_cart = vertices @ mc_to_cart
            surface_areas[void_index - 1] = _compute_triangle_mesh_area(vertices_cart, faces)
        except (ValueError, RuntimeError):
            # Mesh extraction can fail on degenerate masks; report zero area.
            surface_areas[void_index - 1] = 0.0

        cavity_voxel_indices = np.argwhere(void_mask)

        # Gyration-tensor shape descriptors need at least 3 voxels.
        if len(cavity_voxel_indices) >= 3:  # noqa: PLR2004
            cavity_voxel_coordinates = (cavity_voxel_indices / grid_shape) @ cell_array
            (
                asphericities[void_index - 1],
                acylindricities[void_index - 1],
                anisotropies[void_index - 1],
            ) = _compute_gyration_tensor_descriptors(cavity_voxel_coordinates)

    # Drop percolating and sub-2-voxel cavities (left at volume 0 above).
    non_empty = volumes > 0
    return {
        "volumes": volumes[non_empty],
        "surface_areas": surface_areas[non_empty],
        "asphericities": asphericities[non_empty],
        "acylindricities": acylindricities[non_empty],
        "anisotropies": anisotropies[non_empty],
    }

compute_coordination(structure: Atoms, target_type: int, cutoff: float, neighbor_types: list[int] | None = None) -> tuple[dict[int, int], dict[int, int]]

Compute coordination number for atoms of a target type.

Parameters:

Name Type Description Default
structure Atoms

The atomic structure as ASE object.

required
target_type int

Atom type (atomic number) for which to compute coordination.

required
cutoff float

Cutoff radius in Å.

required
neighbor_types list[int] | None

Valid neighbor atomic numbers. None means all types.

None

Returns:

Type Description
tuple[dict[int, int], dict[int, int]]

A tuple containing: coordination_distribution: Mapping from coordination number to count. per_atom_coordination: Mapping from atom ID to coordination number.

Example

structure = read('glass.xyz')
dist, cn = compute_coordination(structure, target_type=14, cutoff=2.0)

Source code in amorphouspy/src/amorphouspy/analysis/radial_distribution_functions.py
def compute_coordination(
    structure: Atoms,
    target_type: int,
    cutoff: float,
    neighbor_types: list[int] | None = None,
) -> tuple[dict[int, int], dict[int, int]]:
    """Compute coordination numbers for every atom of a given type.

    Args:
        structure: The atomic structure as ASE object.
        target_type: Atom type (atomic number) for which to compute coordination.
        cutoff: Cutoff radius in Å.
        neighbor_types: Valid neighbor atomic numbers. None means all types.

    Returns:
        A tuple containing:
            coordination_distribution: Mapping from coordination number to count.
            per_atom_coordination: Mapping from atom ID to coordination number.

    Example:
        >>> structure = read('glass.xyz')
        >>> dist, cn = compute_coordination(structure, target_type=14, cutoff=2.0)

    """
    neighbor_lists = get_neighbors(
        structure,
        cutoff=cutoff,
        target_types=[target_type],
        neighbor_types=neighbor_types,
    )

    # Collect the real IDs of all target-type atoms up front so that
    # zero-coordinated targets are still recorded in the result.
    atomic_types = structure.get_atomic_numbers()
    if "id" in structure.arrays:
        real_ids = structure.arrays["id"]
    else:
        real_ids = np.arange(1, len(structure) + 1)
    target_id_set: set[int] = {
        int(aid) for aid, atype in zip(real_ids, atomic_types, strict=False) if int(atype) == target_type
    }

    # Per-atom coordination keyed by real atom ID (matches OVITO IDs).
    per_atom_coordination: dict[int, int] = {}
    for central_id, nn_ids in neighbor_lists:
        if len(nn_ids) > 0 or central_id in target_id_set:
            per_atom_coordination[central_id] = len(nn_ids)

    distribution = count_distribution(per_atom_coordination)
    return dict(sorted(distribution.items())), per_atom_coordination

compute_guttmann_rings(structure: Atoms, bond_lengths: dict[tuple[str, str], float], max_size: int = 24, n_cpus: int = 1) -> tuple[dict[int, int], float]

Compute the Guttman ring size distribution and mean ring size.

Rings are detected using a native networkx-based BFS implementation of Guttman's shortest-path (primitive) ring criterion. Only closed primitive rings up to max_size former atoms are counted.

Ring size is defined as the number of network-former atoms (T atoms) in the ring, following Guttman's original convention.

Parameters:

Name Type Description Default
structure Atoms

ASE Atoms object containing atomic coordinates and types.

required
bond_lengths dict[tuple[str, str], float]

Maximum bond lengths for each element pair, e.g. {('Si', 'O'): 1.8, ('Al', 'O'): 1.95}. All T-O pairs must be specified; T-T and O-O pairs are ignored.

required
max_size int

Maximum ring size (number of T atoms) to search for.

24
n_cpus int

Number of worker processes for parallel ring search. 1 — sequential execution (default). N — distribute edge loop across N worker processes. -1 — use all logical CPUs (os.cpu_count()).

1

Returns:

Name Type Description
histogram dict[int, int]

Mapping from ring size to ring count.

mean_ring_size float

Mean ring size weighted by count.

Raises:

Type Description
ValueError

If bond_lengths contains no T-O pairs (i.e. all pairs involve only oxygen or only formers).

Examples:

>>> from ase.io import read
>>> structure = read('glass.xyz')
>>> histogram, mean_size = compute_guttmann_rings(
...     structure,
...     bond_lengths={('Si', 'O'): 1.8},
...     max_size=12,
... )
>>> # Parallel on 4 cores
>>> histogram, mean_size = compute_guttmann_rings(
...     structure,
...     bond_lengths={('Si', 'O'): 1.8},
...     n_cpus=4,
... )
>>> # Parallel using all available cores
>>> histogram, mean_size = compute_guttmann_rings(
...     structure,
...     bond_lengths={('Si', 'O'): 1.8},
...     n_cpus=-1,
... )
Source code in amorphouspy/src/amorphouspy/analysis/rings.py
def compute_guttmann_rings(
    structure: Atoms,
    bond_lengths: dict[tuple[str, str], float],
    max_size: int = 24,
    n_cpus: int = 1,
) -> tuple[dict[int, int], float]:
    """Compute the Guttman ring size distribution and mean ring size.

    Rings are found with a native networkx-based BFS implementation of
    Guttman's shortest-path (primitive) ring criterion; only closed
    primitive rings containing at most ``max_size`` former atoms count.

    Ring size follows Guttman's original convention: the number of
    network-former (T) atoms in the ring.

    Args:
        structure: ASE Atoms object containing atomic coordinates and types.
        bond_lengths: Maximum bond lengths for each element pair, e.g.
            ``{('Si', 'O'): 1.8, ('Al', 'O'): 1.95}``. All T-O pairs must
            be specified; T-T and O-O pairs are ignored.
        max_size: Maximum ring size (number of T atoms) to search for.
        n_cpus: Number of worker processes for parallel ring search.
            ``1``  — sequential execution (default).
            ``N``  — distribute edge loop across N worker processes.
            ``-1`` — use all logical CPUs (``os.cpu_count()``).

    Returns:
        histogram: Mapping from ring size to ring count.
        mean_ring_size: Mean ring size weighted by count.

    Raises:
        ValueError: If ``bond_lengths`` contains no T-O pairs (i.e. all
            pairs involve only oxygen or only formers).

    Examples:
        >>> from ase.io import read
        >>> structure = read('glass.xyz')
        >>> histogram, mean_size = compute_guttmann_rings(
        ...     structure,
        ...     bond_lengths={('Si', 'O'): 1.8},
        ...     max_size=12,
        ... )
        >>> # Parallel on 4 cores
        >>> histogram, mean_size = compute_guttmann_rings(
        ...     structure,
        ...     bond_lengths={('Si', 'O'): 1.8},
        ...     n_cpus=4,
        ... )
        >>> # Parallel using all available cores
        >>> histogram, mean_size = compute_guttmann_rings(
        ...     structure,
        ...     bond_lengths={('Si', 'O'): 1.8},
        ...     n_cpus=-1,
        ... )
    """
    z_cutoffs, former_atomic_numbers = _symbols_to_z_cutoffs(bond_lengths)

    # At least one T-O pair is required to build the former graph.
    if not former_atomic_numbers:
        error_message = (
            "bond_lengths contains no network-former species. Provide at least one T-O pair such as ('Si', 'O')."
        )
        raise ValueError(error_message)

    # Work on a wrapped copy so periodic images are handled consistently.
    periodic_structure = structure.copy()
    periodic_structure.wrap()

    graph = _build_former_graph(
        periodic_structure,
        z_cutoffs,
        former_atomic_numbers,
        _OXYGEN_ATOMIC_NUMBER,
    )

    histogram = _find_guttman_rings(graph, max_size, n_cpus=n_cpus)
    if not histogram:
        return {}, 0.0

    # Mean ring size = count-weighted average over the histogram.
    ring_total = sum(histogram.values())
    weighted_size_sum = sum(size * count for size, count in histogram.items())
    return histogram, float(weighted_size_sum / ring_total)

compute_network_connectivity(qn_dist: dict[int, int]) -> float

Compute average network connectivity based on Qn distribution.

Parameters:

Name Type Description Default
qn_dist dict[int, int]

Qn distribution histogram.

required

Returns:

Type Description
float

Average network connectivity.

Raises:

Type Description
ValueError

If qn_dist is empty or sum of counts is zero.

Example

qn_dist = {0: 10, 1: 50, 2: 100, 3: 200, 4: 50}
connectivity = compute_network_connectivity(qn_dist)

Source code in amorphouspy/src/amorphouspy/analysis/qn_network_connectivity.py
def compute_network_connectivity(qn_dist: dict[int, int]) -> float:
    """Return the mean network connectivity from a Qn histogram.

    The connectivity is the count-weighted average of n over the
    distribution: <n> = sum_n n * N_n / sum_n N_n.

    Args:
        qn_dist: Histogram mapping n (number of bridging oxygens per
            former unit) to the number of units with that n.

    Returns:
        Average network connectivity.

    Raises:
        ValueError: If the histogram is empty or all counts are zero.

    Example:
        >>> compute_network_connectivity({4: 10})
        4.0

    """
    n_formers = sum(qn_dist.values())
    if not n_formers:
        raise ValueError("total_formers is zero, cannot compute network connectivity.")

    # Weight each n by its fractional population.
    return sum(qn * (count / n_formers) for qn, count in qn_dist.items())

compute_qn(structure: Atoms, cutoff: float, former_types: list[int], o_type: int) -> tuple[dict[int, int], dict[int, dict[int, int]]]

Calculate Qn distribution.

The Q^n distribution characterizes connectivity of tetrahedral network-forming units based on bridging oxygens (BOs) count.

Parameters:

Name Type Description Default
structure Atoms

The atomic structure as ASE object.

required
cutoff float

Cutoff radius for former-O neighbor search.

required
former_types list[int]

Atom types considered as formers.

required
o_type int

Atom type considered as oxygen.

required

Returns:

Type Description
tuple[dict[int, int], dict[int, dict[int, int]]]

A tuple containing: total_qn: Total Qn distribution (mapping from n to count). partial_qn: Partial Qn (mapping from former type to Qn distribution).

Example

structure = read('glass.xyz') total_qn, partial_qn = compute_qn( ... structure, cutoff=2.0, former_types=[14], o_type=8 ... )

Source code in amorphouspy/src/amorphouspy/analysis/qn_network_connectivity.py
def compute_qn(
    structure: Atoms,
    cutoff: float,
    former_types: list[int],
    o_type: int,
) -> tuple[dict[int, int], dict[int, dict[int, int]]]:
    """Calculate the Q^n distribution of a network-forming structure.

    The Q^n distribution characterizes the connectivity of tetrahedral
    network-forming units by their bridging-oxygen (BO) count. This is a
    convenience wrapper around ``compute_qn_and_classify`` that keeps only
    the Q^n histograms and discards the per-oxygen classification.

    Args:
        structure: The atomic structure as ASE object.
        cutoff: Cutoff radius for former-O neighbor search.
        former_types: Atom types considered as formers.
        o_type: Atom type considered as oxygen.

    Returns:
        A tuple containing:
            total_qn: Total Qn distribution (mapping from n to count).
            partial_qn: Partial Qn (mapping from former type to Qn distribution).

    Example:
        >>> structure = read('glass.xyz')
        >>> total_qn, partial_qn = compute_qn(
        ...     structure, cutoff=2.0, former_types=[14], o_type=8
        ... )

    """
    qn_total, qn_partial, _unused_oxygen_classes = compute_qn_and_classify(
        structure,
        cutoff,
        former_types,
        o_type,
    )
    return qn_total, qn_partial

compute_qn_and_classify(structure: Atoms, cutoff: float, former_types: list[int], o_type: int) -> tuple[dict[int, int], dict[int, dict[int, int]], dict[int, str]]

Calculate Qn distribution and classify each oxygen as BO/NBO/free/tri.

Performs a single neighbour search pass to compute both the Q^n distribution and the per-oxygen classification.

Parameters:

Name Type Description Default
structure Atoms

The atomic structure as ASE object.

required
cutoff float

Cutoff radius for former-O neighbor search (Å).

required
former_types list[int]

Atom types (atomic numbers) considered as formers.

required
o_type int

Atom type (atomic number) considered as oxygen.

required

Returns:

Type Description
tuple[dict[int, int], dict[int, dict[int, int]], dict[int, str]]

A tuple containing: total_qn: Total Qn distribution (mapping from n to count). partial_qn: Partial Qn (mapping from former type to Qn distribution). oxygen_classes: Mapping from real atom ID to oxygen class string ("BO", "NBO", "free", or "tri").

Example

total_qn, partial_qn, o_classes = compute_qn_and_classify( ... structure, cutoff=2.0, former_types=[14], o_type=8 ... )

Source code in amorphouspy/src/amorphouspy/analysis/qn_network_connectivity.py
def compute_qn_and_classify(
    structure: Atoms,
    cutoff: float,
    former_types: list[int],
    o_type: int,
) -> tuple[dict[int, int], dict[int, dict[int, int]], dict[int, str]]:
    """Calculate Qn distribution and classify each oxygen as BO/NBO/free/tri.

    Two neighbour passes are made: the first labels every oxygen by how
    many network formers it bonds to, the second counts, per former, how
    many of its oxygen neighbours are bridging.

    Args:
        structure: The atomic structure as ASE object.
        cutoff: Cutoff radius for former-O neighbor search (Å).
        former_types: Atom types (atomic numbers) considered as formers.
        o_type: Atom type (atomic number) considered as oxygen.

    Returns:
        A tuple containing:
            total_qn: Total Qn distribution (mapping from n to count).
            partial_qn: Partial Qn (mapping from former type to Qn distribution).
            oxygen_classes: Mapping from real atom ID to oxygen class string
                (``"BO"``, ``"NBO"``, ``"free"``, or ``"tri"``).

    Example:
        >>> total_qn, partial_qn, o_classes = compute_qn_and_classify(
        ...     structure, cutoff=2.0, former_types=[14], o_type=8
        ... )

    """
    # Map each real atom ID to its atomic number. IDs come from the "id"
    # array when present, otherwise they are generated as 1..N.
    atomic_numbers = structure.get_atomic_numbers()
    if "id" in structure.arrays:
        real_ids = structure.arrays["id"].astype(np.int64)
    else:
        real_ids = np.arange(1, len(structure) + 1, dtype=np.int64)
    type_of = {int(rid): int(z) for rid, z in zip(real_ids, atomic_numbers, strict=False)}

    # --- Pass 1: label oxygens by former coordination ----------------------
    oxygen_classes: dict[int, str] = {}
    bridging_ids: set[int] = set()

    for oxygen_id, former_neighbors in get_neighbors(
        structure,
        cutoff=cutoff,
        target_types=[o_type],
        neighbor_types=former_types,
    ):
        if type_of.get(oxygen_id) != o_type:
            continue
        coordination = len(former_neighbors)
        if coordination == 0:
            label = "free"
        elif coordination == 1:
            label = "NBO"
        elif coordination == MIN_COORDINATION_FOR_BRIDGING:
            label = "BO"
            bridging_ids.add(oxygen_id)
        else:
            # Over-coordinated oxygen still links formers, so it counts as bridging.
            label = "tri"
            bridging_ids.add(oxygen_id)
        oxygen_classes[oxygen_id] = label

    # --- Pass 2: count bridging oxygens around each former -----------------
    qn_total: dict[int, int] = defaultdict(int)
    qn_partial = {former: defaultdict(int) for former in former_types}

    for former_id, oxygen_neighbors in get_neighbors(
        structure,
        cutoff=cutoff,
        target_types=former_types,
        neighbor_types=[o_type],
    ):
        former_z = type_of.get(former_id)
        if former_z not in former_types:
            continue

        n_bridging = sum(nid in bridging_ids for nid in oxygen_neighbors)
        qn_total[n_bridging] += 1
        qn_partial[former_z][n_bridging] += 1

    # Normalise both histograms to explicit Q0-Q6 keys.
    total_out = {n: qn_total.get(n, 0) for n in range(7)}
    partial_out = {former: {n: qn_partial[former].get(n, 0) for n in range(7)} for former in former_types}
    return total_out, partial_out, oxygen_classes

compute_rdf(structure: Atoms, r_max: float = 10.0, n_bins: int = 500, type_pairs: list[tuple[int, int]] | None = None) -> tuple[np.ndarray, dict, dict]

Compute radial distribution functions (RDFs) and cumulative coordination numbers.

Calculates the pair-wise radial distribution function g(r) for specified atom-type pairs under periodic boundary conditions (orthogonal and triclinic), along with the cumulative coordination number n(r).

Parameters:

Name Type Description Default
structure Atoms

ASE Atoms object.

required
r_max float

Maximum distance in Å (default 10.0).

10.0
n_bins int

Number of radial bins (default 500).

500
type_pairs list[tuple[int, int]] | None

List of (atomic_number_1, atomic_number_2) pairs. None -> all unique unordered combinations of present types plus all same-type pairs.

None

Returns:

Name Type Description
r ndarray

Radial bin centres in Å, shape (n_bins,).

rdfs dict

Normalised g(r) for each type pair, shape (n_bins,).

cn_cumulative dict

Mean number of neighbours of the second type within radius r around an atom of the first type, shape (n_bins,).

Notes
  • If r_max exceeds half the smallest perpendicular cell height, it is automatically reduced to the largest integer value that does not exceed that limit, and a warning message is printed.
  • If the adjusted r_max would be zero or negative (cell too small), a ValueError is raised instead.
  • This function operates on array indices internally (not atom IDs) because it only needs type information and distances, not ID lookup.
  • Periodic boundaries are handled via the minimum-image convention in fractional space, so both orthogonal and triclinic cells are correct.
Example

structure = read('glass.xyz') r, rdfs, cn = compute_rdf(structure, r_max=10.0, n_bins=500) g_SiO = rdfs[(8, 14)] cn_SiO = cn[(8, 14)] # O around Si cn_OSi = cn[(14, 8)] # Si around O

Source code in amorphouspy/src/amorphouspy/analysis/radial_distribution_functions.py
def compute_rdf(
    structure: Atoms,
    r_max: float = 10.0,
    n_bins: int = 500,
    type_pairs: list[tuple[int, int]] | None = None,
) -> tuple[np.ndarray, dict, dict]:
    """Compute pair radial distribution functions and cumulative coordination numbers.

    Bins all interatomic distances up to ``r_max`` for the requested
    atom-type pairs under periodic boundary conditions (orthogonal and
    triclinic alike), normalises them to g(r), and derives the running
    coordination number n(r) from the directed histograms.

    Args:
        structure:  ASE Atoms object.
        r_max:      Maximum distance in Å (default 10.0).
        n_bins:     Number of radial bins (default 500).
        type_pairs: List of (atomic_number_1, atomic_number_2) pairs.
                    None -> all unique unordered combinations of present types
                    plus all same-type pairs.

    Returns:
        r: Radial bin centres in Å, shape (n_bins,).
        rdfs: Normalised g(r) for each unordered type pair, shape (n_bins,).
        cn_cumulative: Mean number of neighbours of the second type within
            radius r around an atom of the first type, shape (n_bins,).

    Raises:
        ValueError: If the cell is so small that no positive-integer cutoff
            fits within half the smallest perpendicular cell height.

    Notes:
        - If r_max exceeds half the smallest perpendicular cell height it is
          clamped to the largest integer not exceeding that limit, and a
          UserWarning is emitted.
        - The function operates on array indices internally (not atom IDs):
          it only needs type information and distances.
        - Periodicity is handled via the minimum-image convention in
          fractional space, so triclinic cells are treated correctly.

    Example:
        >>> structure = read('glass.xyz')
        >>> r, rdfs, cn = compute_rdf(structure, r_max=10.0, n_bins=500)
        >>> g_SiO = rdfs[(8, 14)]
        >>> cn_SiO = cn[(8, 14)]   # O around Si
        >>> cn_OSi = cn[(14, 8)]   # Si around O

    """
    atomic_numbers = structure.get_atomic_numbers()
    present_types = np.unique(atomic_numbers)
    counts_by_type = {int(z): int(np.sum(atomic_numbers == z)) for z in present_types}
    cell_matrix = structure.get_cell().array
    cell_volume = abs(np.linalg.det(cell_matrix))

    # Minimum-image distances are only valid up to half the smallest
    # perpendicular cell height; clamp r_max down to an integer if needed.
    half_min_height = float(cell_perpendicular_heights(cell_matrix).min()) / 2.0
    if r_max > half_min_height:
        clamped = float(math.floor(half_min_height))
        if clamped <= 0.0:
            msg = (
                f"r_max_allowed={half_min_height:.4f} Å is less than 1 Å; no valid "
                "integer cutoff exists. Use a larger simulation box."
            )
            raise ValueError(msg)
        warnings.warn(
            f"r_max={r_max:.4f} Å exceeds half the smallest perpendicular "
            f"cell height ({half_min_height:.4f} Å). r_max has been automatically "
            f"adjusted to {clamped:.1f} Å (largest integer not exceeding "
            f"the limit).",
            UserWarning,
            stacklevel=2,
        )
        r_max = clamped

    # Decide which unordered pairs get a g(r) and which ordered pairs get n(r).
    if type_pairs is None:
        unordered_pairs = [(int(a), int(b)) for a, b in combinations_with_replacement(present_types, 2)]
        ordered_pairs: list[tuple[int, int]] = []
        for za, zb in unordered_pairs:
            ordered_pairs.append((za, zb))
            if za != zb:
                ordered_pairs.append((zb, za))
    else:
        unordered_pairs = list({(min(a, b), max(a, b)) for a, b in type_pairs})
        ordered_pairs = list(dict.fromkeys(type_pairs))

    # Radial grid: bin centres and spherical shell volumes for normalisation.
    edges = np.linspace(0, r_max, n_bins + 1)
    r = 0.5 * (edges[1:] + edges[:-1])
    bin_width = edges[1] - edges[0]
    shell_volumes = 4.0 * np.pi * r**2 * bin_width

    distances, first_idx, second_idx = _compute_distances(structure, r_max)

    rdfs, hist_directed = _compute_rdf_histograms(
        unordered_pairs,
        atomic_numbers[first_idx],
        atomic_numbers[second_idx],
        distances,
        counts_by_type,
        cell_volume,
        edges,
        shell_volumes,
    )
    cn_cumulative = _compute_cn_cumulative(ordered_pairs, hist_directed, counts_by_type)

    return r, rdfs, cn_cumulative

compute_structure_factor(structure: Atoms, q_min: float = 0.5, q_max: float = 20.0, n_q: int = 500, r_max: float = 10.0, n_bins: int = 2000, radiation: str = 'neutron', *, lorch_damping: bool = True, type_pairs: list[tuple[int, int]] | None = None) -> tuple[np.ndarray, np.ndarray, dict[tuple[int, int], np.ndarray]]

Compute the 1D isotropic structure factor S(q) and Faber-Ziman partials.

Uses the Faber-Ziman formalism to compute partial structure factors S_ab(q) from partial radial distribution functions g_ab(r) via the sine transform, then combines them into the total S(q) weighted by coherent neutron scattering lengths or q-dependent X-ray form factors.

Neutron diffraction (Faber-Ziman total structure factor):

S(q) = 1 + sum_{a<=b} (2 - d_ab) * c_a * c_b * b_a * b_b * [S_ab(q) - 1] / <b>^2

where c_a is the atomic concentration, b_a the coherent scattering length (from NIST), and ⟨b⟩ = sum_a c_a * b_a is the concentration-weighted mean scattering length.

X-ray diffraction: b_a is replaced by the q-dependent International Tables form factor f_a(q) (four-Gaussian fit via pymatgen), making the weights q-dependent.

Parameters:

Name Type Description Default
structure Atoms

ASE Atoms object with periodic boundary conditions.

required
q_min float

Minimum momentum transfer in Angstroms^-1 (default 0.5).

0.5
q_max float

Maximum momentum transfer in Angstroms^-1 (default 20.0).

20.0
n_q int

Number of q-grid points (default 500).

500
r_max float

Real-space cutoff in Angstroms for the underlying RDF (default 10.0).

10.0
n_bins int

Number of radial bins for the RDF (default 2000).

2000
radiation str

Scattering probe: "neutron" uses tabulated NIST coherent scattering lengths; "xray" uses the International Tables four-Gaussian form factors via pymatgen (default "neutron").

'neutron'
lorch_damping bool

Apply the Lorch modification function M(r) = sinc(r/r_max) to suppress termination ripples from the finite r_max cutoff (default True).

True
type_pairs list[tuple[int, int]] | None

List of (Z_a, Z_b) pairs for which to compute partials. None computes all unique unordered pairs present in the structure.

None

Returns:

Name Type Description
q ndarray

Momentum transfer grid in Angstroms^-1, shape (n_q,).

sq_total ndarray

Total structure factor S(q), shape (n_q,).

sq_partials dict[tuple[int, int], ndarray]

Faber-Ziman partial structure factors S_ab(q), keyed by the canonical pair (min(Z_a, Z_b), max(Z_a, Z_b)), each with shape (n_q,).

Raises:

Type Description
ValueError

If radiation is not "neutron" or "xray".

KeyError

If a scattering length or form factor is not available for an element present in the structure.

Example

from ase.io import read structure = read("sodium_silicate.extxyz") q, sq, sq_partials = compute_structure_factor(structure, q_max=20.0) sq_SiO = sq_partials[(8, 14)] # partial S_SiO(q)

Source code in amorphouspy/src/amorphouspy/analysis/structure_factor.py
def compute_structure_factor(
    structure: Atoms,
    q_min: float = 0.5,
    q_max: float = 20.0,
    n_q: int = 500,
    r_max: float = 10.0,
    n_bins: int = 2000,
    radiation: str = "neutron",
    *,
    lorch_damping: bool = True,
    type_pairs: list[tuple[int, int]] | None = None,
) -> tuple[np.ndarray, np.ndarray, dict[tuple[int, int], np.ndarray]]:
    """Compute the 1D isotropic structure factor S(q) and Faber-Ziman partials.

    Partial structure factors S_ab(q) are obtained from the partial radial
    distribution functions g_ab(r) via a sine transform, then combined into
    the total S(q) with coherent neutron scattering lengths or q-dependent
    X-ray form factors (Faber-Ziman formalism):

        S(q) = 1 + sum_{a<=b} (2 - d_ab) * c_a * c_b * b_a * b_b * [S_ab(q) - 1] / <b>^2

    where c_a is the atomic concentration, b_a the coherent scattering length
    (from NIST), and <b> = sum_a c_a * b_a. For X-rays, b_a is replaced by the
    q-dependent International Tables form factor f_a(q) (four-Gaussian fit via
    pymatgen), making the weights q-dependent.

    Args:
        structure: ASE Atoms object with periodic boundary conditions.
        q_min: Minimum momentum transfer in Angstroms^-1 (default 0.5).
        q_max: Maximum momentum transfer in Angstroms^-1 (default 20.0).
        n_q: Number of q-grid points (default 500).
        r_max: Real-space cutoff in Angstroms for the underlying RDF (default 10.0).
        n_bins: Number of radial bins for the RDF (default 2000).
        radiation: Scattering probe: ``"neutron"`` uses tabulated NIST coherent
            scattering lengths; ``"xray"`` uses the International Tables
            four-Gaussian form factors via pymatgen (default ``"neutron"``).
        lorch_damping: Apply the Lorch modification function M(r) = sinc(r/r_max)
            to suppress termination ripples from the finite r_max cutoff
            (default True).
        type_pairs: List of (Z_a, Z_b) pairs for which to compute partials.
            ``None`` computes all unique unordered pairs present in the structure.

    Returns:
        q: Momentum transfer grid in Angstroms^-1, shape (n_q,).
        sq_total: Total structure factor S(q), shape (n_q,).
        sq_partials: Faber-Ziman partial structure factors S_ab(q), keyed by
            the canonical pair (min(Z_a, Z_b), max(Z_a, Z_b)), each with
            shape (n_q,).

    Raises:
        ValueError: If ``radiation`` is not ``"neutron"`` or ``"xray"``.
        KeyError: If a scattering length or form factor is not available for
            an element present in the structure.

    Example:
        >>> from ase.io import read
        >>> structure = read("sodium_silicate.extxyz")
        >>> q, sq, sq_partials = compute_structure_factor(structure, q_max=20.0)
        >>> sq_SiO = sq_partials[(8, 14)]  # partial S_SiO(q)

    """
    if radiation not in ("neutron", "xray"):
        msg = f"radiation must be 'neutron' or 'xray', got {radiation!r}."
        raise ValueError(msg)

    atomic_numbers = structure.get_atomic_numbers()
    species = sorted({int(z) for z in atomic_numbers})
    n_atoms = len(structure)
    cell_matrix = structure.get_cell().array
    box_volume = float(abs(np.linalg.det(cell_matrix)))
    number_density = n_atoms / box_volume  # Angstrom^-3

    fractions = {z: float(np.sum(atomic_numbers == z)) / n_atoms for z in species}

    # --- Partial g_ab(r) -> S_ab(q) via sine transform ------------------------
    r, rdfs, _ = compute_rdf(structure, r_max=r_max, n_bins=n_bins, type_pairs=type_pairs)
    pair_keys = list(rdfs.keys())

    q_values = np.linspace(q_min, q_max, n_q)

    sq_partials: dict[tuple[int, int], np.ndarray] = {}
    for pair in pair_keys:
        sq_partials[pair] = _sine_transform_rdf(
            r,
            rdfs[pair],
            q_values,
            number_density,
            lorch_damping=lorch_damping,
        )

    # --- Scattering weights ---------------------------------------------------
    # Neutrons: q-independent scalar lengths; X-rays: q-dependent form factors.
    # The Faber-Ziman combination below is identical in both cases.
    if radiation == "neutron":
        factors = {z: _neutron_scattering_length(z) for z in species}
        mean_factor = sum(fractions[z] * factors[z] for z in species)
    else:  # xray
        factors = {z: _xray_form_factor(z, q_values) for z in species}
        # mean_f(q) = sum_a c_a * f_a(q), shape (n_q,)
        mean_factor = np.sum([fractions[z] * factors[z] for z in species], axis=0)

    # --- Total S(q) -----------------------------------------------------------
    accumulated = np.zeros(n_q)
    for z1, z2 in pair_keys:
        multiplicity = 1.0 if z1 == z2 else 2.0
        weight = multiplicity * fractions[z1] * fractions[z2] * factors[z1] * factors[z2]
        accumulated += weight * (sq_partials[(z1, z2)] - 1.0)
    sq_total = 1.0 + accumulated / mean_factor**2

    return q_values, sq_total, sq_partials

def count_distribution(coord_numbers: dict[int, int]) -> dict[int, int]:
    """Convert coordination numbers to a histogram distribution.

    Args:
        coord_numbers: Mapping from atom ID to coordination number.

    Returns:
        Coordination number frequency histogram. An empty input yields an
        empty histogram.

    Example:
        >>> count_distribution({1: 4, 2: 4, 3: 3})
        {4: 2, 3: 1}

    """
    # Local import: keeps this chunk self-contained; Counter tallies in C.
    from collections import Counter

    # Cast back to a plain dict so the return type matches the contract.
    return dict(Counter(coord_numbers.values()))

create_random_atoms(composition: dict[str, float], n_molecules: int | None = None, target_atoms: int | None = None, mode: str = 'molar', stoichiometry: dict[str, dict[str, int]] | None = None, box_length: float = 50.0, min_distance: float = 1.6, seed: int = 42, max_attempts_per_atom: int = 100000) -> tuple[list[dict[str, str | list[float]]], dict[str, int]]

Generate random atom positions for a glass system in a periodic cubic box.

Supports specifying system size either by total number of molecules (n_molecules) or total number of atoms (target_atoms).

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions, e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.

required
n_molecules int | None

The desired total number of molecules. Mutually exclusive with target_atoms.

None
target_atoms int | None

The desired target number of atoms. Mutually exclusive with n_molecules.

None
mode str

The composition interpretation mode ("molar" or "weight"). Defaults to "molar".

'molar'
stoichiometry dict[str, dict[str, int]] | None

Optional pre-calculated stoichiometry dictionary.

None
box_length float

The length of the cubic simulation box in Angstroms. Defaults to 50.0.

50.0
min_distance float

The minimum allowed distance between any two atoms in Angstroms. Defaults to 1.6.

1.6
seed int

Random seed for reproducibility. Defaults to 42.

42
max_attempts_per_atom int

Maximum attempts to place a single atom before failing. Defaults to 100000.

100000

Returns:

Type Description
tuple[list[dict[str, str | list[float]]], dict[str, int]]

A tuple containing: - A list of atom dictionaries, each with "element" and "position". - A dictionary of total counts per element.

Raises:

Type Description
ValueError

If composition sum is invalid or target mode is ambiguous.

RuntimeError

If atom placement fails due to overcrowding (max attempts reached).

Example

atoms_list, atom_counts = create_random_atoms( ... composition={"SiO2": 0.8, "Na2O": 0.2}, ... target_atoms=500, ... box_length=20.0 ... )

Source code in amorphouspy/src/amorphouspy/structure/geometry.py
def create_random_atoms(
    composition: dict[str, float],
    n_molecules: int | None = None,
    target_atoms: int | None = None,
    mode: str = "molar",
    stoichiometry: dict[str, dict[str, int]] | None = None,
    box_length: float = 50.0,
    min_distance: float = 1.6,
    seed: int = 42,
    max_attempts_per_atom: int = 100000,
) -> tuple[list[dict[str, str | list[float]]], dict[str, int]]:
    """Generate random atom positions for a glass system in a periodic cubic box.

    The system size is specified either as a total number of molecules
    (`n_molecules`) or as a total number of atoms (`target_atoms`); exactly
    one of the two must be given.

    Args:
        composition: A dictionary mapping oxide formulas to their fractions,
            e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.
        n_molecules: The desired total number of molecules. Mutually exclusive with `target_atoms`.
        target_atoms: The desired target number of atoms. Mutually exclusive with `n_molecules`.
        mode: The composition interpretation mode ("molar" or "weight"). Defaults to "molar".
        stoichiometry: Optional pre-calculated stoichiometry dictionary.
        box_length: The length of the cubic simulation box in Angstroms. Defaults to 50.0.
        min_distance: The minimum allowed distance between any two atoms in Angstroms. Defaults to 1.6.
        seed: Random seed for reproducibility. Defaults to 42.
        max_attempts_per_atom: Maximum attempts to place a single atom before failing. Defaults to 100000.

    Returns:
        A tuple containing:
            - A list of atom dictionaries, each with "element" and "position".
            - A dictionary of total counts per element.

    Raises:
        ValueError: If composition sum is invalid or target mode is ambiguous.
        RuntimeError: If atom placement fails due to overcrowding (max attempts reached).

    Example:
        >>> atoms_list, atom_counts = create_random_atoms(
        ...     composition={"SiO2": 0.8, "Na2O": 0.2},
        ...     target_atoms=500,
        ...     box_length=20.0
        ... )

    """
    # Reject ambiguous sizing (both or neither of n_molecules/target_atoms).
    validate_target_mode(n_molecules, target_atoms)

    random_state = np.random.default_rng(seed)

    if stoichiometry is None:
        stoichiometry = extract_stoichiometry(composition)

    if target_atoms is None:
        # Sizing by molecule count; validate_target_mode guarantees that
        # exactly one sizing argument was supplied.
        assert n_molecules is not None
        _, element_totals = _counts_from_n_molecules(composition, n_molecules, mode, stoichiometry)
    else:
        plan = plan_system(composition, target_atoms, mode=mode, target_type="atoms")
        element_totals = plan["element_counts"]

    placed_atoms = _place_atoms(element_totals, box_length, min_distance, random_state, max_attempts_per_atom)
    return placed_atoms, element_totals

cte_from_fluctuations_simulation(structure: Atoms, potential: str, temperature: float = 300, pressure: float = 0.0001, timestep: float = 1.0, equilibration_steps: int = 100000, production_steps: int = 200000, min_production_runs: int = 5, max_production_runs: int = 25, CTE_uncertainty_criterion: float = 1e-06, n_dump: int = 100000, n_log: int = 10, server_kwargs: dict[str, Any] | None = None, *, aniso: bool = False, seed: int | None = 12345, tmp_working_directory: str | Path | None = None) -> dict[str, Any]

Perform a LAMMPS-based cte simulation protocol based on fluctuations.

This workflow equilibrates a structure at a target temperature and performs a production MD run to collect the instantaneous total energy, pressure and volume, needed to compute the CTE via H-V fluctuation analysis or V-T data, depending on whether only one temperature or multiple temperatures should be probed.

The number of steps used here is only for testing purposes. It is assumed in this workflow that the given in structure is pre-equilibrated.

Parameters:

Name Type Description Default
structure Atoms

Input structure (assumed pre-equilibrated).

required
potential str

LAMMPS potential file.

required
temperature float

Simulation temperature in Kelvin (default 300 K).

300
pressure float

Target pressure in GPa (use pyiron units here!) for NPT simulations (default: 10^-4 GPa = 10^5 Pa = 1 bar).

0.0001
timestep float

MD integration timestep in femtoseconds (default 1.0 fs).

1.0
equilibration_steps int

Number of MD steps for the equilibration run (default 100,000).

100000
production_steps int

Number of MD steps for the production runs (default 200,000).

200000
min_production_runs int

Minimum number of production runs to perform before checking for convergence (default 5).

5
max_production_runs int

Maximum number of production runs to perform before checking for convergence (default 25).

25
CTE_uncertainty_criterion float

Convergence criterion for the uncertainty of the linear CTE (default 1e-6/K).

1e-06
n_dump int

Dump output frequency of the production runs (default 100,000).

100000
n_log int

Log output frequency (default 10).

10
server_kwargs dict[str, Any] | None

Additional server configuration arguments for pyiron.

None
aniso bool

If false, an isotropic NPT calculation is performed and the simulation box is scaled uniformly. If True, anisotropic NPT calculation is performed and the simulation box can change shape and size independently along each axis (default False).

False
seed int | None

Random seed for velocity initialization (default 12345). If None, a random seed is used.

12345
tmp_working_directory str | Path | None

Temporary directory for job execution.

None

Returns:

Type Description
dict[str, Any]

Nested dictionary containing the "summary" and "data" keys. In the "summary" section the CTE

dict[str, Any]

values and their uncertainties are returned together with info whether CTE_uncertainty_criterion was

dict[str, Any]

reached within the max_production_runs.

dict[str, Any]

The "data" holds the collected data from all individual production runs. "run_index" is to

dict[str, Any]

clearly identify which production run the data belongs to. "steps" contains the number of steps

dict[str, Any]

for each run. Thermodynamic and structural data are averaged over each production run and these

dict[str, Any]

averages are listed under the respective key. Finally, also the structure after the simulation

dict[str, Any]

has finished is stored for further use or analysis. Example:

dict[str, Any]

{ 'summary' : {"CTE_V_mean" : ..., "CTE_x_mean" : ..., "CTE_y_mean" : ..., "CTE_z_mean" : ..., "CTE_V_uncertainty" : ..., "CTE_x_uncertainty" : ..., "CTE_y_uncertainty" : ..., "CTE_z_uncertainty" : ..., "is_converged" : "True" or "False", "convergence_criterion" : float },

dict[str, Any]

'data': {"run_index" : [1, 2, 3, ...], "steps" : [...], "T" : [...], "E_tot" : [...], "ptot" : [...], "pxx" : [...], "pyy" : [...], "pzz" : [...], "V" : [...], "Lx" : [...], "Ly" : [...], "Lz" : [...], "CTE_V" : [...], "CTE_x" : [...], "CTE_y" : [...], "CTE_z" : [...], "structure_final" : Atoms }

dict[str, Any]

}

Notes
  • The structure is first pre-equilibrated with a short, hard-coded 10 ps NVT run. Only then follows the user-defined NPT equilibration and production runs.
  • The simulation is only marked as converged if the uncertainty criterion is reached for all four CTE values (CTE_V, CTE_x, CTE_y, CTE_z) at the same time. The CTE_uncertainty_criterion is applied as-is to the linear CTEs. How this is treated for CTE_V is explained in the next point.
  • How to choose the uncertainty criterion for the volumetric CTE? Should it be the same as the defined CTE_uncertainty_criterion applied to the linear CTEs? Consider this: The volumetric CTE is approximately the sum of the linear CTEs along x, y, and z. If three variables x, y and z were uncorrelated and have known uncertainties sigma_x, sigma_y, and sigma_z, the uncertainty of their sum would be: sigma_V = sqrt(sigma_x^2 + sigma_y^2 + sigma_z^2). If we assume that the uncertainty criterion is reached at roughly the same time for all three variables, the uncertainty of the volumetric CTE can be approximated as sqrt(3) * CTE_uncertainty_criterion. However, x, y and z are typically not uncorrelated. If calculated from the actual simulation data, the uncertainty of CTE_V is found to be approximately the same as the individual uncertainties of the linear CTEs. To be not too strict here, we keep the sqrt(3) * CTE_uncertainty_criterion for CTE_V.
Example

result = cte_from_fluctuations_simulation( ... structure=my_atoms, ... potential=my_potential, ... temperature=300, ... equilibration_steps=500_000, ... production_steps=200_000, ... min_production_runs=10, ... max_production_runs=50, ... CTE_uncertainty_criterion=1e-6, ... )

Source code in amorphouspy/src/amorphouspy/workflows/cte.py
def cte_from_fluctuations_simulation(
    structure: Atoms,
    potential: str,
    temperature: float = 300,
    pressure: float = 1e-4,
    timestep: float = 1.0,
    equilibration_steps: int = 100_000,
    production_steps: int = 200_000,
    # n=5 corresponds to relative uncertainty of the standard deviation
    # delta sigma/sigma = (2(n-1))**-0.5 of ~35%
    min_production_runs: int = 5,
    max_production_runs: int = 25,
    CTE_uncertainty_criterion: float = 1e-6,
    n_dump: int = 100000,
    n_log: int = 10,
    server_kwargs: dict[str, Any] | None = None,
    *,
    aniso: bool = False,
    seed: int | None = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict[str, Any]:  # pylint: disable=too-many-positional-arguments
    """Perform a LAMMPS-based cte simulation protocol based on fluctuations.

    This workflow equilibrates a structure at a target temperature and performs a
    production MD run to collect the instantaneous total energy, pressure and volume,
    needed to compute the CTE via H-V fluctuation analysis or V-T data, depending
    if only one temperature or multiple temperatures should be probed.

    The number of steps used here is only for testing purposes.
    It is assumed in this workflow that the given structure is pre-equilibrated.

    Args:
        structure: Input structure (assumed pre-equilibrated).
        potential: LAMMPS potential file.
        temperature: Simulation temperature in Kelvin (default 300 K).
        pressure: Target pressure in GPa (use pyiron units here!) for NPT simulations
            (default: 1e-4 GPa = 1e5 Pa = 1 bar).
        timestep: MD integration timestep in femtoseconds (default 1.0 fs).
        equilibration_steps: Number of MD steps for the equilibration run (default 100,000).
        production_steps: Number of MD steps for the production runs (default 200,000).
        min_production_runs: Minimum number of production runs to perform before checking for
            convergence (default 5).
        max_production_runs: Maximum number of production runs to perform before checking for
            convergence (default 25).
        CTE_uncertainty_criterion: Convergence criterion for the uncertainty of the linear
            CTE (default 1e-6/K).
        n_dump: Dump output frequency of the production runs (default 100,000).
        n_log: Log output frequency (default 10).
        server_kwargs: Additional server configuration arguments for pyiron.
        aniso: If False, an isotropic NPT calculation is performed and the simulation box is
              scaled uniformly. If True, an anisotropic NPT calculation is performed and the simulation
              box can change shape and size independently along each axis (default False).
        seed: Random seed for velocity initialization (default 12345). If None, a random seed is used.
        tmp_working_directory: Temporary directory for job execution.

    Returns:
        Nested dictionary containing the "summary" and "data" keys. In the "summary" section the CTE
        values and their uncertainties are returned together with info whether CTE_uncertainty_criterion was
        reached within the max_production_runs.
        The "data" holds the collected data from all individual production runs. "run_index" is to
        clearly identify which production run the data belongs to. "steps" contains the number of steps
        for each run. Thermodynamic and structural data are averaged over each production run and these
        averages are listed under the respective key. Finally, also the structure after the simulation
        has finished is stored for further use or analysis. Example:
        { 'summary' : {"CTE_V_mean" : ..., "CTE_x_mean" : ...,
                       "CTE_y_mean" : ..., "CTE_z_mean" : ...,
                       "CTE_V_uncertainty" : ..., "CTE_x_uncertainty" : ...,
                       "CTE_y_uncertainty" : ..., "CTE_z_uncertainty" : ...,
                       "is_converged" : "True" or "False",
                       "convergence_criterion" : float
                       },
        'data':  {"run_index" : [1, 2, 3, ...],
                    "steps" : [...],
                    "T" : [...],
                    "E_tot" : [...],
                    "ptot" : [...],
                    "pxx" : [...],
                    "pyy" : [...],
                    "pzz" : [...],
                    "V" : [...],
                    "Lx" : [...],
                    "Ly" : [...],
                    "Lz" : [...],
                    "CTE_V" : [...],
                    "CTE_x" : [...],
                    "CTE_y" : [...],
                    "CTE_z" : [...],
                    "structure_final" : Atoms
                    }
        }

    Notes:
        - The structure is first pre-equilibrated with a short, hard-coded 10,000-step NVT run (10 ps at
          the default 1 fs timestep). Only then follows the user-defined NPT equilibration and production
          runs.
        - The simulation is only marked as converged if the uncertainty criterion is reached for all four CTE
          values (CTE_V, CTE_x, CTE_y, CTE_z) at the same time. The CTE_uncertainty_criterion is applied
          as-is to the linear CTEs. How this is treated for the CTE_V, see next point.
        - How to choose the uncertainty criterion for the volumetric CTE? Should it be the same as the defined
          CTE_uncertainty_criterion applied to the linear CTEs? Consider this: The volumetric CTE is approximately
          the sum of the linear CTEs along x, y, and z. If three variables x, y and z were uncorrelated and
          had known uncertainties sigma_x, sigma_y, and sigma_z, the uncertainty of their sum would
          be: sigma_V = sqrt( sigma_x**2 + sigma_y**2 + sigma_z**2 ). If we assume that the uncertainty
          criterion is reached at roughly the same time for all three variables, the uncertainty of the
          volumetric CTE can be approximated as sqrt(3)*CTE_uncertainty_criterion. However, x, y and z are
          typically not uncorrelated. If calculated from the actual simulation data, the uncertainty of CTE_V
          is found to be approximately the same as the individual uncertainties of the linear CTEs. To be not
          too strict here, we keep the sqrt(3)*CTE_uncertainty_criterion for CTE_V.


    Example:
        >>> result = cte_from_fluctuations_simulation(
        ...     structure=my_atoms,
        ...     potential=my_potential,
        ...     temperature=300,
        ...     equilibration_steps=500_000,
        ...     production_steps=200_000,
        ...     min_production_runs=10,
        ...     max_production_runs=50,
        ...     CTE_uncertainty_criterion=1e-6,
        ... )

    """
    # Logging setup
    logger = _create_logger()

    # Check and adjust input parameters if necessary.
    # NOTE(review): the loop below relies on the checker guaranteeing
    # 1 <= min_production_runs <= max_production_runs; otherwise `cte_summary` and
    # `structure_production` would be unbound at the end -- confirm this is enforced
    # inside _fluctuation_simulation_input_checker.
    production_steps, min_production_runs, max_production_runs, N_for_averaging, n_dump = (
        _fluctuation_simulation_input_checker(
            production_steps, min_production_runs, max_production_runs, n_log, timestep, n_dump, logger
        )
    )

    # Set pressure to anisotropic if requested (diagonal target components only,
    # no target shear stresses).
    sim_pressure = [pressure, pressure, pressure, None, None, None] if aniso else pressure

    # initial structure used. Afterwards, it is updated after each temperature
    structure0 = structure

    # BUGFIX: report the actual duration (10,000 steps * timestep in fs -> ps) instead
    # of a hard-coded "10 ps", which was only correct for timestep = 1 fs.
    logger.info(
        "Starting %.1f ps (hardcoded) NVT equilibration at %.2f K.", 10_000 * timestep / 1000, temperature
    )

    # Stage 1: Short equilibration in NVT at T for 10,000 steps
    structure1, _ = _run_lammps_md(
        structure=structure0,
        potential=potential,
        tmp_working_directory=tmp_working_directory,
        temperature=temperature,
        n_ionic_steps=10_000,
        timestep=timestep,
        n_dump=10_000,
        n_log=100,
        initial_temperature=temperature,
        langevin=False,
        seed=seed,
        server_kwargs=server_kwargs,
    )

    # BUGFIX: elapsed time in ps is steps * timestep(fs) / 1000 (was steps / timestep / 1000,
    # which is dimensionally wrong and only coincidentally correct for timestep = 1 fs).
    equilibration_time = equilibration_steps * timestep / 1000
    # BUGFIX: this stage runs NPT (pressure is passed below); the message previously said "NVT".
    logger.info("Starting %.1f ps NPT equilibration at %.2f K and %.2e GPa.", equilibration_time, temperature, pressure)

    # Stage 2: NPT equilibration runs at T,p.
    structure2, _ = _run_lammps_md(
        structure=structure1,
        potential=potential,
        tmp_working_directory=tmp_working_directory,
        temperature=temperature,
        pressure=sim_pressure,
        n_ionic_steps=equilibration_steps,
        timestep=timestep,
        n_dump=equilibration_steps,
        n_log=100,
        initial_temperature=0,
        langevin=True,
        server_kwargs=server_kwargs,
    )

    # Stage 3: NPT production runs (loop) at T,p.
    results = _initialize_datadict(with_CTE_keys=True)
    counter_production_run = 1
    # BUGFIX: same fs -> ps conversion fix as for equilibration_time above.
    production_time = production_steps * timestep / 1000
    while counter_production_run <= max_production_runs:
        # to keep track of multiple production runs, print status message
        logger.info(
            "Starting %.1f ps NPT production run #%03d at %.2f K and %.2e GPa.",
            production_time,
            counter_production_run,
            temperature,
            pressure,
        )

        # actual production run
        structure_production, parsed_output = _run_lammps_md(
            structure=structure2,
            potential=potential,
            tmp_working_directory=tmp_working_directory,
            temperature=temperature,
            pressure=sim_pressure,
            n_ionic_steps=production_steps,
            timestep=timestep,
            n_dump=n_dump,
            n_log=n_log,
            initial_temperature=0,
            langevin=True,
            server_kwargs=server_kwargs,
        )

        # parse and check the output of the production run
        _sim_data = _collect_sim_data(parsed_output, counter_production_run)
        _sanity_check_sim_data(sim_data=_sim_data, T_target=temperature, p_target=pressure, logger=logger)

        # Calculate cte based on the data of the current production run
        _cte_results = _fluctuation_simulation_cte_calculation(
            sim_data=_sim_data,
            temperature=temperature,
            p=pressure,
            use_running_mean=True,
            N_points=N_for_averaging,
        )

        # merge results to have the averages over all production runs so far
        results = _fluctuation_simulation_merge_results(
            previous_data=results, new_sim_data=_sim_data, new_cte_data=_cte_results
        )

        # start checking for convergence once the min number of production runs have been executed
        if counter_production_run >= min_production_runs:
            converge_bool, cte_summary = _fluctuation_simulation_uncertainty_check(results, CTE_uncertainty_criterion)
            logger.info(
                "Production run #%03d finished. Current CTE_V = %.4e +/- %.4e 1/K.",
                counter_production_run,
                cte_summary["CTE_V_mean"],
                cte_summary["CTE_V_uncertainty"],
            )

            # If converged, break the loop and update the summary accordingly
            if converge_bool:
                cte_summary.update({"is_converged": "True", "convergence_criterion": CTE_uncertainty_criterion})
                msg = f"All CTEs converged after production run #{counter_production_run:03d}."
                msg += "\nFINISHED SUCCESSFULLY."
                logger.info(msg)
                break

            # Also break the loop if max number of production runs is reached without convergence and update summary
            if counter_production_run == max_production_runs:
                cte_summary.update({"is_converged": "False", "convergence_criterion": CTE_uncertainty_criterion})
                msg = f"Maximum number of production runs ({max_production_runs}) reached without CTE convergence."
                msg += "\nFINISHED WITHOUT CONVERGENCE."
                logger.info(msg)
                break

        # In all other cases, continue with the next production run
        counter_production_run += 1
        structure2 = structure_production

    results.update({"structure_final": structure_production})
    return {"summary": cte_summary, "data": results}

cte_from_npt_fluctuations(temperature: float | list | np.ndarray, enthalpy: list | np.ndarray, volume: list | np.ndarray, N_points: int = 1000, *, use_running_mean: bool = False) -> float

Compute the CTE from enthalpy-volume cross-correlations.

Data needs to be obtained from a constant pressure, constant temperature NPT simulation. The formula used can be found in Tildesley, Computer Simulation of Liquids, Eq. 2.94.

Parameters:

Name Type Description Default
temperature float | list | ndarray

Target temperature (defining the ensemble) in K. Can be specified as a single value (int/float). If provided as list or array-like, the mean will be used as target temperature.

required
enthalpy list | ndarray

"Instantaneous enthalpy" in eV, list or np.ndarray. Note that the instantaneous enthalpy is given by H = U + pV, where U is the instantaneous internal energy (i.e., kinetic + potential of every timestep), p the pressure defining the ensemble and V the average volume. Note that this is not the same quantity as the enthalpy obtained from LAMMPS directly via the 'enthalpy' output from thermo_modify. The latter uses the instantaneous pressure at the given timestep instead of the average (or ensemble-defining) pressure.

required
volume list | ndarray

Instantaneous volume in Ang^3, list or np.ndarray. Here, one can also feed individual cell lengths for anisotropic CTE calculations.

required
N_points int

Window size for running mean calculation if running_mean is True. Defaults to 1000.

1000
use_running_mean bool

Conventionally, fluctuations are calculated as difference from the mean of the whole trajectory. If use_running_mean is True, running mean values are used to determine fluctuations. This can be useful for non-stationary data where drift in volume and energy is still observed. Defaults to False.

False

Returns:

Type Description
float

The calculated CTE value in 1/K.

Example

cte = cte_from_npt_fluctuations( ... temperature=300.0, ... enthalpy=enthalpy_array, ... volume=volume_array ... )

Notes
  • If temperature is provided as int/float, it will be used as-is for the target temperature to compute the CTE.
  • If temperature is provided as list/array-like, the mean of the temperature data will be used as target temperature for the CTE calculation. See below how this is slightly changed if running mean is used.
  • If running_mean is set to True, a part of the volume and enthalpy data at the beginning and end of the arrays cannot be used because it is required to calculate the running mean values. To ensure that the data for the calculation of the fluctuations is consistent with the mean volume and target temperature (in case this was provided as list/array-like), also this data will be trimmed correspondingly.
Source code in amorphouspy/src/amorphouspy/analysis/cte.py
def cte_from_npt_fluctuations(
    temperature: float | list | np.ndarray,
    enthalpy: list | np.ndarray,
    volume: list | np.ndarray,
    N_points: int = 1000,
    *,
    use_running_mean: bool = False,
) -> float:
    """Compute the CTE from enthalpy-volume cross-correlations.

    Data needs to be obtained from a constant pressure, constant temperature NPT simulation. Formula used
    can be found in See Tildesley, Computer Simulation of Liquids, Eq. 2.94.

    Args:
        temperature: Target temperature (defining the ensemble) in K. Can be specified as a single value (int/float).
            If provided as list or array-like, the mean will be used as target temperature.
        enthalpy: "Instantaneous enthalpy" in eV, list or np.ndarray. Not that the instantaneous enthalpy is given by
            H = U + pV, where U is the instantaneous internal energy (i.e., kinetic + potential of every timestep),
            p the pressure defining the ensemble and V the average volume. Note that this is not the same quantity
            as the enthalpy obtained from LAMMPS directly via the 'enthalpy' output from thermo_modify. The latter
            uses the instantaneous pressure at the given timestep instead of the average (or ensemble-defining)
            pressure.
        volume: Instantaneous volume in Ang^3, list or np.ndarray. Here, one can also feed individual cell lengths for
            anisotropic CTE calculations.
        N_points: Window size for running mean calculation if running_mean is True. Defaults to 1000.
        use_running_mean: Conventionally, fluctuations are calculated as difference from the mean of the whole
            trajectory. If use_running_mean is True, running mean values are used to determine fluctuations. This
            can be useful for non-stationary data where drift in volume and energy is still observed. Defaults to False.

    Returns:
        The calculated CTE value in 1/K.

    Example:
        >>> cte = cte_from_npt_fluctuations(
        ...     temperature=300.0,
        ...     enthalpy=enthalpy_array,
        ...     volume=volume_array
        ... )

    Notes:
        - If temperature is provided as int/float, it will be used as-is for the target temperature to compute the CTE.
        - If temperature is provided as list/array-like, the mean of the temperature data will be used as target
          temperature for the CTE calculation. See below how this is slightly changed if running mean is used.
        - If running_mean is set to True used, a part of the volume and enthalpy data at the beginning and end of the
          arrays cannot be used because it is required to calculate the running mean values. To ensure that the data for
          the calculation of the fluctuations is consistent with the mean volume and target temperature (in case this
          was provided as list/array-like), also this data will be trimmed correspondingly.
    """

    def _input_checks(
        enthalpy: list | np.ndarray,
        volume: list | np.ndarray,
        temperature: float | list | np.ndarray,
        N_points: int,
        *,
        use_running_mean: bool,
    ) -> tuple[int | None, int | None]:
        """Checks the input beyond simple type checks and determines the padding for the running mean if required.

        Args:
            enthalpy: See main function.
            volume: See main function.
            temperature: See main function.
            N_points: See main function.
            use_running_mean: See main function.

        Returns:
            padL: Number of points to be trimmed at the beginning of the enthalpy and volume arrays if use_running_mean
                is True. None otherwise. Will also be used for trimming the temperature if provided as list or
                array-like to ensure consistency with the mean volume and target temperature.
            padR: Number of points to be trimmed at the end of the enthalpy and volume arrays if use_running_mean is
                True. None otherwise. Will also be used for trimming the temperature if provided as list or array-like
                to ensure consistency with the mean volume and target temperature.
        """
        if isinstance(temperature, (int, float)):
            if len(enthalpy) != len(volume):
                msg = (
                    "If temperature is provided as int or float, enthalpy and volume arrays must have the same length."
                )
                raise ValueError(msg)
        elif len(temperature) != len(enthalpy) or len(enthalpy) != len(volume):
            msg = "If temperature is provided as list or array-like,"
            msg += " it must have the same length as the enthalpy and volume arrays."
            raise ValueError(msg)

        if use_running_mean:
            if N_points < 1:
                msg = "N_points must be a positive integer >= 1."
                raise ValueError(msg)
            if N_points >= len(enthalpy):
                msg = "N_points must be smaller than the length of the enthalpy and volume arrays."
                raise ValueError(msg)
            padL, padR = int(N_points / 2), N_points - int(N_points / 2) - 1
            return padL, padR
        return None, None

    # if use_running_mean = False, padL and padR will both be None
    padL, padR = _input_checks(enthalpy, volume, temperature, N_points, use_running_mean=use_running_mean)

    if use_running_mean:
        assert padR is not None
        assert padL is not None
        H_fluctuations = np.array(np.array(enthalpy) - running_mean(enthalpy, N_points))[padL:-padR]
        V_fluctuations = np.array(np.array(volume) - running_mean(volume, N_points))[padL:-padR]
    else:
        H_fluctuations = np.array(enthalpy) - np.mean(enthalpy)
        V_fluctuations = np.array(volume) - np.mean(volume)

    V_mean = np.mean(volume) if padL is None or padR is None else np.mean(volume[padL:-padR])
    T_target = (
        temperature
        if isinstance(temperature, (int, float))
        else (np.mean(temperature) if padL is None or padR is None else np.mean(temperature[padL:-padR]))
    )

    kB = 8.617333262145e-5  # eV/K
    CTE = (np.mean(H_fluctuations * V_fluctuations)) / (V_mean * kB * T_target**2)
    return float(CTE)

cte_from_volume_temperature_data(temperature: list | np.ndarray, volume: list | np.ndarray, reference_volume: float | None = None) -> tuple[float, float]

Compute the CTE from slope of a linear fit to volume-temperature data.

This can be done by performing various constant pressure, constant temperature NPT simulations at different temperatures. Afterwards, collect the averaged volume and fit it linearly against the temperature. Divide the slope by the volume belonging to the lowest temperature to obtain the CTE.

Parameters:

Name Type Description Default
temperature list | ndarray

Temperature in K.

required
volume list | ndarray

Volume in Angstrom^3 or other structural quantity of interest. E.g., for anisotropic CTE calculations, one can also use individual cell lengths in Angstroms here.

required
reference_volume optional

Reference of the volume (or structural quantity of interest) the CTE will be calculated from. If not otherwise specified, the volume corresponding to the lowest temperature is used as reference.

None

Returns:

Type Description
tuple[float, float]

The calculated CTE value in 1/K and the R^2 value of the linear fit.

Note
  • This function is currently not in use in the workflows. But it might be used for cross-checking.
  • If volume-temperature data is used, the CTE needs to be divided by 3 to obtain the linear CTE.
Example

cte = cte_from_volume_temperature_data( ... temperature=[300, 400, 500], ... volume=[100.0, 100.1, 100.2] ... )

Source code in amorphouspy/src/amorphouspy/analysis/cte.py
def cte_from_volume_temperature_data(
    temperature: list | np.ndarray,
    volume: list | np.ndarray,
    reference_volume: float | None = None,
) -> tuple[float, float]:
    """Compute the CTE from slope of a linear fit to volume-temperature data.

    This can be done from by performing various constant pressure, constant temperature NPT simulation
    at different temperatures. Afterwards, collect the averaged volume and fit linearly to the temperature.
    Divide slope by the volume belonging to the lowest temperature to obtain the CTE.
    If specified, the volume-temperature plot is shown and the fitted line is overlaid.

    Args:
        temperature: Temperature in K.
        volume: Volume in Angstrom^3 or other structural quantity of interest. E.g., for anisotropic CTE
            calculations, one can also use individual cell lengths in Angstroms here.
        reference_volume (optional): Reference of the volume (or structural quantity of interest) the CTE
            will be calculated from. If not otherwise specified, the volume corresponding to the lowest
            temperature is used as reference.

    Returns:
        The calculated CTE value in 1/K and the R^2 value of the linear fit.

    Note:
        - This function is currently not in use in the workflows. But it might be used for cross-checking.
        - If volume-temperature data is used, the CTE needs to be divided by 3 to obtain the linear CTE.

    Example:
        >>> cte = cte_from_volume_temperature_data(
        ...     temperature=[300, 400, 500],
        ...     volume=[100.0, 100.1, 100.2]
        ... )

    """
    # Set reference volume corresponding to the lowest temperature if not provided.
    if reference_volume is None:
        index_lowest_T = np.argmin(temperature)
        reference_volume = volume[index_lowest_T]

    # fit and calculate CTE
    slope, _intercept, r_value, _p_value, _std_err = linregress(temperature, volume)
    CTE = slope / reference_volume
    R2 = r_value**2

    return float(CTE), float(R2)

elastic_simulation(structure: Atoms, potential: pd.DataFrame, temperature_sim: float = 300.0, pressure: float | None = None, timestep: float = 1.0, equilibration_steps: int = 1000000, production_steps: int = 10000, n_print: int = 1, strain: float = 0.001, server_kwargs: dict[str, Any] | None = None, *, langevin: bool = False, seed: int = 12345, tmp_working_directory: str | Path | None = None) -> dict[str, Any]

Perform a LAMMPS-based elastic constant simulation via the stress-strain method.

This workflow calculates the elastic stiffness tensor (Cij) using finite differences. It equilibrates the structure, applies small normal and shear strains, and measures the resulting stress response to determine the elastic constants.

Parameters:

Name Type Description Default
structure Atoms

Input structure (assumed pre-equilibrated).

required
potential DataFrame

LAMMPS potential file.

required
temperature_sim float

Simulation temperature in Kelvin (default 300.0 K).

300.0
pressure float | None

Target pressure for equilibration (default None, i.e., NVT).

None
timestep float

MD integration timestep in femtoseconds (default 1.0 fs).

1.0
equilibration_steps int

Number of steps for the initial equilibration phase (default 1,000,000).

1000000
production_steps int

Number of MD steps for the production run (default 10,000).

10000
n_print int

Thermodynamic output frequency (default 1).

1
strain float

Magnitude of the strain applied for finite differences (default 1e-3).

0.001
server_kwargs dict[str, Any] | None

Additional server configuration arguments.

None
langevin bool

Whether to use Langevin dynamics (default False).

False
seed int

Random seed for velocity initialization (default 12345).

12345
tmp_working_directory str | Path | None

Temporary directory for job execution.

None

Returns:

Type Description
dict[str, Any]

Dictionary containing the results. Key "result" contains the "Cij" 6x6 matrix.

Notes
  • The structure is first equilibrated (NPT/NVT).
  • Positive and negative strains are applied to cancel lower-order errors (central difference).
  • Calculated Cij values assume Voigt notation.
  • For production simulations, system size, cooling rate, equilibration time, and strain magnitude should be tested to ensure the robustness of the results.
Example

result = elastic_simulation( ... structure=my_atoms, ... potential=my_potential, ... temperature_sim=300.0, ... strain=0.001 ... )

Source code in amorphouspy/src/amorphouspy/workflows/elastic_mod.py
def elastic_simulation(
    structure: Atoms,
    potential: pd.DataFrame,
    temperature_sim: float = 300.0,
    pressure: float | None = None,
    timestep: float = 1.0,
    equilibration_steps: int = 1_000_000,
    production_steps: int = 10_000,
    n_print: int = 1,
    strain: float = 1e-3,
    server_kwargs: dict[str, Any] | None = None,
    *,
    langevin: bool = False,
    seed: int = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict[str, Any]:  # pylint: disable=too-many-positional-arguments
    """Perform a LAMMPS-based elastic constant simulation via the stress-strain method.

    This workflow calculates the elastic stiffness tensor (Cij) using finite differences.
    It equilibrates the structure, applies small normal and shear strains, and measures
    the resulting stress response to determine the elastic constants.

    Args:
        structure: Input structure (assumed pre-equilibrated).
        potential: LAMMPS potential file.
        temperature_sim: Simulation temperature in Kelvin (default 300.0 K).
        pressure: Target pressure for equilibration (default None, i.e., NVT).
        timestep: MD integration timestep in femtoseconds (default 1.0 fs).
        equilibration_steps: Number of steps for the initial equilibration phase (default 1,000,000).
        production_steps: Number of MD steps for the production run (default 10,000).
        n_print: Thermodynamic output frequency (default 1).
        strain: Magnitude of the strain applied for finite differences (default 1e-3).
        server_kwargs: Additional server configuration arguments.
        langevin: Whether to use Langevin dynamics (default False).
        seed: Random seed for velocity initialization (default 12345).
        tmp_working_directory: Temporary directory for job execution.

    Returns:
        Dictionary containing the results. Key "result" contains the "Cij" 6x6 matrix.

    Notes:
        - The structure is first equilibrated (NPT/NVT).
        - Positive and negative strains are applied to cancel lower-order errors (central difference).
        - Calculated Cij values assume Voigt notation.
        - For production simulations, system size, cooling rate, equilibration time,
          and strain magnitude should be tested to ensure the robustness of the results.

    Example:
        >>> result = elastic_simulation(
        ...     structure=my_atoms,
        ...     potential=my_potential,
        ...     temperature_sim=300.0,
        ...     strain=0.001
        ... )

    """
    # First row of the potential DataFrame carries the potential's name.
    potential_name = potential.loc[0, "Name"]

    if potential_name.lower() == "shik":
        # NOTE(review): SHIK potential configs apparently ship their own short
        # Langevin/nve-limit pre-run; these exact command lines are filtered out so the
        # workflow controls the MD stages itself -- confirm patterns still match the
        # shipped potential file version.
        exclude_patterns = [
            "fix langevin all langevin 5000 5000 0.01 48279",
            "fix ensemble all nve/limit 0.5",
            "run 10000",
            "unfix langevin",
            "unfix ensemble",
        ]

        # Drop any config line containing one of the excluded command patterns.
        potential["Config"] = potential["Config"].apply(
            lambda lines: [line for line in lines if not any(p in line for p in exclude_patterns)]
        )

    # Stage 0: INITIAL EQUILIBRATION
    # NPT if a pressure is given, otherwise NVT (see `pressure` arg).
    structure0, res = _run_lammps_md(
        structure=structure,
        potential=potential,
        tmp_working_directory=tmp_working_directory,
        temperature=temperature_sim,
        n_ionic_steps=equilibration_steps,
        timestep=timestep,
        n_print=n_print,
        initial_temperature=temperature_sim,
        pressure=pressure,
        seed=seed,
        langevin=langevin,
        server_kwargs=server_kwargs,
    )

    structure_npt = structure0.copy()

    # Enforce cubic average cell

    gen0 = res.get("generic")
    assert gen0 is not None
    vol_array = np.array(gen0["volume"])
    # Average the volume over the second half of the trajectory (first half is
    # discarded as equilibration transient), then take the cube root for the edge length.
    slice_start = len(vol_array) // 2
    avg_l = np.mean(vol_array[slice_start:]) ** (1.0 / 3.0)

    # Replace the cell by a cube of the averaged edge length, rescaling atom positions.
    structure_npt.set_cell([[avg_l, 0, 0], [0, avg_l, 0], [0, 0, avg_l]], scale_atoms=True)

    # Shared settings for all production runs
    md_kwargs = {
        "potential": potential,
        "tmp_working_directory": tmp_working_directory,
        "temperature": temperature_sim,
        "n_ionic_steps": production_steps,
        "timestep": timestep,
        "n_print": n_print,
        "initial_temperature": temperature_sim,
        "pressure": pressure,
        "langevin": langevin,
        "seed": seed,
        "server_kwargs": server_kwargs,
    }

    cij = np.zeros((6, 6))
    # Central difference: stress difference between +strain and -strain, divided by 2*strain.
    denom = 2 * strain

    # Normal strains (C11, C22, C33 and off-diagonals)
    for i in range(3):
        strain_mat = np.zeros((3, 3))
        strain_mat[i, i] = strain
        # stress_diff is assumed to be sigma(+strain) - sigma(-strain) as a 3x3 tensor
        # returned by the helper.
        stress_diff = _run_strained_md(structure_npt, strain_mat, md_kwargs)
        cij[i, 0] = stress_diff[0, 0] / denom
        cij[i, 1] = stress_diff[1, 1] / denom
        cij[i, 2] = stress_diff[2, 2] / denom

    # Shear strains (C44, C55, C66)
    shear_indices = [(1, 2), (0, 2), (0, 1)]  # yz, xz, xy
    for i, (idx1, idx2) in enumerate(shear_indices):
        strain_mat = np.zeros((3, 3))
        # Tensor shear of strain/2 on both off-diagonal entries corresponds to an
        # engineering shear strain of `strain` (Voigt convention).
        strain_mat[idx1, idx2] = strain / 2
        strain_mat[idx2, idx1] = strain / 2
        stress_diff = _run_strained_md(structure_npt, strain_mat, md_kwargs)
        voigt = 3 + i  # Voigt indices 3, 4, 5 for yz, xz, xy
        cij[voigt, voigt] = stress_diff[idx1, idx2] / denom

    # Derive isotropic (polycrystalline-average) moduli from the stiffness tensor.
    moduli = isotropic_moduli_from_Cij(cij)

    result = {"Cij": cij, "moduli": moduli}

    # After calculating all Cij
    # Sanity check: for a cubic (or isotropic) system the diagonal groups must match.
    if not np.allclose(cij[0, 0], cij[1, 1]) or not np.allclose(cij[1, 1], cij[2, 2]):
        warnings.warn("System may not be cubic: C11 != C22 != C33", stacklevel=2)
    if not np.allclose(cij[3, 3], cij[4, 4]) or not np.allclose(cij[4, 4], cij[5, 5]):
        warnings.warn("System may not be cubic: C44 != C55 != C66", stacklevel=2)

    return result

extract_composition(composition: dict[str, float], tolerance: float = COMPOSITION_TOLERANCE) -> dict[str, float]

Validate and normalize a composition dictionary.

Handles both fractional (0.0-1.0) and percentage (0-100) inputs. Always returns fractions summing to 1.0.

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions, e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.

required
tolerance float

Maximum allowed fractional deviation from 1.0. Defaults to COMPOSITION_TOLERANCE.

COMPOSITION_TOLERANCE

Returns:

Type Description
dict[str, float]

A dictionary mapping oxide formulas to their molar fractions.

Raises:

Type Description
ValueError

If the composition is empty, contains invalid elements, non-neutral oxides, or sums to an invalid total.

Source code in amorphouspy/src/amorphouspy/structure/composition.py
def extract_composition(
    composition: dict[str, float],
    tolerance: float = COMPOSITION_TOLERANCE,
) -> dict[str, float]:
    """Validate and normalize a composition dictionary.

    Handles both fractional (0.0-1.0) and percentage (0-100) inputs. Always returns
    fractions summing to 1.0.

    Args:
        composition: A dictionary mapping oxide formulas to their fractions,
            e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.
        tolerance: Maximum allowed fractional deviation from 1.0. Defaults to
            ``COMPOSITION_TOLERANCE``.

    Returns:
        A dictionary mapping oxide formulas to their molar fractions.

    Raises:
        ValueError: If the composition is empty, contains invalid elements,
            non-neutral oxides, or sums to an invalid total.

    """
    if not composition:
        error_msg = "Empty composition"
        raise ValueError(error_msg)

    # Hydrogen onwards: every symbol ASE knows, excluding the placeholder "X".
    known_elements = set(chemical_symbols[1:])
    validated: dict[str, float] = {}

    for oxide, amount in composition.items():
        if amount < 0:
            error_msg = f"Negative fraction for '{oxide}': {amount}"
            raise ValueError(error_msg)

        # Every element parsed out of the oxide formula must be a real element.
        for element, _ in ELEMENT.findall(oxide):
            if element not in known_elements:
                error_msg = f"Invalid element '{element}' in oxide '{oxide}'"
                raise ValueError(error_msg)

        check_neutral_oxide(oxide)
        validated[oxide] = amount

    total = sum(validated.values())
    if total == 0:
        error_msg = "Total composition sum is zero"
        raise ValueError(error_msg)

    pct_tol = 100 * tolerance

    # Accept either a ~100 (mol%) or ~1.0 (fractional) total and normalize to 1.0.
    if abs(total - 100) <= pct_tol or abs(total - 1.0) <= tolerance:
        return {oxide: amount / total for oxide, amount in validated.items()}

    # Out-of-band totals: report which bound was violated.
    if total > 100 + pct_tol:
        error_msg = f"Total exceeds 100% + {pct_tol}% tolerance: {total:.2f}%"
    elif total < 1.0 - tolerance:
        error_msg = f"Component sum ({total:.4f}) is less than allowed minimum"
    else:
        error_msg = f"Invalid composition sum ({total:.4f}). Expected ~100 mol% or ~1.0 fractional."
    raise ValueError(error_msg)

find_rdf_minimum(r: np.ndarray, rdf_data: np.ndarray, sigma: float = 2.0, window_length: int = 21, polyorder: int = 3) -> float | None

Identify the first minimum in a radial distribution function (RDF) curve.

The function applies Gaussian and Savitzky-Golay smoothing to reduce noise, detects the first peak, and searches for the first subsequent minimum using gradient sign changes. If no valid minimum is found, it falls back to detecting where the RDF drops below 1.0 after the peak.

Parameters:

Name Type Description Default
r ndarray

Array of radial distances (Å), assumed to be uniformly spaced.

required
rdf_data ndarray

RDF values corresponding to r.

required
sigma float

Standard deviation for Gaussian smoothing.

2.0
window_length int

Window length for Savitzky-Golay filter. Must be odd.

21
polyorder int

Polynomial order for Savitzky-Golay filter.

3

Returns:

Type Description
float | None

The radial distance of the first minimum (Å) if found, otherwise None.

Example

r = np.linspace(0, 10, 100) rdf = np.sin(r) + 1.0 # Dummy data min_r = find_rdf_minimum(r, rdf)

Source code in amorphouspy/src/amorphouspy/workflows/structural_analysis.py
def find_rdf_minimum(
    r: np.ndarray, rdf_data: np.ndarray, sigma: float = 2.0, window_length: int = 21, polyorder: int = 3
) -> float | None:
    """Identify the first minimum in a radial distribution function (RDF) curve.

    The function applies Gaussian and Savitzky-Golay smoothing to reduce noise,
    detects the first peak, and searches for the first subsequent minimum using
    gradient sign changes. If no valid minimum is found, it falls back to
    detecting where the RDF drops below 1.0 after the peak.

    Args:
        r: Array of radial distances (Å), assumed to be uniformly spaced.
        rdf_data: RDF values corresponding to `r`.
        sigma: Standard deviation for Gaussian smoothing.
        window_length: Window length for Savitzky-Golay filter. Must be odd.
        polyorder: Polynomial order for Savitzky-Golay filter.

    Returns:
        The radial distance of the first minimum (Å) if found, otherwise `None`.

    Example:
        >>> r = np.linspace(0, 10, 100)
        >>> rdf = np.sin(r) + 1.0  # Dummy data
        >>> min_r = find_rdf_minimum(r, rdf)

    """
    rdf_smooth1 = gaussian_filter1d(rdf_data, sigma=sigma)
    rdf_smooth2 = np.asarray(
        savgol_filter(rdf_smooth1, window_length=window_length, polyorder=polyorder),
        dtype=np.float64,
    )

    first_peak_idx = int(np.argmax(rdf_smooth2))
    dr = r[1] - r[0]  # assuming uniform spacing
    gradient = np.gradient(rdf_smooth2, dr)

    # PERF401 fix: list comprehension instead of appending in a loop
    sign_changes = [i for i in range(first_peak_idx + 1, len(gradient) - 1) if gradient[i] < 0 and gradient[i + 1] > 0]

    if sign_changes:
        first_min_idx = sign_changes[0]
        peak_height = rdf_smooth2[first_peak_idx]
        min_height = rdf_smooth2[first_min_idx]
        if (peak_height - min_height) > 0.1 * peak_height:
            return r[first_min_idx]

    for i in range(first_peak_idx + 1, len(rdf_smooth2)):
        if rdf_smooth2[i] < 1.0:
            return r[i]

    return None

fit_vft(T_data: ArrayLike, log10_eta_data: ArrayLike, initial_guess: tuple[float, float, float] = (-3, 1000, 200)) -> tuple[tuple[float, float, float], NDArray[np.float64]]

Fits viscosity data to the Vogel-Fulcher-Tammann (VFT) model.

Parameters:

Name Type Description Default
T_data ArrayLike

Temperatures in Kelvin.

required
log10_eta_data ArrayLike

log10(viscosity) values.

required
initial_guess tuple[float, float, float]

Initial guess for (log10_eta0, B, T0).

(-3, 1000, 200)

Returns:

Type Description
tuple[tuple[float, float, float], NDArray[float64]]

A tuple containing the best-fit parameters and the covariance matrix.

Example

params, cov = fit_vft(temps, log_visc_data) log10_eta0, B, T0 = params

Source code in amorphouspy/src/amorphouspy/workflows/viscosity.py
def fit_vft(
    T_data: ArrayLike, log10_eta_data: ArrayLike, initial_guess: tuple[float, float, float] = (-3, 1000, 200)
) -> tuple[tuple[float, float, float], NDArray[np.float64]]:
    """Fits viscosity data to the Vogel-Fulcher-Tammann (VFT) model.

    Args:
        T_data: Temperatures in Kelvin.
        log10_eta_data: log10(viscosity) values.
        initial_guess: Initial guess for (log10_eta0, B, T0).

    Returns:
        A tuple ``(params, covariance)`` where ``params`` is the best-fit
        ``(log10_eta0, B, T0)`` as a plain tuple of floats and ``covariance``
        is the parameter covariance matrix from the fit.

    Raises:
        RuntimeError: If the least-squares optimization does not converge
            (raised by ``scipy.optimize.curve_fit``).

    Example:
        >>> params, cov = fit_vft(temps, log_visc_data)
        >>> log10_eta0, B, T0 = params

    """
    popt, pcov = curve_fit(vft_model, T_data, log10_eta_data, p0=initial_guess, maxfev=1000000)
    # curve_fit returns an ndarray; convert so the value actually matches the
    # annotated tuple[float, float, float] (unpacking/indexing is unaffected).
    log10_eta0, b_coeff, t0 = (float(v) for v in popt)
    return (log10_eta0, b_coeff, t0), pcov

formula_mass_g_per_mol(formula: str) -> float

Calculate the molar mass of a compound using ASE atomic masses.

Parameters:

Name Type Description Default
formula str

A string representing the chemical formula (e.g., "SiO2").

required

Returns:

Type Description
float

The molar mass of the compound in grams per mole.

Source code in amorphouspy/src/amorphouspy/structure/composition.py
def formula_mass_g_per_mol(formula: str) -> float:
    """Compute a compound's molar mass from its formula via ASE atomic masses.

    Args:
        formula: A string representing the chemical formula (e.g., "SiO2").

    Returns:
        The molar mass of the compound in grams per mole.

    """
    total_mass = 0.0
    # Accumulate mass contribution of each parsed (element, count) pair.
    for element, count in parse_formula(formula).items():
        total_mass += get_atomic_mass(element) * count
    return total_mass

generate_bond_length_dict(atoms: Atoms, specific_cutoffs: dict[tuple[str, str], float] | None = None, default_cutoff: float = -1.0) -> dict[tuple[str, str], float]

Generate all symmetric element pairs and assign bond length cutoffs.

Parameters:

Name Type Description Default
atoms Atoms

ASE Atoms object whose species determine the pair set.

required
specific_cutoffs dict[tuple[str, str], float] | None

Optional cutoff overrides for specific element pairs. Both orderings ('A','B') and ('B','A') are recognised.

None
default_cutoff float

Fallback bond length for pairs not in specific_cutoffs.

-1.0

Returns:

Type Description
dict[tuple[str, str], float]

Dictionary mapping every symmetric element pair to its cutoff.

Examples:

>>> from ase.io import read
>>> structure = read('glass.xyz')
>>> bond_lengths = generate_bond_length_dict(
...     structure,
...     specific_cutoffs={('Si', 'O'): 1.8},
...     default_cutoff=2.0,
... )
Source code in amorphouspy/src/amorphouspy/analysis/rings.py
def generate_bond_length_dict(
    atoms: Atoms,
    specific_cutoffs: dict[tuple[str, str], float] | None = None,
    default_cutoff: float = -1.0,
) -> dict[tuple[str, str], float]:
    """Build a bond-length cutoff table for every symmetric element pair.

    Args:
        atoms: ASE Atoms object whose species determine the pair set.
        specific_cutoffs: Optional cutoff overrides for specific element
            pairs. Both orderings ``('A','B')`` and ``('B','A')`` are
            recognised.
        default_cutoff: Fallback bond length for pairs not in
            ``specific_cutoffs``.

    Returns:
        Dictionary mapping every symmetric element pair to its cutoff.

    Examples:
        >>> from ase.io import read
        >>> structure = read('glass.xyz')
        >>> bond_lengths = generate_bond_length_dict(
        ...     structure,
        ...     specific_cutoffs={('Si', 'O'): 1.8},
        ...     default_cutoff=2.0,
        ... )
    """
    overrides = specific_cutoffs if specific_cutoffs is not None else {}

    species = list(type_to_dict(atoms.get_atomic_numbers()).values())

    def _lookup(sym_a: str, sym_b: str) -> float:
        # Honour either ordering of the pair before falling back to the default.
        return overrides.get((sym_a, sym_b), overrides.get((sym_b, sym_a), default_cutoff))

    return {
        (sym_a, sym_b): _lookup(sym_a, sym_b)
        for sym_a, sym_b in combinations_with_replacement(species, 2)
    }

generate_potential(atoms_dict: dict, potential_type: str = 'pmmcs', *, melt: bool = True) -> pd.DataFrame

Generate LAMMPS potential configuration for glass simulations.

Parameters:

Name Type Description Default
atoms_dict dict

Dictionary containing atomic structure information.

required
potential_type str

Type of potential to generate. Options are "pmmcs", "bjp", or "shik". (default is "pmmcs").

'pmmcs'
melt bool

Append a Langevin + NVE/limit melt run block (10000 steps). Only used when potential_type="shik".

True

Returns:

Type Description
DataFrame

DataFrame containing potential configuration.

Example

potential = generate_potential(struct_dict, potential_type="shik") potential = generate_potential(struct_dict, potential_type="shik", melt=False)

Source code in amorphouspy/src/amorphouspy/potentials/potential.py
def generate_potential(atoms_dict: dict, potential_type: str = "pmmcs", *, melt: bool = True) -> pd.DataFrame:
    """Generate LAMMPS potential configuration for glass simulations.

    Args:
        atoms_dict: Dictionary containing atomic structure information.
        potential_type: Type of potential to generate. Options are "pmmcs", "bjp", or "shik".
            (default is "pmmcs").
        melt: Append a Langevin + NVE/limit melt run block (10000 steps). Only used when
            ``potential_type="shik"``.

    Returns:
        DataFrame containing potential configuration.

    Raises:
        ValueError: If ``potential_type`` is not one of the supported options.

    Example:
        >>> potential = generate_potential(struct_dict, potential_type="shik")
        >>> potential = generate_potential(struct_dict, potential_type="shik", melt=False)

    """
    # Normalise once; matching is case-insensitive.
    requested = potential_type.lower()
    if requested == "pmmcs":
        return pmmcs.generate_pmmcs_potential(atoms_dict)
    if requested == "bjp":
        return bjp.generate_bjp_potential(atoms_dict)
    if requested == "shik":
        # Only the SHIK generator honours the ``melt`` flag.
        return shik.generate_shik_potential(atoms_dict, melt=melt)
    msg = f"Unsupported potential type: {potential_type}"
    raise ValueError(msg)

get_ase_structure(atoms_dict: dict, replicate: tuple[int, int, int] = (1, 1, 1)) -> Atoms

Generate a LAMMPS data file format string and read into an ASE Atoms object.

Based on the specifications in the provided atoms_dict, this function generates a LAMMPS data-file format string, reads it into an ASE Atoms object, and returns that object. atoms_dict is expected to specify a cubic box; triclinic boxes are not supported.

Parameters:

Name Type Description Default
atoms_dict dict

Dictionary that must contain the atom counts and box dimensions under the keys "atoms" and "box".

required
replicate tuple[int, int, int]

Replication factors for the box in x, y, and z directions. Default is (1, 1, 1), meaning no replication.

(1, 1, 1)

Returns:

Type Description
Atoms

ASE Atoms object of the specified structure.

Example

struct_dict = get_structure_dict({"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}, target_atoms=1000) atoms = get_ase_structure(struct_dict)

Source code in amorphouspy/src/amorphouspy/structure/geometry.py
def get_ase_structure(atoms_dict: dict, replicate: tuple[int, int, int] = (1, 1, 1)) -> Atoms:
    """Build an ASE Atoms object from a structure dictionary, with optional replication.

    A LAMMPS data file (atom_style "charge") is rendered in memory from the
    atom list and cubic box length held in ``atoms_dict`` and then parsed back
    with ``ase.io.read``. Only cubic boxes are supported; triclinic boxes are
    not.

    Args:
        atoms_dict: Dictionary that must contain the atom counts and box dimensions under the
            keys "atoms" and "box".
        replicate: Replication factors for the box in x, y, and z directions.
            Default is (1, 1, 1), meaning no replication.

    Returns:
        ASE Atoms object of the specified structure.

    Example:
        >>> struct_dict = get_structure_dict({"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}, target_atoms=1000)
        >>> atoms = get_ase_structure(struct_dict)

    """
    base_atoms = atoms_dict["atoms"]
    cell_length = atoms_dict["box"]
    nx, ny, nz = replicate

    element_to_type = get_element_types_dict(base_atoms)
    total_atoms = len(base_atoms) * nx * ny * nz

    # Header: counts, then the orthogonal box bounds scaled by replication.
    lines = [
        "LAMMPS data file via create_random_atoms and write_lammps_data\n\n",
        f"{total_atoms} atoms\n",
        f"{len(element_to_type)} atom types\n\n",
        f"0.0 {cell_length * nx} xlo xhi\n",
        f"0.0 {cell_length * ny} ylo yhi\n",
        f"0.0 {cell_length * nz} zlo zhi\n\n",
        "Masses\n\n",
    ]

    for elem, type_id in element_to_type.items():
        lines.append(f"{type_id} {get_atomic_mass(elem)} # {elem}\n")

    lines.append("\nAtoms\n\n")

    # Atom records in "charge" style: id type q x y z. Charge is always zero;
    # each replica image shifts positions by whole box lengths.
    atom_id = 1
    for ix in range(nx):
        for iy in range(ny):
            for iz in range(nz):
                for atom in base_atoms:
                    x, y, z = atom["position"]
                    lines.append(
                        f"{atom_id} {element_to_type[atom['element']]} 0.000000 "
                        f"{x + ix * cell_length:.6f} {y + iy * cell_length:.6f} {z + iz * cell_length:.6f}\n"
                    )
                    atom_id += 1

    atoms_obj = read(
        filename=StringIO("".join(lines)),
        format="lammps-data",
        atom_style="charge",
    )
    # ``read`` may return a list of images; this data file holds exactly one.
    if isinstance(atoms_obj, list):
        return cast("Atoms", atoms_obj[0])
    return atoms_obj

get_atomic_mass(element: str | int) -> float

Get the atomic mass of an element.

Parameters:

Name Type Description Default
element str | int

Chemical symbol or atomic number.

required

Returns:

Type Description
float

Atomic mass in g/mol.

Example

mass = get_atomic_mass("Si") print(mass) 28.085

Source code in amorphouspy/src/amorphouspy/mass.py
def get_atomic_mass(element: str | int) -> float:
    """Look up an element's atomic mass from ASE's IUPAC 2016 table.

    Args:
        element: Chemical symbol or atomic number.

    Returns:
        Atomic mass in g/mol.

    Example:
        >>> mass = get_atomic_mass("Si")
        >>> print(mass)
        28.085

    """
    if isinstance(element, str):
        # Resolve symbols to atomic numbers via ASE's periodic table ordering.
        element = chemical_symbols.index(element)
    return atomic_masses_iupac2016[element]

get_composition(composition: dict[str, float], mode: str = 'molar') -> dict[str, float]

Convert a composition dictionary into normalized molar fractions.

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions, e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.

required
mode str

The interpretation mode, either 'molar' or 'weight'. Defaults to "molar".

'molar'

Returns:

Type Description
dict[str, float]

A dictionary mapping oxide formulas to their molar fractions.

Raises:

Type Description
ValueError

If mode is not 'molar' or 'weight'.

Source code in amorphouspy/src/amorphouspy/structure/composition.py
def get_composition(composition: dict[str, float], mode: str = "molar") -> dict[str, float]:
    """Convert a composition dictionary into normalized molar fractions.

    Args:
        composition: A dictionary mapping oxide formulas to their fractions,
            e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.
        mode: The interpretation mode, either 'molar' or 'weight'. Defaults to "molar".

    Returns:
        A dictionary mapping oxide formulas to their molar fractions.

    Raises:
        ValueError: If `mode` is not 'molar' or 'weight'.

    """
    # Normalise the mode string once instead of lowering it at each use.
    requested_mode = mode.lower()
    if requested_mode not in ("molar", "weight"):
        error_msg = f"Invalid mode: {mode}. Supported modes are 'molar' and 'weight'."
        raise ValueError(error_msg)
    raw = extract_composition(composition)
    if requested_mode == "weight":
        # Weight-percent inputs are converted to molar fractions first.
        return weight_percent_to_mol_fraction(raw)
    return normalize(raw)

get_glass_density_from_model(composition: dict[str, float]) -> float

Calculate the room-temperature glass density using Fluegel's empirical model.

The model uses a polynomial expansion based on mole percentages of oxides. Source: Fluegel, A. "Global Model for Calculating Room-Temperature Glass Density from the Composition", J. Am. Ceram. Soc., 90 [8] 2622-2635 (2007).

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions.

required

Returns:

Type Description
float

The calculated density in g/cm^3.

Raises:

Type Description
ValueError

If the composition contains components unsupported by the model or if the format is invalid.

Example

density = get_glass_density_from_model({"SiO2": 0.75, "Na2O": 0.25})

Source code in amorphouspy/src/amorphouspy/structure/density.py
def get_glass_density_from_model(composition: dict[str, float]) -> float:  # noqa: C901, PLR0912
    """Calculate the room-temperature glass density using Fluegel's empirical model.

    The model uses a polynomial expansion based on mole percentages of oxides.
    Source: Fluegel, A. "Global Model for Calculating Room-Temperature Glass Density from the Composition",
    J. Am. Ceram. Soc., 90 [8] 2622-2635 (2007).

    Args:
        composition: A dictionary mapping oxide formulas to their fractions.

    Returns:
        The calculated density in g/cm^3.

    Raises:
        ValueError: If the composition contains components unsupported by the model
            or if the format is invalid.

    Example:
        >>> density = get_glass_density_from_model({"SiO2": 0.75, "Na2O": 0.25})

    """
    # Regression coefficients of Fluegel's (2007) global density model.
    # Key conventions used by the accumulation code below:
    #   "b0"       -> intercept
    #   "X"        -> linear term in mol% of oxide X
    #   "X_2"/"X_3"-> quadratic/cubic terms in mol% of X
    #   "A_B"      -> binary interaction term (exactly one underscore)
    #   "A_B_C"    -> ternary interaction term (exactly two underscores)
    #   "Remainder"-> single lumped term applied to all trace oxides
    COEFFICIENTS = {
        "b0": 2.121560704,
        "Al2O3": 0.010525974,
        "Al2O3_2": -0.000076924,
        "B2O3": 0.00579283,
        "B2O3_2": 0.000129174,
        "B2O3_3": -0.000019887,
        "Li2O": 0.012848733,
        "Li2O_2": -0.000276404,
        "Li2O_3": 0.00000259,
        "Na2O": 0.018129123,
        "Na2O_2": -0.000264838,
        "Na2O_3": 0.000001614,
        "K2O": 0.019177312,
        "K2O_2": -0.000319863,
        "K2O_3": 0.00000191,
        "MgO": 0.01210604,
        "MgO_2": -0.000061159,
        "CaO": 0.017992367,
        "CaO_2": -0.00005478,
        "SrO": 0.034630735,
        "SrO_2": -0.000086939,
        "BaO": 0.049879597,
        "BaO_2": -0.000168063,
        "ZnO": 0.025221567,
        "ZnO_2": 0.000099961,
        "PbO": 0.070020298,
        "PbO_2": 0.000214424,
        "PbO_3": -0.000001502,
        "FexOy": 0.036995747,
        "MnxOy": 0.016648722,
        "TiO2": 0.018820343,
        "ZrO2": 0.043059714,
        "ZrO2_2": -0.000779078,
        "CexOy": 0.061277268,
        "CdO": 0.052945783,
        "La2O3": 0.10643194,
        "Nd2O3": 0.090134135,
        "NiO": 0.024289113,
        "ThO2": 0.090253734,
        "UxOy": 0.063297404,
        "SbxOy": 0.044258719,
        "SO3": -0.044488661,
        "F": 0.00109839,
        "Cl": -0.006092537,
        "Remainder": 0.02514614,
        "Na2O_K2O": -0.000395491,
        "Na2O_Li2O": -0.00031449,
        "K2O_Li2O": -0.000329725,
        "Na2O_B2O3": 0.000242157,
        "K2O_B2O3": 0.000259927,
        "Li2O_B2O3": 0.000106359,
        "MgO_B2O3": -0.000206488,
        "CaO_B2O3": -0.000032258,
        "PbO_B2O3": -0.000186195,
        "FexOy_B2O3": -0.000720268,
        "ZrO2_B2O3": -0.000697195,
        "Al2O3_B2O3": -0.000735749,
        "Li2O_Al2O3": -0.000116227,
        "Na2O_Al2O3": -0.000253454,
        "K2O_Al2O3": -0.000371858,
        "MgO_CaO": 0.000057248,
        "MgO_Al2O3": 0.000167218,
        "MgO_ZnO": 0.000220766,
        "Li2O_CaO": -0.00008792,
        "Na2O_MgO": -0.000300745,
        "Na2O_CaO": -0.000228249,
        "Na2O_SrO": -0.00023137,
        "Na2O_BaO": -0.000171693,
        "K2O_MgO": -0.000337747,
        "K2O_CaO": -0.000349578,
        "K2O_SrO": -0.000425589,
        "K2O_BaO": -0.000392897,
        "Al2O3_CaO": -0.000102444,
        "Al2O3_PbO": -0.000651745,
        "Al2O3_TiO2": -0.000563594,
        "Al2O3_BaO": -0.000273835,
        "Al2O3_SrO": -0.000177761,
        "Al2O3_ZnO": -0.000109968,
        "Al2O3_ZrO2": -0.002381651,
        "Na2O_PbO": -0.000036455,
        "Na2O_TiO2": -0.00014331,
        "Na2O_ZnO": -0.000155275,
        "Na2O_ZrO2": -0.000126728,
        "Na2O_FexOy": -0.000371343,
        "K2O_PbO": -0.000525213,
        "K2O_TiO2": -0.000386587,
        "K2O_ZnO": -0.000329812,
        "CaO_PbO": -0.00084145,
        "ZnO_FexOy": -0.001536804,
        "Na2O_K2O_B2O3": -0.000032967,
        "Na2O_MgO_CaO": -0.000009143,
        "Na2O_MgO_Al2O3": -0.000012286,
        "Na2O_CaO_Al2O3": -0.000005106,
        "Na2O_CaO_PbO": 0.000100796,
        "K2O_MgO_CaO": -0.00001217,
        "K2O_MgO_Al2O3": -0.000041908,
        "K2O_CaO_Al2O3": -0.000012421,
        "K2O_CaO_PbO": 0.000125759,
        "MgO_CaO_Al2O3": -0.000011236,
        "CaO_Al2O3_Li2O": -0.000016177,
        "Al2O3_B2O3_PbO": 0.000030116,
    }
    try:
        mole_fractions = extract_composition(composition)
    except (ValueError, TypeError) as e:
        error_msg = f"Invalid composition {composition!r}: {e}"
        raise ValueError(error_msg) from e

    # The model is parameterized in mole percent, not mole fraction.
    concentrations = {oxide: frac * 100 for oxide, frac in mole_fractions.items()}
    remainder_conc = 0.0
    main_components = []

    for oxide, conc in concentrations.items():
        if conc < 0:
            error_msg = f"Negative concentration for '{oxide}': {conc}"
            raise ValueError(error_msg)

        # SiO2 carries no explicit coefficient — it is skipped, acting as the
        # implicit base component of the model.
        if oxide == "SiO2":
            continue

        # Trace oxides are lumped into one concentration handled by the
        # "Remainder" coefficient; everything else gets its own terms.
        if oxide in TRACE_OXIDES:
            remainder_conc += conc
        else:
            main_components.append(oxide)

    if remainder_conc > 0 and "Remainder" not in COEFFICIENTS:
        error_msg = "Trace oxides present but 'Remainder' coefficient missing"
        raise ValueError(error_msg)

    # Reject any non-trace component the model has no coefficient for.
    valid_components = set(COEFFICIENTS.keys()) - {"b0", "Remainder"}
    for comp in main_components:
        if comp not in valid_components:
            error_msg = f"Component '{comp}' not in density model coefficients"
            raise ValueError(error_msg)

    # Start from the intercept and add the lumped trace-oxide contribution.
    density = COEFFICIENTS.get("b0", 0)
    if remainder_conc > 0:
        density += COEFFICIENTS.get("Remainder", 0) * remainder_conc

    # Per-component polynomial terms: linear, then optional quadratic ("X_2")
    # and cubic ("X_3") corrections in mol%.
    for comp in main_components:
        conc = concentrations[comp]
        if comp in COEFFICIENTS:
            density += COEFFICIENTS[comp] * conc
        quad_key = f"{comp}_2"
        if quad_key in COEFFICIENTS:
            density += COEFFICIENTS[quad_key] * (conc**2)
        cube_key = f"{comp}_3"
        if cube_key in COEFFICIENTS:
            density += COEFFICIENTS[cube_key] * (conc**3)

    # Interaction keys are classified by underscore count; power keys
    # ("..._2"/"..._3") are excluded from the two-way set. Note oxide names
    # themselves never contain underscores, so the count is unambiguous.
    TWO_WAY_KEY_PARTS = 1
    THREE_WAY_KEY_PARTS = 2
    two_way_keys = {k for k in COEFFICIENTS if k.count("_") == TWO_WAY_KEY_PARTS and not k.endswith(("_2", "_3"))}
    three_way_keys = {k for k in COEFFICIENTS if k.count("_") == THREE_WAY_KEY_PARTS}

    # Binary cross terms apply only when both oxides are present as main components.
    for key in two_way_keys:
        comp1, comp2 = key.split("_")
        if comp1 in main_components and comp2 in main_components:
            density += COEFFICIENTS[key] * concentrations[comp1] * concentrations[comp2]

    # Ternary cross terms apply only when all three oxides are present.
    for key in three_way_keys:
        comp1, comp2, comp3 = key.split("_")
        if comp1 in main_components and comp2 in main_components and comp3 in main_components:
            density += COEFFICIENTS[key] * concentrations[comp1] * concentrations[comp2] * concentrations[comp3]

    return density

get_neighbors(atoms: Atoms | tuple[np.ndarray, np.ndarray, np.ndarray], cutoff: float | dict[tuple[int, int], float], target_types: list[int] | None = None, neighbor_types: list[int] | None = None, *, return_vectors: bool = False, use_numba: bool | None = None) -> list[tuple]

Find all neighbors within cutoff for each atom.

Returns a list of tuples where all IDs are the real atom IDs from the structure file (e.g. non-sequential LAMMPS/XYZ ids), not array indices.

Parameters:

Name Type Description Default
atoms Atoms | tuple[ndarray, ndarray, ndarray]

Either an ASE Atoms object or a tuple (coords, types, cell_matrix) where cell_matrix is a (3,3) array with lattice vectors as rows.

required
cutoff float | dict[tuple[int, int], float]

Cutoff radius in Angstrom. Either: - A single float applied uniformly to all pairs. - A dict mapping (atomic_number_i, atomic_number_j) to a pair-specific cutoff in Angstrom. Symmetric: (8, 14) and (14, 8) are equivalent. Pairs not listed default to the maximum cutoff in the dict. The cell list is built on the maximum cutoff so only one build is needed.

required
target_types list[int] | None

Atomic numbers of atoms to find neighbors for. None means all atoms.

None
neighbor_types list[int] | None

Atomic numbers that count as valid neighbors. None means all types.

None
return_vectors bool

If True, each output tuple gains a third element — a (k, 3) float64 array of Cartesian minimum-image bond vectors (i -> j) in Angstrom. Scalar distances are np.linalg.norm(vectors, axis=1).

False
use_numba bool | None

Force Numba on/off. None = auto-detect.

None

Returns:

Type Description
list[tuple]

If return_vectors=False (default): [(central_id, [neighbor_ids]), ...]

list[tuple]

If return_vectors=True: [(central_id, [neighbor_ids], vectors_shape_k3), ...]

Examples:

Scalar cutoff (backward compatible)::

>>> neighbors = get_neighbors(atoms, cutoff=3.5)
>>> for central_id, nn_ids in neighbors:
...     print(central_id, nn_ids)

Per-pair cutoffs for a Na2O-Al2O3-SiO2 glass::

>>> cutoff = {(14, 8): 2.0, (13, 8): 1.9, (11, 8): 2.7}
>>> neighbors = get_neighbors(atoms, cutoff=cutoff)

With bond vectors for Steinhardt parameters::

>>> result = get_neighbors(atoms, cutoff=3.5, return_vectors=True)
>>> for central_id, nn_ids, vecs in result:
...     distances = np.linalg.norm(vecs, axis=1)
...     print(f"atom {central_id}: mean bond length = {distances.mean():.3f} A")

Quick lookup by original atom ID::

>>> nl = {cid: nn for cid, nn, *_ in get_neighbors(atoms, cutoff=3.5)}
>>> nl[43586]
Source code in amorphouspy/src/amorphouspy/neighbors.py
def get_neighbors(
    atoms: Atoms | tuple[np.ndarray, np.ndarray, np.ndarray],
    cutoff: float | dict[tuple[int, int], float],
    target_types: list[int] | None = None,
    neighbor_types: list[int] | None = None,
    *,
    return_vectors: bool = False,
    use_numba: bool | None = None,
) -> list[tuple]:
    """Find all neighbors within cutoff for each atom.

    Returns a list of tuples where all IDs are the real atom IDs from the
    structure file (e.g. non-sequential LAMMPS/XYZ ids), not array indices.

    Args:
        atoms: Either an ASE Atoms object or a tuple (coords, types, cell_matrix)
               where cell_matrix is a (3,3) array with lattice vectors as rows.
        cutoff: Cutoff radius in Angstrom. Either:
                  - A single float applied uniformly to all pairs.
                  - A dict mapping (atomic_number_i, atomic_number_j) to a
                    pair-specific cutoff in Angstrom. Symmetric: (8, 14) and
                    (14, 8) are equivalent. Pairs not listed default to the
                    maximum cutoff in the dict. The cell list is built on the
                    maximum cutoff so only one build is needed.
        target_types: Atomic numbers of atoms to find neighbors for.
                      None means all atoms.
        neighbor_types: Atomic numbers that count as valid neighbors.
                        None means all types.
        return_vectors: If True, each output tuple gains a third element — a
                        (k, 3) float64 array of Cartesian minimum-image bond
                        vectors (i -> j) in Angstrom. Scalar distances are
                        np.linalg.norm(vectors, axis=1).
        use_numba: Force Numba on/off. None = auto-detect.

    Returns:
        If return_vectors=False (default):
            [(central_id, [neighbor_ids]), ...]

        If return_vectors=True:
            [(central_id, [neighbor_ids], vectors_shape_k3), ...]

    Examples:
        Scalar cutoff (backward compatible)::

            >>> neighbors = get_neighbors(atoms, cutoff=3.5)
            >>> for central_id, nn_ids in neighbors:
            ...     print(central_id, nn_ids)

        Per-pair cutoffs for a Na2O-Al2O3-SiO2 glass::

            >>> cutoff = {(14, 8): 2.0, (13, 8): 1.9, (11, 8): 2.7}
            >>> neighbors = get_neighbors(atoms, cutoff=cutoff)

        With bond vectors for Steinhardt parameters::

            >>> result = get_neighbors(atoms, cutoff=3.5, return_vectors=True)
            >>> for central_id, nn_ids, vecs in result:
            ...     distances = np.linalg.norm(vecs, axis=1)
            ...     print(f"atom {central_id}: mean bond length = {distances.mean():.3f} A")

        Quick lookup by original atom ID::

            >>> nl = {cid: nn for cid, nn, *_ in get_neighbors(atoms, cutoff=3.5)}
            >>> nl[43586]
    """
    if use_numba is None:
        use_numba = NUMBA_AVAILABLE

    # ------------------------------------------------------------------
    # Parse input
    # ------------------------------------------------------------------
    atom_ids = _extract_atom_ids(atoms)

    if not isinstance(atoms, Atoms):
        coords, types, cell = atoms
        coords = np.asarray(coords, dtype=np.float64)
        types = np.asarray(types, dtype=np.int32)
        cell = np.asarray(cell, dtype=np.float64)
    else:
        # Work on a wrapped copy so the caller's object is never mutated.
        atoms_copy = atoms.copy()
        atoms_copy.wrap()
        coords = atoms_copy.get_positions()
        types = atoms_copy.get_atomic_numbers().astype(np.int32)
        cell = atoms_copy.get_cell().array

    n_atoms = len(coords)
    # A cell is orthogonal iff all off-diagonal lattice components vanish.
    is_orthogonal = np.allclose(cell - np.diag(np.diag(cell)), 0.0, atol=1e-10)

    # ------------------------------------------------------------------
    # Parse cutoff
    # ------------------------------------------------------------------
    max_cutoff, pair_types, pair_cutoffs_sq, use_pair_cutoffs = _parse_cutoff(cutoff, types)
    cutoff_sq = max_cutoff * max_cutoff

    target_arr = np.array(target_types, dtype=np.int32) if target_types is not None else np.empty(0, dtype=np.int32)
    neighbor_arr = (
        np.array(neighbor_types, dtype=np.int32) if neighbor_types is not None else np.empty(0, dtype=np.int32)
    )
    use_target_filter = target_types is not None
    use_neighbor_filter = neighbor_types is not None

    initial_max_neighbors = _estimate_max_neighbors(coords, cell, max_cutoff)

    # ------------------------------------------------------------------
    # Geometry-specific setup; everything downstream of this branch is
    # shared between the orthogonal and triclinic paths.
    # ------------------------------------------------------------------
    if is_orthogonal:
        box_size = np.diag(cell)
        coords_frac = None
        atom_cells, n_cells, cell_start, cell_atoms = compute_cell_list_orthogonal(coords, box_size, max_cutoff)
        numba_builder = _build_nl_ortho_numba
        geom_kwargs = {"coords": coords, "box_size": box_size}
    else:
        box_size = None
        coords_frac, atom_cells, n_cells, cell_start, cell_atoms = compute_cell_list_triclinic(coords, cell, max_cutoff)
        numba_builder = _build_nl_tri_numba
        geom_kwargs = {"coords_frac": coords_frac, "cell": cell}

    # ------------------------------------------------------------------
    # Build neighbor list
    # ------------------------------------------------------------------
    if use_numba and NUMBA_AVAILABLE:
        kwargs = {
            **geom_kwargs,
            "types": types,
            "atom_cells": atom_cells,
            "n_cells": n_cells,
            "cell_start": cell_start,
            "cell_atoms": cell_atoms,
            "cutoff_sq": cutoff_sq,
            "target_types": target_arr,
            "neighbor_types": neighbor_arr,
            "use_target_filter": use_target_filter,
            "use_neighbor_filter": use_neighbor_filter,
            "max_neighbors": initial_max_neighbors,
            "pair_types": pair_types,
            "pair_cutoffs_sq": pair_cutoffs_sq,
            "use_pair_cutoffs": use_pair_cutoffs,
            "return_vectors": return_vectors,
        }
        raw_neighbor_list, raw_neighbor_counts, raw_vector_list = numba_builder(**kwargs)
        # _numba_to_list may re-invoke the builder with a larger max_neighbors,
        # hence the builder and its kwargs are passed through.
        idx_neighbors, vec_neighbors = _numba_to_list(
            raw_neighbor_list,
            raw_neighbor_counts,
            raw_vector_list,
            n_atoms,
            initial_max_neighbors,
            numba_builder,
            kwargs,
            return_vectors,
        )
    else:
        idx_neighbors, vec_neighbors = _numpy_fallback(
            coords=coords,
            coords_frac=coords_frac,
            types=types,
            cell=cell,
            box_size=box_size,
            atom_cells=atom_cells,
            n_cells=n_cells,
            cutoff_sq=cutoff_sq,
            pair_types=pair_types,
            pair_cutoffs_sq=pair_cutoffs_sq,
            use_pair_cutoffs=use_pair_cutoffs,
            use_target_filter=use_target_filter,
            use_neighbor_filter=use_neighbor_filter,
            target_types=target_types,
            neighbor_types=neighbor_types,
            n_atoms=n_atoms,
            return_vectors=return_vectors,
            is_orthogonal=is_orthogonal,
        )

    # ------------------------------------------------------------------
    # Translate array indices -> real atom IDs
    # ------------------------------------------------------------------
    if return_vectors:
        return [
            (int(atom_ids[i]), [int(atom_ids[j]) for j in idx_neighbors[i]], vec_neighbors[i]) for i in range(n_atoms)
        ]
    return [(int(atom_ids[i]), [int(atom_ids[j]) for j in idx_neighbors[i]]) for i in range(n_atoms)]

get_structure_dict(composition: dict[str, float], n_molecules: int | None = None, target_atoms: int | None = None, mode: str = 'molar', density: float | None = None, min_distance: float = 1.6, max_attempts_per_atom: int = 10000) -> dict

Generate a structure dictionary for a given composition.

Supports both n_molecules and target_atoms input modes, and both molar and weight composition modes.

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions, e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.

required
n_molecules int | None

Total number of molecules (traditional mode).

None
target_atoms int | None

Target number of atoms (new mode).

None
mode str

Composition mode: "molar" for mol%, "weight" for weight%.

'molar'
density float | None

Density in g/cm^3, default is calculated from model.

None
min_distance float

Minimum distance between any two atoms in angstroms, default is 1.6 Å.

1.6
max_attempts_per_atom int

Maximum attempts to place an atom before giving up, default is 10000.

10000

Returns:

Type Description
dict

A dictionary containing:
  - "atoms": A list of atom dictionaries with keys "element" and "position"
  - "box": The length of the cubic box in angstroms
  - "formula_units": Dictionary of oxide formula units
  - "total_atoms": Total number of atoms
  - "element_counts": Dictionary of element counts (if target_atoms mode)
  - "mol_fraction": Dictionary of molar fractions (if target_atoms mode)

Example

struct_dict = get_structure_dict( ... composition={"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}, ... target_atoms=1000, ... mode="molar" ... )

Source code in amorphouspy/src/amorphouspy/structure/geometry.py
def get_structure_dict(
    composition: dict[str, float],
    n_molecules: int | None = None,
    target_atoms: int | None = None,
    mode: str = "molar",
    density: float | None = None,
    min_distance: float = 1.6,
    max_attempts_per_atom: int = 10000,
) -> dict:
    """Build a randomized structure dictionary for an oxide composition.

    The system size is chosen either via ``n_molecules`` (traditional mode)
    or via ``target_atoms`` (new mode); fractions are interpreted as mol%
    or weight% depending on ``mode``.

    Args:
        composition: Mapping from oxide formula to fraction,
            e.g. {"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5}.
        n_molecules: Total number of molecules (traditional mode).
        target_atoms: Target number of atoms (new mode).
        mode: Composition mode: "molar" for mol%, "weight" for weight%.
        density: Density in g/cm^3, default is calculated from model.
        min_distance: Minimum distance between any two atoms in angstroms, default is 1.6 Å.
        max_attempts_per_atom: Maximum attempts to place an atom before giving up, default is 10000.

    Returns:
        A dictionary with keys:
            - "atoms": List of atom dictionaries with keys "element" and "position"
            - "box": Cubic box edge length in angstroms
            - "formula_units": Dictionary of oxide formula units
            - "total_atoms": Total number of atoms
            - "element_counts": Dictionary of element counts (if target_atoms mode)
            - "mol_fraction": Dictionary of molar fractions (if target_atoms mode)

    Example:
        >>> struct_dict = get_structure_dict(
        ...     composition={"CaO": 0.25, "Al2O3": 0.25, "SiO2": 0.5},
        ...     target_atoms=1000,
        ...     mode="molar"
        ... )

    """
    validate_target_mode(n_molecules, target_atoms)

    stoichiometry = extract_stoichiometry(composition)

    box_length = get_box_from_density(
        composition,
        n_molecules=n_molecules,
        target_atoms=target_atoms,
        mode=mode,
        density=density,
        stoichiometry=stoichiometry,
    )

    atoms_list, _ = create_random_atoms(
        composition,
        n_molecules=n_molecules,
        target_atoms=target_atoms,
        mode=mode,
        box_length=box_length,
        min_distance=min_distance,
        max_attempts_per_atom=max_attempts_per_atom,
        stoichiometry=stoichiometry,
    )

    if target_atoms is not None:
        # New mode: let the planner derive all counts from the atom budget.
        plan = plan_system(composition, target_atoms, mode=mode, target_type="atoms")
        formula_units = plan["formula_units"]
        n_total = plan["total_atoms"]
        per_element = plan["element_counts"]
        fractions = plan["mol_fraction"]
    else:
        assert n_molecules is not None  # guaranteed by validate_target_mode
        fractions_in = extract_composition(composition)
        formula_units = {ox: round(frac * n_molecules) for ox, frac in fractions_in.items()}

        # Rounding residue is absorbed by the dominant oxide so the total matches.
        leftover = n_molecules - sum(formula_units.values())
        if leftover:
            dominant = max(fractions_in, key=fractions_in.get)
            formula_units[dominant] += leftover

        n_total = 0
        per_element: dict[str, int] = {}
        for ox, units in formula_units.items():
            stoich = stoichiometry.get(ox)
            if stoich is None:
                error_msg = f"Unknown oxide formula: {ox}"
                raise KeyError(error_msg)
            for elem, count in stoich.items():
                per_element[elem] = per_element.get(elem, 0) + count * units
            n_total += sum(stoich.values()) * units

        fractions = get_composition(composition, mode=mode)

    return {
        "atoms": atoms_list,
        "box": box_length,
        "formula_units": formula_units,
        "total_atoms": n_total,
        "element_counts": per_element,
        "mol_fraction": fractions,
    }

get_viscosity(result: dict[str, Any], timestep: float = 1.0, output_frequency: int = 1, max_lag: int | None = 1000000, cutoff_method: str = 'noise_threshold') -> dict[str, float]

Compute viscosity using the Green-Kubo formalism from MD stress data.

Parameters:

Name Type Description Default
result dict[str, Any]

Parsed output dictionary from viscosity_simulation.

required
timestep float

MD integration time step in femtoseconds.

1.0
output_frequency int

Number of MD steps between stored frames in the output.

1
max_lag int | None

Maximum correlation lag (number of steps). Defaults to 1,000,000.

1000000
cutoff_method str

Method passed to auto_cutoff for τ_acf detection. One of 'noise_threshold', 'cumulative_integral', or 'self_consistent'. Defaults to 'noise_threshold'.

'noise_threshold'

Returns:

Type Description
dict[str, float]

A dictionary containing: - temperature: Mean simulation temperature (K) - viscosity: Computed viscosity (Pa·s) - max_lag: Cutoff time per stress component (ps) - lag_time_ps: Array of lag times in picoseconds - sacf: Averaged normalized stress autocorrelation function - viscosity_integral: Cumulative viscosity integral (Pa·s) vs lag time

Example

result_dict = get_viscosity(simulation_output, timestep=1.0, output_frequency=1, max_lag=1000000) print(result_dict['viscosity']) print(result_dict['sacf']) print(result_dict['viscosity_integral'])

Source code in amorphouspy/src/amorphouspy/workflows/viscosity.py
def _stress_acfs(
    pxy: np.ndarray,
    pxz: np.ndarray,
    pyz: np.ndarray,
    max_lag: int,
    dt_s: float,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Compute the three off-diagonal stress ACFs, trimmed to a common length.

    Args:
        pxy: xy shear-stress time series (Pa).
        pxz: xz shear-stress time series (Pa).
        pyz: yz shear-stress time series (Pa).
        max_lag: Maximum correlation lag (number of steps).
        dt_s: Effective time between stored frames in seconds.

    Returns:
        Tuple (acf_xy, acf_xz, acf_yz, lag_time_ps) where the three ACFs share
        the same length and lag_time_ps is the corresponding lag axis in ps.
    """
    acfxy = autocorrelation_fft(pxy, max_lag)
    acfxz = autocorrelation_fft(pxz, max_lag)
    acfyz = autocorrelation_fft(pyz, max_lag)

    # Enforce a consistent length across the three components.
    n = min(len(acfxy), len(acfxz), len(acfyz))
    lag_time_ps = np.arange(n) * dt_s * 1e12
    return acfxy[:n], acfxz[:n], acfyz[:n], lag_time_ps


def get_viscosity(
    result: dict[str, Any],
    timestep: float = 1.0,  # fs (MD integration timestep)
    output_frequency: int = 1,  # number of MD steps between stored frames
    max_lag: int | None = 1_000_000,
    cutoff_method: str = "noise_threshold",
) -> dict[str, Any]:
    """Compute viscosity using the Green-Kubo formalism from MD stress data.

    Args:
        result: Parsed output dictionary from `viscosity_simulation`.
        timestep: MD integration time step in femtoseconds.
        output_frequency: Number of MD steps between stored frames in the output.
        max_lag: Maximum correlation lag (number of steps). Defaults to 1,000,000.
        cutoff_method: Method passed to ``auto_cutoff`` for τ_acf detection. One of
            ``'noise_threshold'``, ``'cumulative_integral'``, or ``'self_consistent'``.
            Defaults to ``'noise_threshold'``.

    Returns:
        A dictionary containing:
            - temperature: Mean simulation temperature (K)
            - viscosity: Computed viscosity (Pa·s)
            - max_lag: Cutoff time per stress component (ps)
            - lag_time_ps: List of lag times in picoseconds
            - sacf: Averaged normalized stress autocorrelation function
            - viscosity_integral: Cumulative viscosity integral (Pa·s) vs lag time
            - tau_acf_ps: Detected ACF cutoff time (ps)

    Example:
        >>> result_dict = get_viscosity(simulation_output, timestep=1.0, output_frequency=1, max_lag=1000000)
        >>> print(result_dict['viscosity'])
        >>> print(result_dict['sacf'])
        >>> print(result_dict['viscosity_integral'])

    """
    kB = 1.380649e-23  # m^2 kg s^-2 K^-1
    A2m = 1e-10  # Å → m

    # --- Extract data ---
    pressures = result["result"]["pressures"]
    pxy = pressures[::output_frequency, 0, 1] * 1e9  # Pa
    pxz = pressures[::output_frequency, 0, 2] * 1e9  # Pa
    pyz = pressures[::output_frequency, 1, 2] * 1e9  # Pa

    volume = np.mean(result["result"]["volume"]) * A2m**3  # m^3
    temperature = np.mean(result["result"]["temperature"])  # K

    # --- Correct effective timestep ---
    dt_s = timestep * output_frequency * 1e-15  # seconds

    scale = volume / (kB * temperature)

    if max_lag is None:
        max_lag = len(pxy)

    # --- First ACF pass (used only for cutoff detection) ---
    acfxy, acfxz, acfyz, lag_time_ps = _stress_acfs(pxy, pxz, pyz, max_lag, dt_s)

    # --- Auto cutoff detection (with per-component fallback) ---
    cut_times_ps: list[float | tuple[float, int]] = []
    _detection_succeeded: list[bool] = []
    for _acf_norm, _label in [
        (acfxy / acfxy[0], "pxy"),
        (acfxz / acfxz[0], "pxz"),
        (acfyz / acfyz[0], "pyz"),
    ]:
        try:
            _c = auto_cutoff(_acf_norm, lag_time_ps, method=cutoff_method, epsilon=1e-4, min_cutoff=10)
            cut_times_ps.append(_c)
            _detection_succeeded.append(True)
        except ValueError:
            cut_times_ps.append(lag_time_ps[-1])  # fall back to last lag time
            _detection_succeeded.append(False)

    if not any(_detection_succeeded):
        msg = "tau_acf detection failed for all stress components"
        raise ValueError(msg)

    _successful_cut_times = [
        _cutoff_time_ps(c) for c, ok in zip(cut_times_ps, _detection_succeeded, strict=False) if ok
    ]
    tau_acf_ps = float(np.max(_successful_cut_times))

    # --- Convert cutoff times to lag indices ---
    total_time_ps = len(pxy) * timestep * output_frequency / 1000

    max_lag_1 = [
        get_closest_divisor(int(total_time_ps), int(_cutoff_time_ps(cut_times_ps[0]))) * 2,
        get_closest_divisor(int(total_time_ps), int(_cutoff_time_ps(cut_times_ps[1]))) * 2,
        get_closest_divisor(int(total_time_ps), int(_cutoff_time_ps(cut_times_ps[2]))) * 2,
    ]

    # convert ps → steps
    max_lag = int(np.max(max_lag_1) / (timestep * output_frequency / 1000))

    # --- Second ACF pass (final integration window) ---
    acfxy, acfxz, acfyz, lag_time_ps = _stress_acfs(pxy, pxz, pyz, max_lag, dt_s)

    # --- Green-Kubo integration ---
    eta_xy = scale * cumulative_trapezoid(acfxy, dt=dt_s)
    eta_xz = scale * cumulative_trapezoid(acfxz, dt=dt_s)
    eta_yz = scale * cumulative_trapezoid(acfyz, dt=dt_s)

    eta_avg = (eta_xy + eta_xz + eta_yz) / 3
    viscosity = eta_avg[-1]

    # --- Normalized SACF ---
    sacf_xy = acfxy / acfxy[0]
    sacf_xz = acfxz / acfxz[0]
    sacf_yz = acfyz / acfyz[0]
    sacf_avg = ((sacf_xy + sacf_xz + sacf_yz) / 3).tolist()

    return {
        "temperature": temperature,
        "viscosity": viscosity,
        "max_lag": int(np.max(max_lag_1)),
        "lag_time_ps": lag_time_ps.tolist(),
        "sacf": sacf_avg,
        "viscosity_integral": eta_avg.tolist(),
        "tau_acf_ps": tau_acf_ps,
    }

load_lammps_dump(path: str | Path, type_map: dict[int, str] | None = None, *, frame: int | None = None, start: int | None = None, stop: int | None = None, step: int | None = None, return_atoms_dict: bool = False) -> Atoms | list[Atoms] | tuple[Atoms, dict[str, Any]] | list[tuple[Atoms, dict[str, Any]]]

Read a LAMMPS dump file and return ASE Atoms object(s) with correct chemical symbols.

By default the full trajectory is returned as a list. Pass frame to get a single frame, or start / stop / step to slice the trajectory.

Parameters:

Name Type Description Default
path str | Path

Path to the LAMMPS dump file.

required
type_map dict[int, str] | None

Mapping from LAMMPS integer type to element symbol, e.g. {1: "O", 2: "Si"}. When None, the element column stored in the dump file is used.

None
frame int | None

Zero-based index of a single frame to read. Mutually exclusive with start / stop / step.

None
start int | None

First frame index of the slice (inclusive, default 0).

None
stop int | None

Last frame index of the slice (exclusive, default end of file).

None
step int | None

Stride between selected frames (default 1).

None
return_atoms_dict bool

When True, also return amorphouspy atoms_dict objects alongside the ase.Atoms objects (default False).

False

Returns:

Type Description
Atoms | list[Atoms] | tuple[Atoms, dict[str, Any]] | list[tuple[Atoms, dict[str, Any]]]

  • Single frame (frame given, return_atoms_dict False): Atoms
  • Single frame (frame given, return_atoms_dict True): (Atoms, dict)
  • Multiple frames (return_atoms_dict False): list[Atoms]
  • Multiple frames (return_atoms_dict True): list[tuple[Atoms, dict]]

Each atoms_dict contains "atoms", "box", and "total_atoms".

Raises:

Type Description
ValueError

If frame is combined with start / stop / step.

ValueError

If type_map is None and the dump file does not contain an element column.

Example
Full trajectory

frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"})

Single frame

ase_atoms = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, frame=0)

Frames 30-49

frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, start=30, stop=50)

Every 10th frame from 0 to 99

frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, start=0, stop=100, step=10)

Source code in amorphouspy/src/amorphouspy/io_utils.py
def load_lammps_dump(
    path: str | Path,
    type_map: dict[int, str] | None = None,
    *,
    frame: int | None = None,
    start: int | None = None,
    stop: int | None = None,
    step: int | None = None,
    return_atoms_dict: bool = False,
) -> Atoms | list[Atoms] | tuple[Atoms, dict[str, Any]] | list[tuple[Atoms, dict[str, Any]]]:
    """Read a LAMMPS dump file into ASE Atoms object(s) with proper chemical symbols.

    Without any selection arguments the whole trajectory is returned as a
    list.  Use *frame* for one frame, or *start* / *stop* / *step* to slice.

    Args:
        path: Path to the LAMMPS dump file.
        type_map: Mapping from LAMMPS integer type to element symbol,
            e.g. ``{1: "O", 2: "Si"}``. When ``None``, the ``element``
            column stored in the dump file is used.
        frame: Zero-based index of a single frame to read.  Mutually exclusive
            with *start* / *stop* / *step*.
        start: First frame index of the slice (inclusive, default 0).
        stop: Last frame index of the slice (exclusive, default end of file).
        step: Stride between selected frames (default 1).
        return_atoms_dict: When ``True``, also return amorphouspy ``atoms_dict``
            objects alongside the ``ase.Atoms`` objects (default ``False``).

    Returns:
        - Single frame (``frame`` given, *return_atoms_dict* ``False``): ``Atoms``
        - Single frame (``frame`` given, *return_atoms_dict* ``True``): ``(Atoms, dict)``
        - Multiple frames (*return_atoms_dict* ``False``): ``list[Atoms]``
        - Multiple frames (*return_atoms_dict* ``True``): ``list[tuple[Atoms, dict]]``

        Each ``atoms_dict`` contains ``"atoms"``, ``"box"``, and ``"total_atoms"``.

    Raises:
        ValueError: If *frame* is combined with *start* / *stop* / *step*.
        ValueError: If *type_map* is ``None`` and the dump file does not contain
            an ``element`` column.

    Example:
        >>> # Full trajectory
        >>> frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"})
        >>> # Single frame
        >>> ase_atoms = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, frame=0)
        >>> # Frames 30-49
        >>> frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, start=30, stop=50)
        >>> # Every 10th frame from 0 to 99
        >>> frames = load_lammps_dump("run.lammpstrj", type_map={1: "O", 2: "Si"}, start=0, stop=100, step=10)

    """
    slicing_requested = not (start is None and stop is None and step is None)
    if frame is not None and slicing_requested:
        msg = "'frame' cannot be combined with 'start', 'stop', or 'step'"
        raise ValueError(msg)

    single_frame = frame is not None
    if single_frame:
        index: int | slice = frame
    else:
        index = slice(start, stop, step)

    raw = ase.io.read(str(path), format="lammps-dump-text", index=index)
    if single_frame:
        frames: list[Atoms] = [cast("Atoms", raw)]
    elif isinstance(raw, list):
        frames = [cast("Atoms", a) for a in raw]
    else:
        frames = [cast("Atoms", raw)]

    # Assign chemical symbols: explicit type_map wins, then the dump's own
    # 'element' column; otherwise the file is unusable without a mapping.
    for atoms in frames:
        if type_map is not None:
            atoms.set_chemical_symbols([type_map[int(t)] for t in atoms.arrays["type"]])
        elif "element" in atoms.arrays:
            atoms.set_chemical_symbols(list(atoms.arrays["element"]))
        else:
            msg = (
                "type_map is required when the dump file does not contain an 'element' column. "
                "Either pass type_map={1: 'O', 2: 'Si', ...} or add "
                "'dump_modify element O Si ...' to your LAMMPS input script."
            )
            raise ValueError(msg)

    if not return_atoms_dict:
        return frames[0] if single_frame else frames

    def _as_dict(atoms: Atoms) -> dict[str, Any]:
        # Box length from the first cell vector's x component — assumes a
        # cubic/orthogonal cell; verify for triclinic dumps.
        edge = float(atoms.get_cell()[0][0])
        entries = [
            {"element": sym, "position": list(pos)}
            for sym, pos in zip(atoms.get_chemical_symbols(), atoms.get_positions(), strict=True)
        ]
        return {"atoms": entries, "box": edge, "total_atoms": len(entries)}

    paired = [(atoms, _as_dict(atoms)) for atoms in frames]
    return paired[0] if single_frame else paired

md_simulation(structure: Atoms, potential: pd.DataFrame, temperature_sim: float = 5000.0, timestep: float = 1.0, production_steps: int = 10000000, n_print: int = 1000, server_kwargs: dict | None = None, *, temperature_end: float | None = None, pressure: float | None = None, pressure_end: float | None = None, langevin: bool = False, seed: int = 12345, tmp_working_directory: str | Path | None = None) -> dict

Perform a molecular dynamics simulation using LAMMPS.

This function equilibrates a structure at a predefined temperature and pressure, with optional linear ramps for temperature and/or pressure over the course of the simulation.

Parameters:

Name Type Description Default
structure Atoms

The initial atomic structure to be melted and quenched.

required
potential DataFrame

The potential file to be used for the simulation.

required
temperature_sim float

Start temperature in K (or constant temperature when temperature_end is None).

5000.0
timestep float

Time step for integration in femtoseconds (default is 1.0 fs).

1.0
production_steps int

The number of steps for the production.

10000000
n_print int

The frequency of output during the simulation (default is 1000).

1000
server_kwargs dict | None

Additional arguments for the server.

None
temperature_end float | None

End temperature in K for a linear ramp from temperature_sim. If None, temperature is held constant at temperature_sim.

None
pressure float | None

Start pressure in GPa. If None, NVT ensemble is used. Provide a value (e.g. 0.0) to enable NPT.

None
pressure_end float | None

End pressure in GPa for a linear pressure ramp. Requires pressure to be set. If None, pressure is held constant at pressure.

None
langevin bool

Whether to use Langevin dynamics.

False
seed int

Random seed for velocity initialization (default is 12345). Ignored if initial_temperature is 0.

12345
tmp_working_directory str | Path | None

The directory where the simulation files will be stored.

None

Returns:

Type Description
dict

A dictionary containing the simulation steps and temperature data.

Source code in amorphouspy/src/amorphouspy/workflows/md.py
def md_simulation(
    structure: Atoms,
    potential: pd.DataFrame,
    temperature_sim: float = 5000.0,
    timestep: float = 1.0,
    production_steps: int = 10_000_000,
    n_print: int = 1000,
    server_kwargs: dict | None = None,
    *,
    temperature_end: float | None = None,
    pressure: float | None = None,
    pressure_end: float | None = None,
    langevin: bool = False,
    seed: int = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict:  # pylint: disable=too-many-positional-arguments
    """Perform a molecular dynamics simulation using LAMMPS.

    This function equilibrates a structure at a predefined temperature and pressure, with optional
    linear ramps for temperature and/or pressure over the course of the simulation.

    Args:
        structure: The initial atomic structure to be melted and quenched.
        potential: The potential file to be used for the simulation.
        temperature_sim: Start temperature in K (or constant temperature when ``temperature_end`` is None).
        timestep: Time step for integration in femtoseconds (default is 1.0 fs).
        production_steps: The number of steps for the production.
        n_print: The frequency of output during the simulation (default is 1000).
        server_kwargs: Additional arguments for the server.
        temperature_end: End temperature in K for a linear ramp from ``temperature_sim``.
            If None, temperature is held constant at ``temperature_sim``.
        pressure: Start pressure in GPa. If None, NVT ensemble is used.
            Provide a value (e.g. ``0.0``) to enable NPT.
        pressure_end: End pressure in GPa for a linear pressure ramp. Requires ``pressure`` to be set.
            If None, pressure is held constant at ``pressure``.
        langevin: Whether to use Langevin dynamics.
        seed: Random seed for velocity initialization (default is 12345).
            Ignored if ``temperature_sim`` is 0 (the initial temperature is set to ``temperature_sim``).
        tmp_working_directory: The directory where the simulation files will be stored.

    Returns:
        A dictionary with keys ``"structure"`` (the final atomic structure) and
        ``"result"`` (the parsed ``"generic"`` output of the run, or None if absent).

    Raises:
        ValueError: If ``potential`` is empty, i.e. no matching potential was found.

    """
    if potential.empty:
        msg = "No matching potential found for the given configuration."
        raise ValueError(msg)
    potential_name = potential.loc[0, "Name"]

    if potential_name.lower() == "shik":
        # The SHIK configuration ships with a hard-coded pre-equilibration stage
        # (Langevin thermostat + nve/limit warm-up) that is not wanted here, so
        # strip those lines from the LAMMPS input.
        exclude_patterns = [
            "fix langevin all langevin 5000 5000 0.01 48279",
            "fix ensemble all nve/limit 0.5",
            "run 10000",
            "unfix langevin",
            "unfix ensemble",
        ]

        # Work on a copy so the caller's DataFrame is not mutated as a side effect.
        potential = potential.copy()
        potential["Config"] = potential["Config"].apply(
            lambda lines: [line for line in lines if not any(p in line for p in exclude_patterns)]
        )

    structure_final, parsed_output = _run_lammps_md(
        structure=structure,
        potential=potential,
        tmp_working_directory=tmp_working_directory,
        temperature=temperature_sim,
        temperature_end=temperature_end,
        n_ionic_steps=production_steps,
        timestep=timestep,
        n_print=n_print,
        initial_temperature=temperature_sim,
        pressure=pressure,
        pressure_end=pressure_end,
        langevin=langevin,
        seed=seed,
        server_kwargs=server_kwargs,
    )

    # "generic" holds the parsed per-step output of the production run.
    result = parsed_output.get("generic", None)

    return {"structure": structure_final, "result": result}

melt_quench_simulation(structure: Atoms, potential: pd.DataFrame, temperature_high: float | None = None, temperature_low: float = 300.0, timestep: float = 1.0, heating_rate: float = 1000000000000.0, cooling_rate: float = 1000000000000.0, n_print: int = 1000, equilibration_steps: int | None = None, *, server_kwargs: dict | None = None, langevin: bool = False, seed: int = 12345, tmp_working_directory: str | Path | None = None) -> dict

Perform a melt-quench simulation using LAMMPS.

This function heats a structure to a high temperature, equilibrates it, and then cools it down to a low temperature, simulating a melt-quench process. The heating and cooling rates are given in K/s, and the conversion into simulation steps is done automatically.

Parameters:

Name Type Description Default
structure Atoms

The initial atomic structure to be melted and quenched.

required
potential DataFrame

The potential file to be used for the simulation.

required
temperature_high float | None

The high temperature to which the structure will be heated. If None, the protocol's default melt temperature is used (e.g. 4000 K for SHIK, 5000 K for others).

None
temperature_low float

The low temperature to which the structure will be cooled (default is 300.0 K).

300.0
timestep float

Time step for integration in femtoseconds (default is 1.0 fs).

1.0
heating_rate float

The rate at which the temperature is increased during the heating phase, in K/s (default is 1e12 K/s).

1000000000000.0
cooling_rate float

The rate at which the temperature is decreased during the cooling phase, in K/s (default is 1e12 K/s).

1000000000000.0
n_print int

The frequency of output during the simulation (default is 1000).

1000
equilibration_steps int | None

Override for all fixed equilibration stages inside the protocol. If None, each protocol uses its own hardcoded defaults.

None
server_kwargs dict | None

Additional keyword arguments for the server.

None
langevin bool

Whether to use Langevin dynamics.

False
seed int

Random seed for velocity initialization (default is 12345). Ignored if initial_temperature is 0.

12345
tmp_working_directory str | Path | None

Specifies the location of the temporary directory to run the simulations.

None

Returns:

Type Description
dict

A dictionary containing the simulation steps and temperature data.

Example

result = melt_quench_simulation( ... structure=my_atoms, ... potential=my_potential, ... temperature_high=5000.0, ... temperature_low=300.0, ... cooling_rate=1e12 ... )

Source code in amorphouspy/src/amorphouspy/workflows/meltquench.py
def melt_quench_simulation(
    structure: Atoms,
    potential: pd.DataFrame,
    temperature_high: float | None = None,
    temperature_low: float = 300.0,
    timestep: float = 1.0,
    heating_rate: float = 1e12,
    cooling_rate: float = 1e12,
    n_print: int = 1000,
    equilibration_steps: int | None = None,
    *,
    server_kwargs: dict | None = None,
    langevin: bool = False,
    seed: int = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict:  # pylint: disable=too-many-positional-arguments
    """Perform a melt-quench simulation using LAMMPS.

    This function heats a structure to a high temperature, equilibrates it,
    and then cools it down to a low temperature, simulating a melt-quench process.
    The heating and cooling rates are given in K/s, and the conversion into simulation steps is done automatically.

    Args:
        structure: The initial atomic structure to be melted and quenched.
        potential: The potential file to be used for the simulation.
        temperature_high: The high temperature to which the structure will be heated.
            If None, the protocol's default melt temperature is used (e.g. 4000 K for SHIK, 5000 K for others).
        temperature_low: The low temperature to which the structure will be cooled (default is 300.0 K).
        timestep: Time step for integration in femtoseconds (default is 1.0 fs).
        heating_rate: The rate at which the temperature is increased during the heating phase,
            in K/s (default is 1e12 K/s).
        cooling_rate: The rate at which the temperature is decreased during the cooling phase,
            in K/s (default is 1e12 K/s).
        n_print: The frequency of output during the simulation (default is 1000).
        equilibration_steps: Override for all fixed equilibration stages inside the protocol.
            If None, each protocol uses its own hardcoded defaults.
        server_kwargs: Additional keyword arguments for the server.
        langevin: Whether to use Langevin dynamics.
        seed: Random seed for velocity initialization (default is 12345). Ignored if `initial_temperature` is 0.
        tmp_working_directory: Specifies the location of the temporary directory to run the simulations.

    Returns:
        A dictionary containing the simulation steps and temperature data.

    Raises:
        ValueError: If ``potential`` is empty, or if no protocol is registered for the potential.

    Example:
        >>> result = melt_quench_simulation(
        ...     structure=my_atoms,
        ...     potential=my_potential,
        ...     temperature_high=5000.0,
        ...     temperature_low=300.0,
        ...     cooling_rate=1e12
        ... )

    """
    # Guard against an empty potential table up front (consistent with md_simulation);
    # otherwise `potential.loc[0, "Name"]` would fail with an opaque KeyError.
    if potential.empty:
        msg = "No matching potential found for the given configuration."
        raise ValueError(msg)

    seconds_to_femtos = 1e15  # rates are given in K/s while the timestep is in fs
    potential_name = potential.loc[0, "Name"].lower()

    if temperature_high is None:
        temperature_high = DEFAULT_MELT_TEMPERATURES.get(potential_name, 5000.0)

    # Convert the K/s ramp rates into step counts: steps = dT / (dt * rate), with dt in fs.
    heating_steps = int(((temperature_high - temperature_low) / (timestep * heating_rate)) * seconds_to_femtos)
    cooling_steps = int(((temperature_high - temperature_low) / (timestep * cooling_rate)) * seconds_to_femtos)

    # Check if protocol exists
    if potential_name not in PROTOCOL_MAP:
        available = ", ".join(PROTOCOL_MAP.keys())
        msg = f"Unknown potential: {potential_name}. Available protocols: {available}"
        raise ValueError(msg)

    # Create parameters dataclass
    params = MeltQuenchParams(
        structure=structure,
        potential=potential,
        temperature_high=temperature_high,
        temperature_low=temperature_low,
        heating_steps=heating_steps,
        cooling_steps=cooling_steps,
        timestep=timestep,
        n_print=n_print,
        langevin=langevin,
        seed=seed,
        server_kwargs=server_kwargs,
        tmp_working_directory=tmp_working_directory,
        equilibration_steps=equilibration_steps,
    )

    # Run the protocol using the function-based approach
    protocol_func = PROTOCOL_MAP[potential_name]
    structure_final, history = protocol_func(_run_lammps_md, params)

    return {
        "structure": structure_final,
        "result": history,
    }

parse_formula(formula: str) -> dict[str, int]

Parse a chemical formula and returns a dictionary of element counts.

Parameters:

Name Type Description Default
formula str

A string representing the chemical formula (e.g., "Al2O3").

required

Returns:

Name Type Description
dict[str, int]

A dictionary mapping element symbols to their counts.

Example dict[str, int]

{"Al": 2, "O": 3} for "Al2O3".

Source code in amorphouspy/src/amorphouspy/structure/composition.py
def parse_formula(formula: str) -> dict[str, int]:
    """Parse a chemical formula into a mapping of element symbols to counts.

    Args:
        formula: A string representing the chemical formula (e.g., "Al2O3").

    Returns:
        A dictionary mapping element symbols to their counts.
        Example: {"Al": 2, "O": 3} for "Al2O3".

    """
    tally: dict[str, int] = {}
    # ELEMENT yields (symbol, count) pairs; a missing count means a single atom.
    for symbol, count_str in ELEMENT.findall(formula):
        tally[symbol] = tally.get(symbol, 0) + (int(count_str) if count_str else 1)
    return tally

plan_system(composition: dict[str, float], target: int, mode: str = 'molar', target_type: str = 'atoms') -> dict

Generate a comprehensive plan for the system composition and size.

This unified planner handles both 'atoms' and 'molecules' target types, converting them into a concrete allocation of formula units and atoms.

Parameters:

Name Type Description Default
composition dict[str, float]

A dictionary mapping oxide formulas to their fractions.

required
target int

The target count (atoms or molecules depending on target_type).

required
mode str

The composition mode ('molar' or 'weight'). Defaults to "molar".

'molar'
target_type str

The type of target ('atoms' or 'molecules'). Defaults to "atoms".

'atoms'

Returns:

Type Description
dict

A dictionary containing: - "mol_fraction": Dictionary of molar fractions. - "formula_units": Dictionary of allocated integer formula units. - "total_atoms": The actual total number of atoms. - "element_counts": Dictionary of total counts per element.

Raises:

Type Description
ValueError

If target_type is not 'atoms' or 'molecules'.

Source code in amorphouspy/src/amorphouspy/structure/planner.py
def plan_system(composition: dict[str, float], target: int, mode: str = "molar", target_type: str = "atoms") -> dict:
    """Generate a comprehensive plan for the system composition and size.

    This unified planner handles both 'atoms' and 'molecules' target types,
    converting them into a concrete allocation of formula units and atoms.

    Args:
        composition: A dictionary mapping oxide formulas to their fractions.
        target: The target count (atoms or molecules depending on `target_type`).
        mode: The composition mode ('molar' or 'weight'). Defaults to "molar".
        target_type: The type of target ('atoms' or 'molecules'). Defaults to "atoms".

    Returns:
        A dictionary containing:
            - "mol_fraction": Dictionary of molar fractions.
            - "formula_units": Dictionary of allocated integer formula units.
            - "total_atoms": The actual total number of atoms.
            - "element_counts": Dictionary of total counts per element.

    Raises:
        ValueError: If `target_type` is not 'atoms' or 'molecules'.

    """
    mol_frac = get_composition(composition, mode=mode)

    if target_type == "molecules":
        # Average atoms per formula unit, weighted by molar fraction, converts
        # a molecule target into an equivalent atom target.
        atoms_per_molecule = 0.0
        for oxide, fraction in mol_frac.items():
            atoms_per_molecule += fraction * sum(parse_formula(oxide).values())
        target_atoms = round(target * atoms_per_molecule)
    elif target_type == "atoms":
        target_atoms = target
    else:
        error_msg = f"Invalid target_type: {target_type}. Supported types are 'atoms' and 'molecules'."
        raise ValueError(error_msg)

    formula_units, actual_atoms = allocate_formula_units_to_target_atoms(mol_frac, target_atoms)
    return {
        "mol_fraction": mol_frac,
        "formula_units": formula_units,
        "total_atoms": actual_atoms,
        "element_counts": element_counts_from_formula_units(formula_units),
    }

plot_analysis_results_plotly(structure_data: StructureData) -> go.Figure

Generate interactive Plotly plots for structural analysis results.

Parameters:

Name Type Description Default
structure_data StructureData

Structural analysis results.

required

Returns:

Type Description
Figure

Interactive Plotly figure object.

Example

fig = plot_analysis_results_plotly(structure_data) fig.show()

Source code in amorphouspy/src/amorphouspy/workflows/structural_analysis.py
def plot_analysis_results_plotly(structure_data: StructureData) -> go.Figure:
    """Generate interactive Plotly plots for structural analysis results.

    Builds a fixed 4x3 grid of subplots (coordination, network statistics, RDFs,
    and structure factors) and gives each subplot its own legend anchored in its
    top-right corner.

    Args:
        structure_data: Structural analysis results.

    Returns:
        Interactive Plotly figure object.

    Example:
        >>> fig = plot_analysis_results_plotly(structure_data)
        >>> fig.show()

    """
    # Create subplot layout (4x3 grid)
    fig = make_subplots(
        rows=4,
        cols=3,
        subplot_titles=[
            "Oxygen Coordination",
            "Former Coordination",
            "Modifier Coordination",
            "Q^n Distribution",
            "Bond Angles",
            "Ring Statistics",
            "O-O RDF",
            "Former-O RDFs",
            "Modifier-O RDFs",
            "Total S(q) — Neutron",
            "Total S(q) — X-ray",
            "Partial S(q)",
        ],
        # Subplot types per grid cell: bar charts on top, scatter traces below.
        specs=[
            [{"type": "bar"}, {"type": "bar"}, {"type": "bar"}],
            [{"type": "bar"}, {"type": "scatter"}, {"type": "bar"}],
            [{"type": "scatter"}, {"type": "scatter"}, {"type": "scatter"}],
            [{"type": "scatter"}, {"type": "scatter"}, {"type": "scatter"}],
        ],
        vertical_spacing=0.08,
        horizontal_spacing=0.08,
    )

    # Restyle the auto-generated subplot titles (stored as annotations).
    fig.update_annotations(font_size=12, font_family="Arial, sans-serif")

    # Shared palette passed to every plotting helper; the hex values match the
    # colorblind-friendly Okabe-Ito set.
    colors = ["#0072B2", "#E69F00", "#009E73", "#D55E00", "#CC79A7", "#56B4E9", "#F0E442"]

    # Each helper adds its traces to the figure in place.
    _add_coordination_plots(fig, structure_data, colors)
    _add_network_plots(fig, structure_data, colors)
    _add_ring_plots(fig, structure_data, colors)
    _add_rdf_plots(fig, structure_data, colors)
    _add_structure_factor_plots(fig, structure_data, colors)

    # Position legends inside each subplot (top-right corner)
    axis_to_legend: dict[str, str] = {}  # trace x-axis ref ("x", "x2", ...) -> legend name
    legend_layout: dict[str, dict] = {}  # legend name -> layout options

    # Plotly names the first axis/legend without a suffix ("xaxis"/"legend")
    # and numbers the rest ("xaxis2"/"legend2", ...); there are 12 subplots.
    for idx in range(1, 13):
        ax_suffix = str(idx) if idx > 1 else ""
        x_ref = f"x{ax_suffix}"
        legend_name = f"legend{idx}" if idx > 1 else "legend"

        # The axis domains give each subplot's extent in paper coordinates,
        # used below to anchor that subplot's legend at its top-right corner.
        x_domain = getattr(fig.layout, f"xaxis{ax_suffix}").domain
        y_domain = getattr(fig.layout, f"yaxis{ax_suffix}").domain

        axis_to_legend[x_ref] = legend_name
        legend_layout[legend_name] = {
            "x": x_domain[1],
            "y": y_domain[1],
            "xanchor": "right",
            "yanchor": "top",
            "bgcolor": "rgba(255,255,255,0.7)",
            "font": {"size": 10},
        }

    # Route every trace to the legend belonging to the subplot it is drawn in.
    for trace in fig.data:
        x_ref = trace.xaxis or "x"  # traces on the first subplot report no xaxis suffix
        if x_ref in axis_to_legend:
            trace.update(legend=axis_to_legend[x_ref])

    # Update layout and axes
    fig.update_layout(
        height=1300,
        width=1300,
        showlegend=True,
        template="plotly_white",
        font={"family": "Arial, sans-serif", "size": 12},
        title_text="Structural Analysis",
        title_font_size=16,
        **legend_layout,
    )

    # Row 1: Coordination plots
    fig.update_xaxes(title_text="O_n", row=1, col=1)
    fig.update_yaxes(title_text="Percentage", row=1, col=1)
    fig.update_xaxes(title_text="Former_n", row=1, col=2)
    fig.update_yaxes(title_text="Percentage", row=1, col=2)
    fig.update_xaxes(title_text="Modifier_n", row=1, col=3)
    fig.update_yaxes(title_text="Percentage", row=1, col=3)

    # Row 2: Q^n, Bond angles, Ring statistics
    fig.update_xaxes(title_text="Q^n", row=2, col=1)
    fig.update_yaxes(title_text="Percentage", row=2, col=1)
    fig.update_xaxes(title_text="Angle (degrees)", row=2, col=2)
    fig.update_yaxes(title_text="Frequency", row=2, col=2)
    fig.update_xaxes(title_text="Former atoms in ring", row=2, col=3)
    fig.update_yaxes(title_text="Normalized count", row=2, col=3)

    # Row 3: RDF plots (shared fixed axis ranges for easy visual comparison)
    for col in [1, 2, 3]:
        fig.update_xaxes(title_text="r (Å)", range=[0, 10], row=3, col=col)
        fig.update_yaxes(title_text="g(r), CN(r)", range=[0, 25], row=3, col=col)

    # Row 4: Structure factor
    fig.update_xaxes(title_text="q (Å⁻¹)", row=4, col=1)
    fig.update_yaxes(title_text="S(q)", row=4, col=1)
    fig.update_xaxes(title_text="q (Å⁻¹)", row=4, col=2)
    fig.update_yaxes(title_text="S(q)", row=4, col=2)
    fig.update_xaxes(title_text="q (Å⁻¹)", row=4, col=3)
    fig.update_yaxes(title_text="S(q)", row=4, col=3)

    # Global grid and axis line styling (applied last so it doesn't conflict with per-axis overrides)
    fig.update_xaxes(showgrid=True, gridcolor="rgba(180,180,180,0.3)", zeroline=False, showline=True, mirror=False)
    fig.update_yaxes(showgrid=True, gridcolor="rgba(180,180,180,0.3)", zeroline=False, showline=True, mirror=False)

    return fig

running_mean(data: list | np.ndarray, n: int) -> np.ndarray

Calculate running mean of an array-like dataset.

The initial and final values of the returned array are NaN, as the running mean is not defined for those points.

Parameters:

Name Type Description Default
data list | ndarray

Input data for which the running mean should be calculated.

required
n int

Width of the averaging window.

required

Returns:

Type Description
ndarray

Array of same size as input data containing the running mean values.

Source code in amorphouspy/src/amorphouspy/shared.py
def running_mean(data: list | np.ndarray, n: int) -> np.ndarray:
    """Calculate running mean of an array-like dataset.

    The edge values of the returned array are NaN, as the running mean is not defined
    for those points.

    Args:
        data: Input data for which the running mean should be calculated.
        n: Width of the averaging window.

    Returns:
        Array of same size as input data containing the running mean values.

    Raises:
        ValueError: If the window width ``n`` exceeds the number of data points.

    """
    data = np.asarray(data)
    if n == 1:
        # A window of one is the identity.
        return data
    if n > data.size:
        msg = f"Window width n={n} exceeds data size {data.size}."
        raise ValueError(msg)
    ret_array = np.full(data.size, np.nan)
    pad_left = n // 2
    pad_right = n - pad_left - 1
    window_means = np.convolve(data, np.ones(n) / n, mode="valid")
    if pad_right:
        ret_array[pad_left:-pad_right] = window_means
    else:
        # For n == 2, pad_right is 0 and a slice ending at -0 would be empty
        # (the previous implementation crashed here); use an open-ended slice.
        ret_array[pad_left:] = window_means
    return ret_array

structure_from_parsed_output(initial_structure: Atoms, parsed_output: dict, *, wrap: bool = False) -> Atoms

Construct an Atoms object from parsed output data.

Parameters:

Name Type Description Default
initial_structure Atoms

The initial atomic structure to use as a template.

required
parsed_output dict

Parsed output containing atomic positions, cell, and indices.

required
wrap bool

Whether to wrap the atomic positions to the simulation cell (default is False). Keeping the unwrapped positions is more beneficial if structures are passed between different LAMMPS simulations in one workflow to ensure continuity.

False

Returns:

Type Description
Atoms

An Atoms object with updated positions and cell.

Example

new_atoms = structure_from_parsed_output(atoms, lammps_output)

Source code in amorphouspy/src/amorphouspy/io_utils.py
def structure_from_parsed_output(initial_structure: Atoms, parsed_output: dict, *, wrap: bool = False) -> Atoms:
    """Build an `Atoms` object from the last frame of a parsed simulation output.

    Args:
        initial_structure: The initial atomic structure to use as a template.
        parsed_output: Parsed output containing atomic positions, cell, and indices.
        wrap: Whether to wrap the atomic positions to the simulation cell (default is False).
            Keeping the unwrapped positions is more beneficial if structures are passed between
            different LAMMPS simulations in one workflow to ensure continuity.

    Returns:
        An `Atoms` object with updated positions and cell.

    Example:
        >>> new_atoms = structure_from_parsed_output(atoms, lammps_output)

    """
    # Copy the template, then overwrite its state from the final snapshot of the run.
    generic = parsed_output["generic"]
    new_structure = initial_structure.copy()
    new_structure.set_array("indices", generic["indices"][-1])
    new_structure.set_positions(generic["positions"][-1])
    new_structure.set_velocities(generic["velocities"][-1])
    new_structure.set_cell(generic["cells"][-1])
    new_structure.set_pbc(True)
    if wrap:
        new_structure.wrap()

    return new_structure

temperature_scan_simulation(structure: Atoms, potential: str, temperature: list[int | float] | None = None, pressure: float = 0.0001, timestep: float = 1.0, equilibration_steps: int = 100000, production_steps: int = 200000, n_dump: int = 100000, n_log: int = 10, server_kwargs: dict[str, Any] | None = None, *, aniso: bool = False, seed: int | None = 12345, tmp_working_directory: str | Path | None = None) -> dict[Any, Any]

Perform a temperature scan and collect structural data.

This workflow performs a temperature scan at the given list of temperatures. For each temperature, it equilibrates the structure at the target temperature and pressure and then performs a production MD run to collect the average volume and box lengths, needed to compute the CTE via V-T data. There are different CTEs often discussed, for example: - CTE20-300: Computed solely as the slope from the two datapoints at 20°C and 300°C. - CTE20-600: Computed solely as the slope from the two datapoints at 20°C and 600°C. - CTE over another arbitrary temperature range. - CTE at a specific temperature based on the slope of the V-T curve at this temperature. This is often done by fitting a linear model or a higher-order polynomial to the V-T data and then taking the derivative at the temperature of interest. Because of the various options and methods to compute the CTE from V-T data, and because the actual CTE calculation is rather straightforward once the data is collected, we do not compute it directly in this workflow, but rather return the collected data for each temperature and leave the actual CTE calculation to the user.

Parameters:

Name Type Description Default
structure Atoms

Input structure (assumed pre-equilibrated).

required
potential str

LAMMPS potential file.

required
temperature list[int | float] | None

Simulation temperature in Kelvin (default 300 K).

None
pressure float

Target pressure in GPa for NPT simulations (default 10⁻⁴ GPa = 10⁵ Pa = 1 bar).

0.0001
timestep float

MD integration timestep in femtoseconds (default 1.0 fs).

1.0
equilibration_steps int

Number of MD steps for the equilibration runs (default 100,000).

100000
production_steps int

Number of MD steps for the production runs (default 200,000).

200000
n_dump int

Dump output frequency of the production runs (default 100,000).

100000
n_log int

Log output frequency (default 10).

10
server_kwargs dict[str, Any] | None

Additional server configuration arguments.

None
aniso bool

If false, an isotropic NPT calculation is performed and the simulation box is scaled uniformly. If True, anisotropic NPT calculation is performed and the simulation box can change shape and size independently along each axis (default False).

False
seed int | None

Random seed for velocity initialization (default 12345).

12345
tmp_working_directory str | Path | None

Temporary directory for job execution.

None

Returns:

Type Description
dict[Any, Any]

Nested dictionary containing collected output of the simulations. The main keys are the

dict[Any, Any]

temperature steps in the format "01_300K", "02_400K", etc. Under each temperature key,

dict[Any, Any]

the dictionary contains another dictionary with keys "run01", "run02", ... for each

dict[Any, Any]

production run. Under each run key, the dictionary contains the parsed output from the

dict[Any, Any]

production run as well as computed CTE values and other thermodynamic averages.

dict[Any, Any]

Structure is:

dict[Any, Any]

{ "01_300K" : { "run01" : { "CTE_V" : ..., "CTE_x" : ..., "CTE_y" : ..., "CTE_z" : ..., etc}, "run02" : { "CTE_V" : ..., "CTE_x" : ..., "CTE_y" : ..., "CTE_z" : ..., etc}, ... }, "02_400K" : { "run01" : { "CTE_V" : ..., "CTE_x" : ..., "CTE_y" : ..., "CTE_z" : ..., etc}, "run02" : { "CTE_V" : ..., "CTE_x" : ..., "CTE_y" : ..., "CTE_z" : ..., etc}, ... }, ...

dict[Any, Any]

}

dict[Any, Any]

On the lowest level, the structure is the same as returned by _cte_fluctuation_workflow_analysis.

dict[Any, Any]

Additionally, on the run key level, the following entries are added for bookkeeping:

dict[Any, Any]

"is_converged" : bool # Whether convergence was reached within the max_production_runs

dict[Any, Any]

"convergence_criterion" : float # The convergence criterion

dict[Any, Any]

"structure_final" : Atoms # Final structure at this temperature

Notes
  • For every temperature, the structure is first pre-equilibrated with short (10 ps) NVT.
  • Simulation settings for the NVT equilibration run are hard-coded
  • CTEs are calculated sequentially if a list of temperatures is provided. Alternatively, multiple jobs with independent temperatures can be submitted to achieve parallelization.
Example

result = temperature_scan_simulation( ... structure=my_atoms, ... potential=my_potential, ... temperature=[300, 400, 500], ... production_steps=500000 ... )

Source code in amorphouspy/src/amorphouspy/workflows/cte.py
def temperature_scan_simulation(
    structure: Atoms,
    potential: str,
    temperature: list[int | float] | None = None,
    pressure: float = 1e-4,
    timestep: float = 1.0,
    equilibration_steps: int = 100_000,
    production_steps: int = 200_000,
    n_dump: int = 100000,
    n_log: int = 10,
    server_kwargs: dict[str, Any] | None = None,
    *,
    aniso: bool = False,
    seed: int | None = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict[Any, Any]:  # pylint: disable=too-many-positional-arguments
    """Perform a temperature scan and collect structural data.

    This workflow performs a temperature scan at the given list of temperatures. For each temperature, it
    equilibrates the structure at the target temperature and pressure and then performs a production MD
    run to collect the average volume and box lengths, needed to compute the CTE via V-T data.
    There are different CTEs often discussed, for example:
    - CTE20-300: Computed solely as the slope from the two datapoints at 20°C and 300°C.
    - CTE20-600: Computed solely as the slope from the two datapoints at 20°C and 600°C.
    - CTE over other arbitrary temperature range
    - CTE at a specific temperature based on the slope of the V-T curve at this temperature. This is often
      done by fitting a linear model or higher polynomials fit to the V-T data and then taking the derivative
      at the temperature of interest.
    Because of the various options and methods to compute the CTE from V-T data, and because the actual CTE
    calculation is rather straightforward once the data is collected, we do not compute it directly in this
    workflow, but rather return the collected data for each temperature and leave the actual CTE calculation
    to the user.

    Args:
        structure: Input structure (assumed pre-equilibrated).
        potential: LAMMPS potential file.
        temperature: List of simulation temperatures in Kelvin
            (default [300, 400, 500, 600] K).
        pressure: Target pressure in GPa for NPT simulations.
            (default 10-4 GPa = 10^5 Pa = 1 bar).
        timestep: MD integration timestep in femtoseconds (default 1.0 fs).
        equilibration_steps: Number of MD steps for the equilibration runs (default 100,000).
        production_steps: Number of MD steps for the production runs (default 200,000).
        n_dump: Dump output frequency of the production runs (default 100,000).
        n_log: Log output frequency (default 10).
        server_kwargs: Additional server configuration arguments.
        aniso: If false, an isotropic NPT calculation is performed and the simulation box is scaled uniformly.
            If True, anisotropic NPT calculation is performed and the simulation box can change shape and
            size independently along each axis (default False).
        seed: Random seed for velocity initialization (default 12345).
        tmp_working_directory: Temporary directory for job execution.

    Returns:
        Dictionary with a single key ``"data"`` holding the collected output. Per-temperature
        results are accumulated by ``_temperature_scan_merge_results`` (one entry per temperature,
        containing the parsed production-run output and thermodynamic averages), and the final
        structure of the scan is stored under the top-level ``"structure_final"`` key.
        No CTE values are computed here (see above); the raw V-T data are returned for the
        user to post-process.

    Notes:
        - For every temperature, the structure is first pre-equilibrated with a short (10,000-step) NVT run.
        - Simulation settings for the NVT equilibration run are hard-coded
        - CTEs are calculated sequentially if a list of temperatures is provided. Alternatively, multiple
          jobs with independent temperatures can be submitted to achieve parallelization.

    Example:
        >>> result = temperature_scan_simulation(
        ...     structure=my_atoms,
        ...     potential=my_potential,
        ...     temperature=[300, 400, 500],
        ...     production_steps=500000
        ... )

    """
    # Logging setup
    logger = _create_logger()

    if temperature is None:
        temperature = [300, 400, 500, 600]

    # Check and adjust input parameters if necessary
    n_dump = _temperature_scan_input_checker(temperature, production_steps, n_dump, logger)

    # Set pressure to anisotropic if requested
    sim_pressure = [pressure, pressure, pressure, None, None, None] if aniso else pressure

    # initial structure used. Afterwards, it is updated after each temperature
    structure0 = structure.copy()

    # Initialize results dictionary. CTE values will be calculated later
    results = _initialize_datadict(with_CTE_keys=False)

    # Loop over all temperatures
    for counter_run, T in enumerate(temperature, start=1):
        # Stage 1: Short equilibration in NVT at T for 10,000 steps
        # Elapsed time in ps = n_steps * timestep [fs] / 1000 (fs per ps).
        nvt_equilibration_time = 10_000 * timestep / 1000
        msg = f"Starting {nvt_equilibration_time:.1f} ps (10,000 steps hardcoded) NVT equilibration at {T:.2f} K."
        logger.info(msg)

        structure1, _ = _run_lammps_md(
            structure=structure0,
            potential=potential,
            tmp_working_directory=tmp_working_directory,
            temperature=T,
            n_ionic_steps=10_000,
            timestep=timestep,
            n_dump=10_000,
            n_log=100,
            initial_temperature=T,
            langevin=False,
            seed=seed,
            server_kwargs=server_kwargs,
        )

        # Stage 2: NPT equilibration runs at T,p.
        equilibration_time = equilibration_steps * timestep / 1000
        msg = f"Starting {equilibration_time:.1f} ps NPT equilibration at {T:.2f} K and {pressure:.2e} GPa."
        logger.info(msg)

        structure2, _ = _run_lammps_md(
            structure=structure1,
            potential=potential,
            tmp_working_directory=tmp_working_directory,
            temperature=T,
            pressure=sim_pressure,
            n_ionic_steps=equilibration_steps,
            timestep=timestep,
            n_dump=equilibration_steps,
            n_log=n_log,
            initial_temperature=0,
            langevin=True,
            server_kwargs=server_kwargs,
        )

        # Stage 3: NPT production run
        production_time = production_steps * timestep / 1000
        msg = f"Starting {production_time:.1f} ps NPT production run at {T:.2f} K and {pressure:.2e} GPa."
        logger.info(msg)

        structure_production, parsed_output = _run_lammps_md(
            structure=structure2,
            potential=potential,
            tmp_working_directory=tmp_working_directory,
            temperature=T,
            pressure=sim_pressure,
            n_ionic_steps=production_steps,
            timestep=timestep,
            n_dump=n_dump,
            n_log=n_log,
            initial_temperature=0,
            langevin=True,
            server_kwargs=server_kwargs,
        )

        # parse and check the output of the production run
        _sim_data = _collect_sim_data(parsed_output, counter_run)

        _sanity_check_sim_data(sim_data=_sim_data, T_target=T, p_target=pressure, logger=logger)

        # Collect results
        results = _temperature_scan_merge_results(previous_data=results, new_sim_data=_sim_data)

        # Use this structure as starting point for next temperature
        structure0 = structure_production

    results.update({"structure_final": structure_production})
    msg = "FINISHED SUCCESSFULLY."
    logger.info(msg)
    return {"data": results}

type_to_dict(types: np.ndarray) -> dict[int, str]

Generate a dictionary mapping atomic numbers (types) to element symbols from an ASE Atoms structure.

Parameters:

Name Type Description Default
types ndarray

Array containing atom types in the simulation.

required

Returns:

Type Description
dict[int, str]

Dictionary mapping atomic numbers to corresponding element symbols.

Example

type_map = type_to_dict(np.array([14, 8]))

Source code in amorphouspy/src/amorphouspy/shared.py
def type_to_dict(types: np.ndarray) -> dict[int, str]:
    """Build a mapping from the atomic numbers (types) in an array to their element symbols.

    Args:
        types: Array containing atom types in the simulation.

    Returns:
        Dictionary mapping atomic numbers to corresponding element symbols.

    Example:
        >>> type_map = type_to_dict(np.array([14, 8]))

    """
    # One entry per distinct atomic number, resolved via ASE's periodic table.
    return {z: chemical_symbols[z] for z in np.unique(types)}

viscosity_ensemble(structure: Atoms, potential: pd.DataFrame, n_replicas: int = 3, seeds: list[int] | None = None, base_seed: int = 12345, temperature_sim: float = 5000.0, timestep: float = 1.0, initial_production_steps: int = 10000000, n_print: int = 1, max_total_time_ns: float = 50.0, max_iterations: int = 40, eta_rel_tol: float = 0.05, eta_stable_iters: int = 3, server_kwargs: dict[str, Any] | None = None, *, langevin: bool = False, parallel: bool = False, executor: Executor | None = None, tmp_working_directory: str | Path | None = None) -> dict[str, Any]

Run multiple independent viscosity simulations and return ensemble-averaged results.

Each replica starts from the same structure but uses a different random seed for velocity initialisation, producing statistically independent trajectories. The viscosity (and other properties) are averaged across replicas and the inter-replica standard deviation is reported as the uncertainty.

Execution modes (mutually exclusive; executor takes priority):

  • executor provided — each replica is submitted via the executor's .submit() method. Pass any executorlib executor (SlurmJobExecutor, SlurmClusterExecutor, SingleNodeExecutor, …) to run replicas on an HPC cluster. The executor's resource_dict is automatically populated with the MPI core count from server_kwargs["cores"]. The executor's lifecycle is managed by the caller.
  • parallel=True — replicas run simultaneously on the local machine using ThreadPoolExecutor. Requires n_replicas * server_kwargs["cores"] total cores to be available.
  • default (sequential) — replicas run one after another on the local machine.

Seeds are written to tmp_working_directory/viscosity_ensemble_seeds.json immediately after generation so they are preserved even if the run is interrupted.

Parameters:

Name Type Description Default
structure Atoms

Input structure (assumed pre-equilibrated).

required
potential DataFrame

LAMMPS potential DataFrame.

required
n_replicas int

Number of independent replicas to run. Default 3.

3
seeds list[int] | None

Explicit list of integer seeds, one per replica. If None, seeds are auto-generated from base_seed using a NumPy RNG.

None
base_seed int

Master seed used to generate per-replica seeds when seeds is None. Default 12345.

12345
temperature_sim float

Simulation temperature in Kelvin.

5000.0
timestep float

MD timestep in fs.

1.0
initial_production_steps int

Steps for the first production run per replica.

10000000
n_print int

Thermodynamic output frequency.

1
max_total_time_ns float

Maximum total production time per replica in nanoseconds.

50.0
max_iterations int

Maximum number of 100 ps extension iterations per replica.

40
eta_rel_tol float

Relative change threshold for eta-stability check.

0.05
eta_stable_iters int

Consecutive stable iterations required per replica.

3
server_kwargs dict[str, Any] | None

Additional server arguments forwarded to LAMMPS (e.g. {"cores": 6} sets the MPI process count per replica).

None
langevin bool

Whether to use Langevin dynamics.

False
parallel bool

If True, run all replicas simultaneously using threads. Requires enough cores for all replicas at once. Default False. Ignored when executor is provided.

False
executor Executor | None

Optional executorlib (or compatible) executor. When provided, each replica is submitted as executor.submit(_run_replica, i, seed, resource_dict={"cores": N}). Example::

from executorlib import SlurmJobExecutor
with SlurmJobExecutor(max_workers=100) as exe:
    result = viscosity_ensemble(..., executor=exe)
None
tmp_working_directory str | Path | None

Directory for LAMMPS runs and seed file.

None

Returns:

Type Description
dict[str, Any]

A dictionary containing:

  • viscosity: Mean shear viscosity across replicas (Pa·s)
  • viscosity_fit_residual: Sample standard deviation across replicas (Pa·s, ddof=1)
  • viscosity_sem: Standard error of the mean (Pa·s)
  • shear_modulus_inf: Mean infinite-frequency shear modulus (Pa)
  • bulk_viscosity: Mean bulk viscosity (Pa·s)
  • maxwell_relaxation_time_ps: Mean Maxwell relaxation time (ps)
  • mean_pressure_gpa: Mean pressure averaged across replicas (GPa)
  • temperature: Mean temperature across all replicas (K)
  • n_replicas: Number of replicas run
  • seeds: List of seeds actually used
  • viscosities: Per-replica shear viscosity values (Pa·s)
  • converged: Per-replica convergence flags
  • results: Full viscosity_simulation output dicts per replica

Example

out = viscosity_ensemble( ... structure=my_atoms, ... potential=my_potential_df, ... n_replicas=3, ... temperature_sim=4000.0, ... n_print=10, ... server_kwargs={"cores": 4}, ... ) print(out["viscosity"], "±", out["viscosity_sem"], "Pa·s")

Source code in amorphouspy/src/amorphouspy/workflows/viscosity.py
def viscosity_ensemble(  # noqa: C901
    structure: Atoms,
    potential: pd.DataFrame,
    n_replicas: int = 3,
    seeds: list[int] | None = None,
    base_seed: int = 12345,
    temperature_sim: float = 5000.0,
    timestep: float = 1.0,
    initial_production_steps: int = 10_000_000,
    n_print: int = 1,
    max_total_time_ns: float = 50.0,
    max_iterations: int = 40,
    eta_rel_tol: float = 0.05,
    eta_stable_iters: int = 3,
    server_kwargs: dict[str, Any] | None = None,
    *,
    langevin: bool = False,
    parallel: bool = False,
    executor: Executor | None = None,
    tmp_working_directory: str | Path | None = None,
) -> dict[str, Any]:
    """Run multiple independent viscosity simulations and return ensemble-averaged results.

    Each replica starts from the same structure but uses a different random seed for
    velocity initialisation, producing statistically independent trajectories.  The
    viscosity (and other properties) are averaged across replicas and the inter-replica
    standard deviation is reported as the uncertainty.

    **Execution modes** (mutually exclusive; ``executor`` takes priority):

    * ``executor`` provided — each replica is submitted via the executor's ``.submit()``
      method.  Pass any executorlib executor (``SlurmJobExecutor``, ``SlurmClusterExecutor``,
      ``SingleNodeExecutor``, …) to run replicas on an HPC cluster.  Configure resource
      requirements (cores, partition, memory) on the executor itself at construction
      time.  The executor's lifecycle is managed by the caller.
    * ``parallel=True`` — replicas run simultaneously on the local machine using
      ``ThreadPoolExecutor``.  Requires ``n_replicas * server_kwargs["cores"]`` total
      cores to be available.
    * default (sequential) — replicas run one after another on the local machine.

    Seeds are written to ``tmp_working_directory/viscosity_ensemble_seeds.json``
    immediately after generation so they are preserved even if the run is interrupted.

    Args:
        structure: Input structure (assumed pre-equilibrated).
        potential: LAMMPS potential DataFrame.
        n_replicas: Number of independent replicas to run. Default 3.
        seeds: Explicit list of integer seeds, one per replica.  If ``None``, seeds are
            auto-generated from ``base_seed`` using a NumPy RNG.
        base_seed: Master seed used to generate per-replica seeds when ``seeds`` is
            ``None``.  Default 12345.
        temperature_sim: Simulation temperature in Kelvin.
        timestep: MD timestep in fs.
        initial_production_steps: Steps for the first production run per replica.
        n_print: Thermodynamic output frequency.
        max_total_time_ns: Maximum total production time per replica in nanoseconds.
        max_iterations: Maximum number of 100 ps extension iterations per replica.
        eta_rel_tol: Relative change threshold for eta-stability check.
        eta_stable_iters: Consecutive stable iterations required per replica.
        server_kwargs: Additional server arguments forwarded to LAMMPS (e.g.
            ``{"cores": 6}`` sets the MPI process count per replica).
        langevin: Whether to use Langevin dynamics.
        parallel: If ``True``, run all replicas simultaneously using threads.
            Requires enough cores for all replicas at once. Default ``False``.
            Ignored when ``executor`` is provided.
        executor: Optional executorlib (or compatible) executor.  When provided,
            each replica is submitted as ``executor.submit(_run_replica, i, seed)``;
            configure per-replica resources on the executor at construction time.
            Example::

                from executorlib import SlurmJobExecutor
                with SlurmJobExecutor(max_workers=100) as exe:
                    result = viscosity_ensemble(..., executor=exe)

        tmp_working_directory: Directory for LAMMPS runs and seed file.

    Returns:
        A dictionary containing:
            - viscosity: Mean shear viscosity across replicas (Pa·s)
            - viscosity_fit_residual: Sample standard deviation across replicas (Pa·s, ddof=1)
            - viscosity_sem: Standard error of the mean (Pa·s)
            - shear_modulus_inf: Mean infinite-frequency shear modulus (Pa)
            - bulk_viscosity: Mean bulk viscosity (Pa·s)
            - maxwell_relaxation_time_ps: Mean Maxwell relaxation time (ps)
            - mean_pressure_gpa: Mean pressure averaged across replicas (GPa)
            - temperature: Mean temperature across all replicas (K)
            - n_replicas: Number of replicas run
            - seeds: List of seeds actually used
            - viscosities: Per-replica shear viscosity values (Pa·s)
            - converged: Per-replica convergence flags
            - results: Full ``viscosity_simulation`` output dicts per replica

    Example:
        >>> out = viscosity_ensemble(
        ...     structure=my_atoms,
        ...     potential=my_potential_df,
        ...     n_replicas=3,
        ...     temperature_sim=4000.0,
        ...     n_print=10,
        ...     server_kwargs={"cores": 4},
        ... )
        >>> print(out["viscosity"], "±", out["viscosity_sem"], "Pa·s")

    """
    if seeds is None:
        rng = np.random.default_rng(base_seed)
        seeds_list: list[int] = rng.integers(1, 900_000_000, size=n_replicas).tolist()
    else:
        seeds_list = list(seeds)

    if len(seeds_list) != n_replicas:
        msg = f"len(seeds)={len(seeds_list)} does not match n_replicas={n_replicas}"
        raise ValueError(msg)

    if tmp_working_directory is not None:
        seeds_path = Path(tmp_working_directory) / "viscosity_ensemble_seeds.json"
        seeds_path.parent.mkdir(parents=True, exist_ok=True)
        seeds_path.write_text(json.dumps({"base_seed": base_seed, "seeds": seeds_list}, indent=2))
        warnings.warn(
            f"viscosity_ensemble: seeds saved to {seeds_path}",
            stacklevel=2,
        )

    def _run_replica(i: int, seed: int) -> dict[str, Any]:
        # Run one full viscosity_simulation in its own working directory with its own seed.
        warnings.warn(
            f"viscosity_ensemble: starting replica {i + 1}/{n_replicas}, seed={seed}",
            stacklevel=6,
        )
        if tmp_working_directory is not None:
            replica_workdir: Path | None = Path(tmp_working_directory) / f"replica_{i}"
            replica_workdir.mkdir(parents=True, exist_ok=True)
        else:
            replica_workdir = None
        result_i = viscosity_simulation(
            structure=structure,
            potential=potential,
            temperature_sim=temperature_sim,
            timestep=timestep,
            initial_production_steps=initial_production_steps,
            n_print=n_print,
            max_total_time_ns=max_total_time_ns,
            max_iterations=max_iterations,
            eta_rel_tol=eta_rel_tol,
            eta_stable_iters=eta_stable_iters,
            server_kwargs=server_kwargs,
            langevin=langevin,
            seed=int(seed),
            tmp_working_directory=replica_workdir,
        )
        result_i["seed"] = int(seed)
        return result_i

    def _collect_ordered(submit_fn) -> list[dict[str, Any]]:
        # Submit every replica via submit_fn and gather results in replica order.
        # Placeholders are independent dicts: [{}] * n would alias ONE shared dict
        # across all slots, which is fragile if any slot is ever mutated in place.
        ordered: list[dict[str, Any]] = [{} for _ in range(n_replicas)]
        replica_futures = {submit_fn(_run_replica, i, seed): i for i, seed in enumerate(seeds_list)}
        for completed_future in as_completed(replica_futures):
            ordered[replica_futures[completed_future]] = completed_future.result()
        return ordered

    if executor is not None:
        # Submit each replica as a separate job via the provided executor.
        # The executor (e.g. SlurmJobExecutor) handles SLURM script generation and
        # job submission automatically.  Configure resource requirements (cores,
        # partition, memory) on the executor itself at construction time.
        replica_results = _collect_ordered(executor.submit)
    elif parallel:
        # Run all replicas simultaneously on the local machine using threads.
        # Each LAMMPS process is MPI-parallel and releases the GIL, so threads
        # do not compete for the Python interpreter.
        with ThreadPoolExecutor(max_workers=n_replicas) as thread_pool:
            replica_results = _collect_ordered(thread_pool.submit)
    else:
        # Sequential fallback: one replica after another.
        replica_results = [_run_replica(i, seed) for i, seed in enumerate(seeds_list)]

    per_replica_viscosity_data = [r["viscosity_data"] for r in replica_results]

    shear_viscosities = np.array([data["viscosity"] for data in per_replica_viscosity_data])
    mean_viscosity = float(np.mean(shear_viscosities))
    # Sample std (ddof=1) and SEM are undefined for a single replica; report 0.0.
    std_viscosity = float(np.std(shear_viscosities, ddof=1)) if n_replicas > 1 else 0.0
    sem_viscosity = std_viscosity / math.sqrt(n_replicas) if n_replicas > 1 else 0.0

    def _mean_across_replicas(key: str) -> float:
        # Ensemble mean of a scalar property over all replicas.
        return float(np.mean([data[key] for data in per_replica_viscosity_data]))

    return {
        "viscosity": mean_viscosity,
        "viscosity_fit_residual": std_viscosity,
        "viscosity_sem": sem_viscosity,
        "shear_modulus_inf": _mean_across_replicas("shear_modulus_inf"),
        "bulk_viscosity": _mean_across_replicas("bulk_viscosity"),
        "maxwell_relaxation_time_ps": _mean_across_replicas("maxwell_relaxation_time_ps"),
        "mean_pressure_gpa": _mean_across_replicas("mean_pressure_gpa"),
        "temperature": _mean_across_replicas("temperature"),
        "n_replicas": n_replicas,
        "seeds": seeds_list,
        "viscosities": shear_viscosities.tolist(),
        "converged": [r["converged"] for r in replica_results],
        "results": replica_results,
    }

viscosity_simulation(structure: Atoms, potential: pd.DataFrame, temperature_sim: float = 5000.0, timestep: float = 1.0, initial_production_steps: int = 10000000, n_print: int = 1, max_total_time_ns: float = 50.0, max_iterations: int = 40, eta_rel_tol: float = 0.05, eta_stable_iters: int = 3, server_kwargs: dict[str, Any] | None = None, *, langevin: bool = False, seed: int = 12345, tmp_working_directory: str | Path | None = None) -> dict[str, Any]

Perform an autonomous viscosity simulation using the Helfand moment method.

Runs an initial NVT production MD (default 10 ns) and iteratively extends it by 100 ps until viscosity converges or limits are reached.

Convergence requires both conditions to be true simultaneously:

  1. eta-stability - |eta_new - eta_prev| / eta_prev < eta_rel_tol for eta_stable_iters consecutive iterations.
  2. MSD linearity - the local slope of the Helfand moment MSD is flat in the last 30 % of the lag window (_viscosity_plateaued check), confirming that the diffusive regime has been reached.

Extensions stop when either max_total_time_ns or max_iterations is exhausted. get_viscosity is retained as a legacy cross-check in the return dict.

Parameters:

Name Type Description Default
structure Atoms

Input structure (assumed pre-equilibrated).

required
potential DataFrame

LAMMPS potential DataFrame.

required
temperature_sim float

Simulation temperature in Kelvin.

5000.0
timestep float

MD timestep in fs.

1.0
initial_production_steps int

Steps for the first production run. Default 10,000,000 (= 10 ns at 1 fs timestep).

10000000
n_print int

Thermodynamic output frequency (must equal output_frequency passed to analysis functions).

1
max_total_time_ns float

Maximum total production time in nanoseconds. Default 50 ns.

50.0
max_iterations int

Maximum number of 100 ps extension iterations. Default 40.

40
eta_rel_tol float

Relative change threshold for eta-stability check. Default 0.05 (5 %).

0.05
eta_stable_iters int

Consecutive stable iterations required. Default 3.

3
server_kwargs dict[str, Any] | None

Additional server arguments forwarded to LAMMPS.

None
langevin bool

Whether to use Langevin dynamics.

False
seed int

Random seed for velocity initialization.

12345
tmp_working_directory str | Path | None

Temporary directory for LAMMPS runs.

None

Returns:

Type Description
dict[str, Any]

A dictionary containing:

  • viscosity_data: Output of helfand_viscosity from the final iteration.
  • result: Accumulated raw MD arrays (pressures, volume, temperature).
  • structure: Final ASE Atoms object.
  • total_production_steps: Total production steps completed.
  • iterations: Number of iterations run.
  • converged: True if both convergence criteria were satisfied.

Raises:

Type Description
ValueError

If the potential DataFrame is empty.

Example

result = viscosity_simulation( ... structure=my_atoms, ... potential=my_potential_df, ... temperature_sim=3000.0, ... ) converged = result["converged"] print(converged)

Source code in amorphouspy/src/amorphouspy/workflows/viscosity.py
def viscosity_simulation(
    structure: Atoms,
    potential: pd.DataFrame,
    temperature_sim: float = 5000.0,
    timestep: float = 1.0,
    initial_production_steps: int = 10_000_000,
    n_print: int = 1,
    max_total_time_ns: float = 50.0,
    max_iterations: int = 40,
    eta_rel_tol: float = 0.05,
    eta_stable_iters: int = 3,
    server_kwargs: dict[str, Any] | None = None,
    *,
    langevin: bool = False,
    seed: int = 12345,
    tmp_working_directory: str | Path | None = None,
) -> dict[str, Any]:
    """Perform an autonomous viscosity simulation using the Helfand moment method.

    Runs an initial NVT production MD (default 10 ns) and iteratively extends
    it by 100 ps until viscosity converges or limits are reached.

    Convergence requires both conditions to be true simultaneously:

    1. **eta-stability** - ``|eta_new - eta_prev| / eta_prev < eta_rel_tol``
       for ``eta_stable_iters`` consecutive iterations.
    2. **MSD linearity** - the local slope of the Helfand moment MSD is flat
       in the last 30 % of the lag window (``_viscosity_plateaued`` check),
       confirming that the diffusive regime has been reached.

    Extensions stop when either ``max_total_time_ns`` or ``max_iterations``
    is exhausted.  ``get_viscosity`` is retained as a legacy cross-check in
    the return dict.

    Args:
        structure: Input structure (assumed pre-equilibrated).
        potential: LAMMPS potential DataFrame.
        temperature_sim: Simulation temperature in Kelvin.
        timestep: MD timestep in fs.
        initial_production_steps: Steps for the first production run.
            Default 10,000,000 (= 10 ns at 1 fs timestep).
        n_print: Thermodynamic output frequency (must equal ``output_frequency``
            passed to analysis functions).
        max_total_time_ns: Maximum total production time in nanoseconds.
            Default 50 ns.
        max_iterations: Maximum number of 100 ps extension iterations.
            Default 40.
        eta_rel_tol: Relative change threshold for eta-stability check.
            Default 0.05 (5 %).
        eta_stable_iters: Consecutive stable iterations required.
            Default 3.
        server_kwargs: Additional server arguments forwarded to LAMMPS.
        langevin: Whether to use Langevin dynamics.
        seed: Random seed for velocity initialization.
        tmp_working_directory: Temporary directory for LAMMPS runs.

    Returns:
        A dictionary containing:
            - viscosity_data: Output of ``helfand_viscosity`` from the final iteration.
            - result: Accumulated raw MD arrays (pressures, volume, temperature).
            - structure: Final ASE ``Atoms`` object.
            - total_production_steps: Total production steps completed.
            - iterations: Number of iterations run.
            - converged: True if both convergence criteria were satisfied.

    Raises:
        ValueError: If the potential DataFrame is empty.
        RuntimeError: If an extension MD run returns no parsable output.

    Example:
        >>> result = viscosity_simulation(
        ...     structure=my_atoms,
        ...     potential=my_potential_df,
        ...     temperature_sim=3000.0,
        ... )
        >>> converged = result["converged"]
        >>> print(converged)

    """
    # Total step budget and per-extension step count derived from wall-clock
    # targets (1 ns = 1e6 fs; 100 ps = 100,000 fs).
    max_steps = int(max_total_time_ns * 1e6 / timestep)
    ext_steps = int(100_000.0 / timestep)  # 100 ps per extension

    warnings.warn(
        f"viscosity_simulation: starting initial run "
        f"({initial_production_steps:,} steps, T={temperature_sim} K, "
        f"timestep={timestep} fs)",
        stacklevel=2,
    )

    sim_result = _viscosity_simulation(
        structure=structure,
        potential=potential,
        temperature_sim=temperature_sim,
        timestep=timestep,
        production_steps=initial_production_steps,
        n_print=n_print,
        server_kwargs=server_kwargs,
        langevin=langevin,
        seed=seed,
        tmp_working_directory=tmp_working_directory,
    )

    # Accumulator for raw MD arrays; extensions are concatenated onto these.
    acc: dict[str, Any] = {
        "pressures": sim_result["result"]["pressures"],
        "volume": sim_result["result"]["volume"],
        "temperature": sim_result["result"]["temperature"],
    }
    current_structure: Atoms = sim_result["structure"]
    total_production_steps: int = initial_production_steps
    # `iterations` counts the initial run as iteration 1; each 100 ps
    # extension increments it.
    iterations: int = 1
    converged: bool = False
    helfand_data: dict[str, Any] = {}
    eta_prev: float | None = None
    stable_count: int = 0

    while iterations <= max_iterations:
        t_total_ps = total_production_steps * timestep / 1000.0

        helfand_data = helfand_viscosity(
            {"result": acc},
            timestep=timestep,
            output_frequency=n_print,
        )
        eta_curr = float(helfand_data["viscosity"])

        # Secondary check: MSD must be in the linear diffusive regime.
        # np.diff of the MSD gives the local slope; if that slope is itself
        # flat (_viscosity_plateaued), the MSD is linear.
        msd_arr = np.asarray(helfand_data["msd"])
        msd_linear = _viscosity_plateaued(
            np.diff(msd_arr),
            tail_fraction=0.3,
            rel_slope_tol=0.05,
        )

        if eta_prev is not None and eta_prev != 0.0:
            rel_change = abs(eta_curr - eta_prev) / abs(eta_prev)
            if rel_change < eta_rel_tol:
                stable_count += 1
            else:
                # Any excursion beyond the tolerance resets the streak.
                stable_count = 0

            warnings.warn(
                f"viscosity_simulation: iter {iterations}, "
                f"eta={eta_curr:.4e} Pa.s, rel_change={rel_change:.3f} "
                f"(tol={eta_rel_tol}), stable={stable_count}/{eta_stable_iters}, "
                f"msd_linear={msd_linear}, t={t_total_ps:.1f} ps",
                stacklevel=2,
            )

            # Both criteria must hold simultaneously to declare convergence.
            if stable_count >= eta_stable_iters and msd_linear:
                warnings.warn(
                    f"viscosity_simulation: CONVERGED at iteration {iterations} "
                    f"(eta_rel={rel_change:.3f}, msd_linear={msd_linear})",
                    stacklevel=2,
                )
                converged = True
                break
        else:
            warnings.warn(
                f"viscosity_simulation: iter {iterations}, "
                f"eta={eta_curr:.4e} Pa.s (first estimate), t={t_total_ps:.1f} ps",
                stacklevel=2,
            )

        eta_prev = eta_curr

        if total_production_steps >= max_steps:
            warnings.warn(
                f"viscosity_simulation: time budget exhausted ({t_total_ps:.1f} ps).",
                stacklevel=2,
            )
            break

        # Clamp the last extension so the total never exceeds the budget.
        _ext = min(ext_steps, max_steps - total_production_steps)

        warnings.warn(
            f"viscosity_simulation: extending by {_ext:,} steps ({_ext * timestep / 1000:.0f} ps).",
            stacklevel=2,
        )

        current_structure, ext_parsed = _run_lammps_md(
            structure=current_structure,
            potential=potential,
            tmp_working_directory=tmp_working_directory,
            temperature=temperature_sim,
            n_ionic_steps=_ext,
            timestep=timestep,
            n_print=n_print,
            initial_temperature=temperature_sim,
            langevin=langevin,
            server_kwargs=server_kwargs,
        )

        ext_result = ext_parsed.get("generic")
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O`, which would turn a missing output block into an
        # opaque TypeError on the concatenations below.
        if ext_result is None:
            msg = "viscosity_simulation: extension MD run returned no 'generic' output."
            raise RuntimeError(msg)
        acc["pressures"] = np.concatenate([acc["pressures"], ext_result["pressures"]])
        acc["volume"] = np.concatenate([acc["volume"], ext_result["volume"]])
        acc["temperature"] = np.concatenate([acc["temperature"], ext_result["temperature"]])

        total_production_steps += _ext
        iterations += 1

    warnings.warn(
        f"viscosity_simulation: finished. converged={converged}, "
        f"iterations={iterations}, total_steps={total_production_steps:,}",
        stacklevel=2,
    )

    return {
        "viscosity_data": helfand_data,
        "result": acc,
        "structure": current_structure,
        "total_production_steps": total_production_steps,
        "iterations": iterations,
        "converged": converged,
    }

write_angle_distribution(bin_centers: np.ndarray, angle_hist: np.ndarray, composition: float, filepath: str, *, append: bool = False) -> None

Write angle distribution to a text file.

Parameters:

Name Type Description Default
bin_centers ndarray

Angle bin centers in degrees.

required
angle_hist ndarray

Normalized angle histogram.

required
composition float

Composition value (e.g., % modifier).

required
filepath str

Output filepath.

required
append bool

Whether to append to file.

False
Example

write_angle_distribution(centers, hist, 0.5, "angles.txt")

Source code in amorphouspy/src/amorphouspy/io_utils.py
def write_angle_distribution(
    bin_centers: np.ndarray,
    angle_hist: np.ndarray,
    composition: float,
    filepath: str,
    *,
    append: bool = False,
) -> None:
    """Write an angle distribution as one labelled row of a text file.

    Args:
        bin_centers: Angle bin centers in degrees.
        angle_hist: Normalized angle histogram.
        composition: Composition value (e.g., % modifier).
        filepath: Output filepath.
        append: Whether to append to file.

    Example:
        >>> write_angle_distribution(centers, hist, 0.5, "angles.txt")

    """
    target = Path(filepath)
    # A header line is emitted on a fresh write, or when appending to a
    # file that does not yet exist.
    needs_header = (not append) or (not target.exists())

    lines: list[str] = []
    if needs_header:
        header_cols = " ".join(f"{b:.1f}" for b in bin_centers)
        lines.append("Composition " + header_cols + "\n")
    row_cols = " ".join(f"{v:.6f}" for v in angle_hist)
    lines.append(f"{composition} " + row_cols + "\n")

    with target.open("a" if append else "w", encoding="utf-8") as handle:
        handle.writelines(lines)

write_distribution_to_file(composition: float | str, filepath: str, dist: dict[int, int], label: str, *, append: bool = False) -> None

Write a coordination/Qn histogram to a text file.

Parameters:

Name Type Description Default
composition float | str

Composition value to label row.

required
filepath str

Output filepath.

required
dist dict[int, int]

Histogram data.

required
label str

Prefix for headers (e.g., Si or Q).

required
append bool

Append mode; writes header only if file does not exist.

False
Example

write_distribution_to_file(0.5, "qn.txt", qn_dist, "Q")

Source code in amorphouspy/src/amorphouspy/io_utils.py
def write_distribution_to_file(
    composition: float | str,
    filepath: str,
    dist: dict[int, int],
    label: str,
    *,
    append: bool = False,
) -> None:
    """Write a coordination/Qn histogram to a text file.

    Args:
        composition: Composition value to label row.
        filepath: Output filepath.
        dist: Histogram data.
        label: Prefix for headers (e.g., Si or Q).
        append: Append mode; writes header only if file does not exist.

    Example:
        >>> write_distribution_to_file(0.5, "qn.txt", qn_dist, "Q")

    """
    max_n = max(dist.keys(), default=0)
    total = sum(dist.values())
    headers = [f"{label}_{i}" for i in range(max_n + 1)] + [f"{label}_tot"]
    values = [dist.get(i, 0) for i in range(max_n + 1)] + [total]
    mode = "a" if append else "w"
    write_header = not append or not Path(filepath).exists()
    with Path(filepath).open(mode, encoding="utf-8") as f:
        if write_header:
            f.write("Composition " + " ".join(headers) + "\n")
        f.write(str(composition) + " " + " ".join(map(str, values)) + "\n")

write_xyz(filename: str, coords: np.ndarray, types: np.ndarray, box_size: np.ndarray | None = None, type_dict: dict[int, str] | None = None) -> None

Write atomic configuration to an XYZ file.

Parameters:

Name Type Description Default
filename str

Output XYZ file name.

required
coords ndarray

Atomic coordinates.

required
types ndarray

Atomic types as integers.

required
box_size ndarray | None

Simulation box size in x, y, z.

None
type_dict dict[int, str] | None

Dictionary mapping atomic type integers to element symbols.

None
Example

write_xyz("output.xyz", coords, types, box, {14: "Si", 8: "O"})

Source code in amorphouspy/src/amorphouspy/io_utils.py
def write_xyz(
    filename: str,
    coords: np.ndarray,
    types: np.ndarray,
    box_size: np.ndarray | None = None,
    type_dict: dict[int, str] | None = None,
) -> None:
    """Write atomic configuration to an XYZ file.

    Args:
        filename: Output XYZ file name.
        coords: Atomic coordinates.
        types: Atomic types as integers.
        box_size: Simulation box size in x, y, z.
        type_dict: Dictionary mapping atomic type integers to element symbols.

    Example:
        >>> write_xyz("output.xyz", coords, types, box, {14: "Si", 8: "O"})

    """
    if type_dict is None:
        msg = "type_dict must be provided"
        raise ValueError(msg)

    n_atoms = coords.shape[0]
    path = Path(filename)
    with path.open("w") as f:
        f.write(f"{n_atoms}\n")
        if box_size is not None:
            f.write(f"CUB {box_size[0]:.8f} {box_size[1]:.8f} {box_size[2]:.8f}\n")
        else:
            f.write("\n")
        for t, (x, y, z) in zip(types, coords, strict=False):
            symbol = type_dict.get(t)
            if symbol is None:
                msg = f"Unknown atomic type: {t}"
                raise ValueError(msg)
            f.write(f"{symbol} {x:.8f} {y:.8f} {z:.8f}\n")