Source code for strawberrypy.postprocessing.pair_distance

import numpy as np

from ..config import USE_MPI
from ..classes import Model


def shift_x(model: Model, orb_arr: np.ndarray) -> np.ndarray:
    r"""
    Shift the whole lattice to the right. Only used for the PBC lattice contraction.

    Parameters
    ----------
        model: Model
            The model containing the lattice parameters.
        orb_arr :
            Site indices which then are mapped out during transformation. The lattice
            is labeled where y-coords vary first.

    Returns
    -------
        shifted_orb_arr : np.ndarray
            The shifted site indices after transformation.
    """
    return (orb_arr + model.states_uc * model.Ly) % (
        model.Lx * model.Ly * model.states_uc
    )


def shift_y(model: Model, orb_arr: np.ndarray) -> np.ndarray:
    r"""
    Shift the whole lattice upwards.

    Parameters
    ----------
        model: Model
            The model containing the lattice parameters.
        orb_arr :
            Site indices which then are mapped out during transformation. The lattice
            is labeled where y-coords vary first.

    Returns
    -------
        shifted_orb_arr : np.ndarray
            The shifted site indices after transformation.
    """
    nstates_per_col = model.states_uc * model.Ly
    col_index = (orb_arr // nstates_per_col) * nstates_per_col
    row_index = (orb_arr % nstates_per_col + model.states_uc) % nstates_per_col
    return col_index + row_index


def _get_pbc_distance_pairs_amorphous_serial(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with periodic boundary
    conditions, for an amorphous system.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, accounting for periodic
            boundary conditions.
    """
    lvecs = np.copy(model.lvecs)

    # Reduced Lvecs
    lvecs[0] /= model.Lx
    lvecs[1] /= model.Ly

    cart_positions = model.cart_positions

    # Compute reduced coordinates
    linv = np.linalg.inv(lvecs)
    reds = cart_positions @ linv
    N = reds.shape[0]

    distance_pairs = np.zeros((N, N))

    for i in range(N):
        for trial in range(i + 1, N):
            # Periodic images of trial orbital in reduced coordinates
            trial_orbs = reds[trial] + [
                (0, 0, 0),
                (model.Lx, 0, 0),
                (-model.Lx, 0, 0),
                (0, model.Ly, 0),
                (0, -model.Ly, 0),
                (model.Lx, model.Ly, 0),
                (-model.Lx, -model.Ly, 0),
                (-model.Lx, model.Ly, 0),
                (model.Lx, -model.Ly, 0),
            ]
            # Find the minimum (cartesian) distance among all periodic images to orbital i
            dist = np.linalg.norm((trial_orbs - reds[i]) @ lvecs, axis=1).min()
            distance_pairs[i, trial] = dist
            distance_pairs[trial, i] = dist

    # Round values as to avoid very near values being tagged as separate distances.
    distance_pairs = np.round(distance_pairs, 7)

    return distance_pairs


def _get_pbc_distance_pairs_amorphous_mpi(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with periodic boundary
    conditions, for an amorphous system.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, accounting for periodic
            boundary conditions.
    """
    lvecs = np.copy(model.backend.comm.bcast(model.lvecs, root=0))

    # Reduced Lvecs
    lvecs[0] /= model.Lx
    lvecs[1] /= model.Ly

    cart_positions = model.cart_positions

    # Compute reduced coordinates
    linv = np.linalg.inv(lvecs)
    reds = cart_positions @ linv
    N = reds.shape[0]

    block_size = max(1, N // model.backend.mpi_size)
    start_i = model.backend.mpi_rank * block_size
    if model.backend.mpi_rank == model.backend.mpi_size - 1:
        end_i = N
    else:
        end_i = (model.backend.mpi_rank + 1) * block_size

    start_i = min(max(start_i, 0), N)
    end_i = min(max(end_i, start_i), N)
    local_rows = end_i - start_i

    local_dist_uc = np.zeros((local_rows, N), dtype=np.float64)

    for i in range(start_i, end_i):
        for trial in range(i + 1, N):
            # Periodic images of trial orbital in reduced coordinates
            trial_orbs = reds[trial] + [
                (0, 0, 0),
                (model.Lx, 0, 0),
                (-model.Lx, 0, 0),
                (0, model.Ly, 0),
                (0, -model.Ly, 0),
                (model.Lx, model.Ly, 0),
                (-model.Lx, -model.Ly, 0),
                (-model.Lx, model.Ly, 0),
                (model.Lx, -model.Ly, 0),
            ]
            # Find the minimum (cartesian) distance among all periodic images to orbital i
            dist = np.linalg.norm((trial_orbs - reds[i]) @ lvecs, axis=1).min()
            local_dist_uc[i - start_i, trial] = dist

    if model.backend.is_master_rank:
        global_dist = np.zeros((N, N), dtype=np.float64)
        if local_rows > 0:
            global_dist[start_i:end_i, :] = local_dist_uc

        for r in range(1, model.backend.mpi_size):
            r_start = r * block_size
            r_end = N if r == model.backend.mpi_size - 1 else (r + 1) * block_size
            r_start = min(max(r_start, 0), N)
            r_end = min(max(r_end, r_start), N)

            if r_end > r_start:
                temp = np.empty((r_end - r_start, N), dtype=np.float64)
                model.backend.comm.Recv(temp, source=r, tag=0)
                global_dist[r_start:r_end, :] = temp

        # Symmetrize the upper triangular matrix
        global_dist = global_dist + global_dist.T
    else:
        global_dist = None
        if local_rows > 0:
            model.backend.comm.Send(local_dist_uc, dest=0, tag=0)

    del local_dist_uc  # Free memory as it's no longer needed

    distance_pairs, win = model.backend.shared_array(global_dist, get_win=True)

    # Use node comm so only 1 rank per physical node executes the duplicate array rounding
    node_comm = model.backend.comm.Split_type(model.backend.MPI.COMM_TYPE_SHARED)
    if node_comm.Get_rank() == 0:
        np.round(distance_pairs, 7, out=distance_pairs)
    win.Fence()

    return distance_pairs


def _get_pbc_distance_pairs_serial(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with periodic boundary
    conditions.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, accounting for periodic
            boundary conditions (minimum convention image).
    """
    lvecs = np.copy(model.lvecs)

    # Reduced Lvecs
    lvecs[0] /= model.Lx
    lvecs[1] /= model.Ly

    if model.has_vacancies:
        cart_positions = model._cart_positions_original
    else:
        cart_positions = model.cart_positions

    # Compute reduced coordinates
    linv = np.linalg.inv(lvecs)
    reds = cart_positions @ linv
    N = reds.shape[0]

    distance_pairs = np.zeros((N, N))

    # Assume that cart_positions are arranged such that the y-coords vary first for a
    #   given x-coords
    # Compute first distances w.r.t to home unit cell. Take advantage that translating
    #   the whole lattice will not change the pair distance
    # Indices are in terms of all the orbitals
    for i in range(model.states_uc):
        for trial in range(i + 1, N):
            # Periodic images of trial orbital in reduced coordinates
            trial_orbs = reds[trial] + [
                (0, 0, 0),
                (model.Lx, 0, 0),
                (-model.Lx, 0, 0),
                (0, model.Ly, 0),
                (0, -model.Ly, 0),
                (model.Lx, model.Ly, 0),
                (-model.Lx, -model.Ly, 0),
                (-model.Lx, model.Ly, 0),
                (model.Lx, -model.Ly, 0),
            ]
            # Find the minimum (cartesian) distance among all periodic images to orbital i
            dist = np.linalg.norm((trial_orbs - reds[i]) @ lvecs, axis=1).min()
            distance_pairs[i, trial] = dist
            distance_pairs[trial, i] = dist

    orb_arr = np.arange(0, model.Lx * model.Ly * model.states_uc)
    shift = np.copy(orb_arr)

    # Lattice shift
    for _ in range(model.Ly):
        for _ in range(model.Lx):
            shift = np.copy(shift_x(model, shift))
            for i in range(model.states_uc):
                distance_pairs[shift[i], shift] = distance_pairs[orb_arr[i], orb_arr]

        shift = np.copy(shift_y(model, shift))
        for i in range(model.states_uc):
            distance_pairs[shift[i], shift] = distance_pairs[orb_arr[i], orb_arr]

    # Round values as to avoid very near values being tagged as separate distances.
    distance_pairs = np.round(distance_pairs, 7)
    # Remove distances involving vacancies, if any
    distance_pairs = distance_pairs[:, model._mask][model._mask, :]

    return distance_pairs


def _get_pbc_distance_pairs_mpi(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with periodic boundary
    conditions.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, accounting for periodic
            boundary conditions (minimum convention image).
    """
    lvecs = np.copy(model.backend.comm.bcast(model.lvecs, root=0))

    # Reduced Lvecs
    lvecs[0] /= model.Lx
    lvecs[1] /= model.Ly

    if model.has_vacancies:
        cart_positions = model._cart_positions_original
    else:
        cart_positions = model.cart_positions

    # Compute reduced coordinates
    linv = np.linalg.inv(lvecs)
    reds = cart_positions @ linv
    N = reds.shape[0]

    num_trials = N
    block_size = max(1, num_trials // model.backend.mpi_size)
    start_trial = model.backend.mpi_rank * block_size
    if model.backend.mpi_rank == model.backend.mpi_size - 1:
        end_trial = N
    else:
        end_trial = (model.backend.mpi_rank + 1) * block_size

    # Clamp start_trial, end_trial if they go out of bounds
    start_trial = min(max(start_trial, 0), N)
    end_trial = min(max(end_trial, start_trial), N)
    local_cols = end_trial - start_trial

    # Memory trick: compute only the local column slice of the upper band per rank
    local_dist_uc = np.zeros((model.states_uc, local_cols), dtype=np.float64)

    # Parallelize the initial pair distance computation
    for i in range(model.states_uc):
        for trial in range(start_trial, end_trial):
            # Periodic images of trial orbital in reduced coordinates
            trial_orbs = reds[trial] + [
                (0, 0, 0),
                (model.Lx, 0, 0),
                (-model.Lx, 0, 0),
                (0, model.Ly, 0),
                (0, -model.Ly, 0),
                (model.Lx, model.Ly, 0),
                (-model.Lx, -model.Ly, 0),
                (-model.Lx, model.Ly, 0),
                (model.Lx, -model.Ly, 0),
            ]
            # Find the minimum (cartesian) distance among all periodic images to orbital i
            dist = np.linalg.norm((trial_orbs - reds[i]) @ lvecs, axis=1).min()
            local_dist_uc[i, trial - start_trial] = dist

    # Gather explicitly onto the master rank
    if model.backend.is_master_rank:
        global_dist_uc = np.zeros((model.states_uc, N), dtype=np.float64)
        if local_cols > 0:
            global_dist_uc[:, start_trial:end_trial] = local_dist_uc

        for r in range(1, model.backend.mpi_size):
            r_start = r * block_size
            r_end = N if r == model.backend.mpi_size - 1 else (r + 1) * block_size
            r_start = min(max(r_start, 0), N)
            r_end = min(max(r_end, r_start), N)

            if r_end > r_start:
                temp = np.empty((model.states_uc, r_end - r_start), dtype=np.float64)
                model.backend.comm.Recv(temp, source=r, tag=0)
                global_dist_uc[:, r_start:r_end] = temp

        distance_pairs = np.zeros((N, N), dtype=np.float64)
        for i in range(model.states_uc):
            distance_pairs[i, :] = global_dist_uc[i, :]
            distance_pairs[:, i] = global_dist_uc[i, :]
    else:
        distance_pairs = None
        if local_cols > 0:
            model.backend.comm.Send(local_dist_uc, dest=0, tag=0)

    del local_dist_uc  # Free memory as it's no longer needed

    distance_pairs, win = model.backend.shared_array(distance_pairs, get_win=True)

    #  Only node rank leaders write shared array
    node_comm = model.backend.comm.Split_type(model.backend.MPI.COMM_TYPE_SHARED)
    node_rank = node_comm.Get_rank()

    # Lattice shift - distributed across node internal ranks for speed without memory footprint
    orb_arr = np.arange(0, N)
    shift = np.copy(orb_arr)

    for _ in range(model.Ly):
        for _ in range(model.Lx):
            shift = np.copy(shift_x(model, shift))
            for i in range(model.states_uc):
                if i % node_comm.size == node_rank:
                    distance_pairs[shift[i], shift] = distance_pairs[orb_arr[i], orb_arr]
            win.Fence()

        shift = np.copy(shift_y(model, shift))
        for i in range(model.states_uc):
            if i % node_comm.size == node_rank:
                distance_pairs[shift[i], shift] = distance_pairs[orb_arr[i], orb_arr]
        win.Fence()

    if node_rank == 0:
        # Round values as to avoid very near values being tagged as separate distances
        np.round(distance_pairs, 7, out=distance_pairs)
    win.Fence()

    # Remove distances involving vacancies, if any
    distance_pairs = distance_pairs[:, model._mask][model._mask, :]

    return distance_pairs


def _get_obc_distance_pairs_serial(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with open boundary
    conditions.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, without accounting for
            periodic boundary conditions.
    """

    if model.has_vacancies:
        cart_positions = model._cart_positions_original
    else:
        cart_positions = model.cart_positions

    N = cart_positions.shape[0]

    distance_pairs = np.zeros((N, N))

    for i in range(N):
        dist = np.linalg.norm(cart_positions[i] - cart_positions[i + 1 : N], axis=1)
        distance_pairs[i + 1 : N, i] = dist

    # Symmetrize the upper triangular matrix
    distance_pairs = distance_pairs + distance_pairs.T

    # Round values as to avoid very near values being tagged as separate distances.
    distance_pairs = np.round(distance_pairs, 7)

    # Remove distances involving vacancies, if any
    distance_pairs = distance_pairs[:, model._mask][model._mask, :]

    return distance_pairs


def _get_obc_distance_pairs_mpi(model: Model) -> np.ndarray:
    r"""Compute the distance between pair of orbitals with open boundary
    conditions.

    Parameters
    ----------
        model : Model
            The model for which to compute the pair distances.

    Returns
    -------
        distance_pairs : np.ndarray
            The matrix of pair distances in cartesian coordinates, without accounting for
            periodic boundary conditions.
    """

    if model.has_vacancies:
        cart_positions = model._cart_positions_original
    else:
        cart_positions = model.cart_positions

    N = cart_positions.shape[0]

    block_size = max(1, N // model.backend.mpi_size)
    start_i = model.backend.mpi_rank * block_size
    if model.backend.mpi_rank == model.backend.mpi_size - 1:
        end_i = N
    else:
        end_i = (model.backend.mpi_rank + 1) * block_size

    start_i = min(max(start_i, 0), N)
    end_i = min(max(end_i, start_i), N)
    local_rows = end_i - start_i

    local_dist = np.zeros((local_rows, N), dtype=np.float64)

    for i in range(start_i, end_i):
        dist = np.linalg.norm(cart_positions[i] - cart_positions[i + 1 : N], axis=1)
        local_dist[i - start_i, i + 1 : N] = dist

    if model.backend.is_master_rank:
        global_dist = np.zeros((N, N), dtype=np.float64)
        if local_rows > 0:
            global_dist[start_i:end_i, :] = local_dist

        for r in range(1, model.backend.mpi_size):
            r_start = r * block_size
            r_end = N if r == model.backend.mpi_size - 1 else (r + 1) * block_size
            r_start = min(max(r_start, 0), N)
            r_end = min(max(r_end, r_start), N)

            if r_end > r_start:
                temp = np.empty((r_end - r_start, N), dtype=np.float64)
                model.backend.comm.Recv(temp, source=r, tag=0)
                global_dist[r_start:r_end, :] = temp

        # Symmetrize the upper triangular matrix
        global_dist = global_dist + global_dist.T
    else:
        global_dist = None
        if local_rows > 0:
            model.backend.comm.Send(local_dist, dest=0, tag=0)

    del local_dist

    distance_pairs, win = model.backend.shared_array(global_dist, get_win=True)

    # Use node comm so only 1 rank per physical node executes the duplicate array rounding
    node_comm = model.backend.comm.Split_type(model.backend.MPI.COMM_TYPE_SHARED)
    if node_comm.Get_rank() == 0:
        np.round(distance_pairs, 7, out=distance_pairs)
    win.Fence()

    # Remove distances involving vacancies, if any
    distance_pairs = distance_pairs[:, model._mask][model._mask, :]

    return distance_pairs


##########################################################################################

if USE_MPI:

    def get_pbc_distance_pairs(model: Model) -> np.ndarray:
        if model.is_crystalline:
            return _get_pbc_distance_pairs_mpi(model)
        else:
            return _get_pbc_distance_pairs_amorphous_mpi(model)

    def get_obc_distance_pairs(model: Model) -> np.ndarray:
        return _get_obc_distance_pairs_mpi(model)

else:

[docs] def get_pbc_distance_pairs(model: Model) -> np.ndarray: if model.is_crystalline: return _get_pbc_distance_pairs_serial(model) else: return _get_pbc_distance_pairs_amorphous_serial(model)
def get_obc_distance_pairs(model: Model) -> np.ndarray: return _get_obc_distance_pairs_serial(model)