Source code for strawberrypy.classes

from __future__ import annotations
from typing import TYPE_CHECKING

# This is ignored at runtime, used only for type hinting
if TYPE_CHECKING:
    from .supercell import Supercell
    from .finite_model import FiniteModel

import numpy as np
import tbmodels as tbm
import pythtb as ptb
import wannierberri as wberri
import scipy.sparse as sp
from warnings import warn

# Select the appropriate backend for parallelization
from .backends import get_backend
from .config import USE_MPI

# For backward compatibility with old PythTB versions
ptb_model = ptb.tb_model if ptb.__version__ < "2.0.0" else ptb.tb_model | ptb.TBModel


[docs] class Model: r"""A :python:`Model` class describing a tight-binding model in its unit cell.""" def __init__(self, model, spinful: bool = False, n_occ: int = None, **kwargs): r""" Create a model with a given tight-binding Hamiltonian in its unit cell. Supercells and finite models can be constructed from it, and global topological invariants can be computed. The class can be initialized from a tight-binding model defined in `TBmodels <https://tbmodels.greschd.ch/en/latest/>`_, `PythTB <https://pythtb.readthedocs.io/en/latest/>`_ or `WannierBerri <https://wannier-berri.readthedocs.io/en/latest/>`_, or via the appropriate constructor it is able to read `Wannier90 <https://wannier90.readthedocs.io/en/latest/>`_ files generated from an *ab initio* calculation. Parameters ---------- model : The tight-binding model from which the :python:`Model` instance is generated. This can be obtained from either a `TBmodels <https://tbmodels.greschd.ch/en/latest/>`_, a `PythTB <https://pythtb.readthedocs.io/en/latest/>`_, or a `WannierBerri <https://wannier-berri.readthedocs.io/en/latest/>`_ model instance. spinful : bool Whether the model is spinful or not. Default is :python:`False`. n_occ : Number of occupied states. This information is required if the model is obtained from Wannier functions generated by Wannier90 code starting an *ab initio* calculation. In all the other cases the number of occupied states can is optional and if :python:`None` half-filling is automatically assumed. kwargs : Additional keyword arguments. Available keyword arguments are: - ``file_spn`` (bool, default = :python:`False`): whether the file ``seedname.spn`` is provided if the model is obtained from Wannier functions generated by Wannier90 code starting from a :math:`\Gamma`-only *ab initio* calculation. This file contains the spin matrix elements in the basis of Wannier functions, which are needed to compute the spin Bott index for models with non-trivial spin textures. If this file is not provided, the spin matrix elements are computed in the basis of tight-binding orbitals, which are diagonal in the spin operator :math:`S_z`. This can lead to incorrect results for models with non-trivial spin textures. - ``Lx`` (int, default = 1): number of unit cells along the :math:`\mathbf{a}_1` direction. - ``Ly`` (int, default = 1): number of unit cells along the :math:`\mathbf{a}_2` direction. - ``debug_info`` (bool, default = :python:`False`): whether to print debug information during initialization. This can be useful for large models to track the progress of the initialization. Some keyword arguments are automatically set during the initialization and should not be overridden in general, as they are needed for the correct functioning of the class. These are: - ``states_uc`` (int, default = None): number of states in the unit cell. - ``disordered`` (bool, default = :python:`False`): whether the model is disordered or not. This is automatically set to :python:`True` when disorder is added to the model. - ``has_vacancies`` (bool, default = :python:`False`): whether the model has vacancies or not. This is automatically set to :python:`True` when vacancies are added to the model. """ # Validate allowed kwargs allowed_kwargs = { "file_spn", "Lx", "Ly", "debug_info", "states_uc", "disordered", "has_vacancies", "is_crystalline", } if any([kwarg not in allowed_kwargs for kwarg in kwargs]): raise ValueError(f"One or more invalid keyword arguments provided: {kwargs}.") # Store local variables self.model = model self.Lx, self.Ly = kwargs.get("Lx", 1), kwargs.get("Ly", 1) self.spinful = spinful self.n_occ = n_occ # Can still be None here self.file_spn = kwargs.get("file_spn", False) self.debug_info = kwargs.get("debug_info", False) # Needed to keep track of the original positions in case we need macroscopic # averaging with vacancies (set there, otherwise always stays None) self._cart_positions_original = None # Initialize class variables to avoid unbound attributes in the parallelized code for attr in ( "dim_k", "r", "lvecs", "uc_vol", "_mask", "reciprocal_vecs", "n_orb", "states_uc", "dim_r", ): setattr(self, attr, None) # Parse correctly initialization of the spin matrix on every rank if not self.spinful: self.sz = None else: self.sz = sp.csr_array([]) # Initialize an empty Hamiltonian on every rank self.hamiltonian = sp.csr_array([]) # Initialize state of the model self.disordered = kwargs.get("disordered", False) self.has_vacancies = kwargs.get("has_vacancies", False) self.is_crystalline = kwargs.get("is_crystalline", True) # Initialize the empty cartesian positions (needs to have ndim = 2) self.cart_positions = np.array([[None, None]]) # Choose backend self.backend, self.physics = get_backend(nblk=self.Lx) if self.backend.is_master_rank: # Dispatcher: routes to the correct internal parser based on type if isinstance(self.model, tbm.Model): self._load_tbmodels() elif isinstance(self.model, ptb_model): self._load_pythtb() elif isinstance(self.model, wberri.System_R): self._load_wberri() else: raise NotImplementedError("Invalid model instance.") # If provided, override the states_uc computed (needed in supercell and finite models) self.states_uc = kwargs.get("states_uc", self.states_uc) # Total number of orbitals in the model self.n_orb = self.states_uc * self.Lx * self.Ly # Validate the number of occupied states if self.n_occ is not None and (self.n_occ <= 0 or self.n_occ > self.n_orb): raise ValueError( f"Invalid number of occupied states: {self.n_occ}. It must be a" f" positive integer not greater than the number of orbitals {self.n_orb}." ) # Make the positions, lattice vectors, reciprocal lattice vectors, ... 3D self._fix_dimensionality() # Generate the position matrices self.r = np.array( [sp.csr_array(np.diag(xyz)) for xyz in self.cart_positions.T] ) # Initialize the spin matrix if needed if self.spinful and self.sz.size == 0: self.sz = sp.csr_array(np.diag([1, -1] * (self.n_orb // 2))) # Broadcast to all the ranks the relevant data for the model initialization for attr in ("n_occ", "n_orb", "dim_r", "states_uc", "reciprocal_vecs", "lvecs"): setattr(self, attr, self.backend.comm.bcast(getattr(self, attr), root=0)) # Create a shared array per node for attr in ("cart_positions", "_mask"): setattr(self, attr, self.backend.shared_array(getattr(self, attr))) ################################################# # Parse from 3rd party libraries ################################################# def _load_pythtb(self) -> None: r"""Parse model data from a PythTB model instance.""" from . import _pythtb self.states_uc = _pythtb.calc_states_uc(self.model, self.spinful) self.cart_positions = _pythtb.get_positions(self.model, self.spinful) self.dim_r = self.cart_positions[0, :].size # Expected dimension of the k-point in reciprocal space for the Hamiltonian self.dim_k = ( self.model._dim_k if hasattr(self.model, "_dim_k") else self.model.dim_k ) self.hamiltonian = sp.csr_array( _pythtb.get_hamiltonian( self.model, self.spinful, np.zeros(self.dim_k), self.dim_k ) ) # TODO: can we have a k-dependent hamiltonian in the unit cell? try: self.lvecs = np.copy(self.model.get_lat_vecs()) except AttributeError: self.lvecs = np.copy(self.model._lat) self.reciprocal_vecs = _pythtb._reciprocal_vec(self.model) self.uc_vol = _pythtb.calc_uc_vol(self.model) if self.n_occ is None: self.n_occ = _pythtb.get_half_filling(self.model, self.spinful) self._mask = _pythtb.initialize_mask(self.model, self.spinful) def _load_tbmodels(self) -> None: r"""Parse model data from a TBmodels model instance.""" from . import _tbmodels self.states_uc = _tbmodels.calc_states_uc(self.model) self.cart_positions = _tbmodels.get_positions(self.model) self.dim_r = self.cart_positions[0, :].size # Expected dimension of the k-point in reciprocal space for the Hamiltonian self.dim_k = self.cart_positions[0, :].size self.hamiltonian = sp.csr_array( _tbmodels.get_hamiltonian(self.model, np.zeros(self.dim_k)) ) # TODO: can we have a k-dependent hamiltonian in the unit cell? self.lvecs = self.model.uc self.reciprocal_vecs = _tbmodels._reciprocal_vec(self.model) self.uc_vol = _tbmodels.calc_uc_vol(self.model) if self.n_occ is None: self.n_occ = _tbmodels.get_half_filling(self.model) self._mask = _tbmodels.initialize_mask(self.model) def _load_wberri(self) -> None: r"""Parse model data from a WannierBerri model instance.""" from . import _wberri self.states_uc = _wberri.calc_states_uc(self.model) self.cart_positions = _wberri.get_positions(self.model) self.dim_r = self.cart_positions[0, :].size # Expected dimension of the k-point in reciprocal space for the Hamiltonian self.dim_k = 3 # TODO: not sure here self.hamiltonian, wberri_data = _wberri.get_hamiltonian(self.model) self.hamiltonian = sp.csr_array( self.hamiltonian ) # TODO: can we have a k-dependent hamiltonian in the unit cell? self.lvecs = self.model.real_lattice self.reciprocal_vecs = _wberri._reciprocal_vec(self.model) self.uc_vol = _wberri.calc_uc_vol(self.model) if self.n_occ is None: raise NotImplementedError("Provide number of occupied states as n_occ.") self._mask = _wberri.calc_states_uc(self.model) if self.spinful and self.file_spn: self.sz = sp.csr_array(_wberri.read_spn(self.model, wberri_data)) self.wannier = True def _fix_dimensionality(self) -> None: r""" Fix the dimensionality of the model to 3D by adding dummy coordinates, lattice vectors and reciprocal lattice vectors if needed. This is needed for compatibility with Wannier90 models, which are always 3D. """ # Already ok if self.dim_r == 3: return # Along the non-valid directions everything is zero self.cart_positions = np.hstack( ( self.cart_positions, np.zeros((self.cart_positions.shape[0], 3 - self.dim_r)), ) ) # Dummy third axis is along z lvecs = np.eye(3) try: lvecs[: self.dim_r, : self.dim_r] = self.lvecs self.lvecs = lvecs except ValueError: # On parsing supercells and finite models, the lattice vectors are fixed pass # Dummy third axis is along z reciprocal_vecs = np.eye(3) try: reciprocal_vecs[: self.dim_r, : self.dim_r] = np.asarray( self.reciprocal_vecs ).squeeze() self.reciprocal_vecs = reciprocal_vecs except ValueError: # On parsing supercells and finite models, the lattice vectors are fixed pass # Now we have effectively 3 real space dimensions self.dim_r = 3 ################################################# # Create ab initio models from Wannier90 outputs #################################################
[docs] @classmethod def from_wannier90( cls, seedname: str, path: str = ".", engine: str = "tbmodels", n_occ: int = None, **kwargs, ) -> Model: r"""Create a :python:`Model` instance from Wannier90 output files. The Wannier90 seedname (without extension) and the path to the files must be provided. The engine used to read the Wannier90 files can be specified via the ``engine`` argument, which can take values ``wannierberri``, ``tbmodels`` or ``pythtb``. The default is ``tbmodels``. Parameters ---------- seedname : str Wannier90 seedname. path : str Path to the Wannier90 output files. engine : str Engine used to read the Wannier90 files. Can be either ``wannierberri``, ``tbmodels`` or ``pythtb``. Default is ``tbmodels``. n_occ : int Number of occupied states. If :python:`None`, the number of occupied states is read from the model, if available (half-filling is usually assumed). If the model does not contain information about the number of occupied states, a :python:`RuntimeError` is raised and the number of occupied states must be provided as input here. This is the case in TBmodels for instance. kwargs : Additional keyword arguments passed to the constructor of :python:`Model` and to the WannierBerri's System_w90 when ``engine`` is set to ``wannierberri``. Returns ------- Model A :python:`Model` instance generated from the Wannier90 output files. """ # Detect parallel environment try: from .backends import MPI is_master_rank = MPI.COMM_WORLD.Get_rank() == 0 except (AttributeError, ImportError): is_master_rank = True engine = engine.lower() if engine not in ("wannierberri", "tbmodels", "pythtb"): raise ValueError(f"Invalid engine specified: {engine}.") # If the .SPN file is needed, use wannierberri by default if kwargs.get("spn_file", False): engine = "wannierberri" spinful = kwargs.pop("spinful", False) sz = None # Initialize the model depending on the required engine if is_master_rank: if engine == "tbmodels": model = tbm.Model.from_wannier_files( hr_file=f"{path}/{seedname}_hr.dat", wsvec_file=f"{path}/{seedname}_wsvec.dat", xyz_file=f"{path}/{seedname}_centres.xyz", win_file=f"{path}/{seedname}.win", sparse=True, ) elif engine == "pythtb": if ptb.__version__ < "2.0.0": model = ptb.w90(path, seedname).model() else: model = ptb.W90(path, seedname).model() elif engine == "wannierberri": from ._wberri import read_spn_from_seedname, get_model_wberri if spinful: model, spinmats = read_spn_from_seedname( f"{path}/{seedname}", spin=spinful, return_model=True, **kwargs ) sz = spinmats[0, :, :, 2] else: model = get_model_wberri(f"{path}/{seedname}", spin=spinful, **kwargs) else: raise RuntimeError("Invalid engine to build the Wannier90 model.") else: model = None # If the .SPN is needed, do not recompute it in the class constructor model_kwargs = { "Lx": kwargs.get("Lx", 1), "Ly": kwargs.get("Ly", 1), "debug_info": kwargs.get("debug_info", False), } class_model = cls(model, file_spn=False, n_occ=n_occ, **model_kwargs) # Only True if .SPN is provided, the model is spinful and is read from wannierberri if sz is not None and is_master_rank: class_model.sz = sp.csr_array(sz) class_model.file_spn = kwargs.get("file_spn", False) # Share the model instance class_model.model = class_model.backend.comm.bcast(class_model.model, root=0) return class_model
################################################# # Spawn subclasses #################################################
[docs] def make_supercell(self, Lx: int, Ly: int, n_occ: int = None) -> Supercell: r"""Create a supercell given of the current model by repeating the unit cell, allowing to compute single-point invariants and local geometrical and topological markers. Parameters ---------- Lx : int Number of unit cells repeated along the :math:`\mathbf{a}_1` direction in the supercell. Ly : int Number of unit cells repeated along the :math:`\mathbf{a}_2` direction in the supercell. n_occ : int Number of occupied states. If :python:`None`, this is rescaled from the currently stored value depending on the size of the supercell, otherwise it is overridden by the provided value. Returns ------- Supercell A :python:`Supercell` instance generated from the current model. """ from .supercell import Supercell if n_occ is None: n_occ = self.n_occ * Lx * Ly if self.file_spn and self.spinful and isinstance(self.model, wberri.System_R): raise NotImplementedError( "The extension of the .SPN file to a supercell is " "not implemented yet, so the supercell cannot be generated." ) return Supercell(model=self, Lx=Lx, Ly=Ly, n_occ=n_occ)
[docs] def make_finite(self, Lx: int, Ly: int, n_occ: int = None) -> FiniteModel: r"""Create a finite system given of the current model by repeating the unit cell, allowing to compute single-point invariants and local geometrical and topological markers. Parameters ---------- Lx : int Number of unit cells repeated along the :math:`\mathbf{a}_1` direction in the supercell. Ly : int Number of unit cells repeated along the :math:`\mathbf{a}_2` direction in the supercell. n_occ : int Number of occupied states. If :python:`None`, this is rescaled from the currently stored value depending on the size of the supercell, otherwise it is overridden by the provided value. Returns ------- FiniteModel A :python:`FiniteModel` instance generated from the current model. """ from .finite_model import FiniteModel if n_occ is None: n_occ = self.n_occ * Lx * Ly if self.file_spn and self.spinful and isinstance(self.model, wberri.System_R): raise NotImplementedError( "The extension of the .SPN file to a finite model is " "not implemented yet, so the finite model cannot be generated." ) return FiniteModel(model=self, Lx=Lx, Ly=Ly, n_occ=n_occ)
################################################# # Make disordered systems and heterostructures #################################################
[docs] def add_disorder_uniform( self, w: float = 0, on_site: bool = True, seed: int = None ) -> np.ndarray: r"""Add on-site uniform Anderson disorder to the model. Anderson disorder consists in a diagonal term in the Hamiltonian :math:`\mathcal H_{Anderson}=\sum_i w^{}_i c_i^{\dagger}c^{}_i` where :math:`w^{}_i` are random variables uniformly distributed in the interval :math:`\bigl[-\frac{W}{2},\frac{W}{2} \bigr]`. Parameters ---------- w : Disorder amplitude, defining the extrema of the interval :math:`\bigl[-\frac{W}{2},\frac{W}{2} \bigr]`. Default is ``0``. seed : Seed to be used in the random number generation. Default is :python:`None`, which result in the default Numpy seed. on_site : Whether to apply same random disorder on orbitals located on the same physical site. Default is True. Returns ------- np.ndarray Array containing the random disorder values added to the Hamiltonian. .. warning:: The current instance of the model will be modified. """ from .utils import _create_orb_pattern if w != 0: self.disordered = True # The Hamiltonian is stored in the master rank only if not self.backend.is_master_rank: return # Set the seed for the random number generator if seed is not None: np.random.seed(seed) d = [] if self.backend.is_master_rank: if on_site: pattern = _create_orb_pattern(self.cart_positions[: self.states_uc]) num_cycles = (self.n_orb + len(pattern) - 1) // len(pattern) rands_ = [] for _ in range(num_cycles): # Generate new random numbers for this cycle, one per unique pattern value rand_map = {p: np.random.rand() for p in set(pattern)} for i in range(len(pattern)): if len(rands_) < self.n_orb: rands_.append(rand_map[pattern[i]]) d = 0.5 * w * (2 * np.array(rands_) - 1.0) else: d = 0.5 * w * (2 * np.random.rand(self.n_orb) - 1.0) # The Hamiltonian is sparse to save disk space self.hamiltonian += sp.diags_array(d, format="csr") return d
[docs] def add_disorder_bernoulli( self, w: float = 0, on_site: bool = True, seed: int = None ) -> np.ndarray: r"""Add on-site Bernoulli disorder to the model. Bernoulli disorder consists in a diagonal term in the Hamiltonian :math:`\mathcal H_{Bernoulli}=\sum_i w_i c_i^{\dagger}c_i` where :math:`w_i` are random variables with random sign of the strength `\frac{W}{2}`. Parameters ---------- w : Disorder amplitude for the random variables with random signs. The random number can only take two values, `\pm \frac{W}{2}`. Default is ``0``. seed : Seed to be used in the random number generation. Default is :python:`None`, which result in the default Numpy seed. on_site : Whether to apply same random disorder on orbitals located on the same physical site. Default is :python:`True`. Returns ------- np.ndarray Array containing the random disorder values added to the Hamiltonian. .. warning:: The current instance of the model will be modified. """ from .utils import _create_orb_pattern if w != 0: self.disordered = True # The Hamiltonian is stored in the master rank only if not self.backend.is_master_rank: return # Set the seed for the random number generator if seed is not None: np.random.seed(seed) d = [] if self.backend.is_master_rank: if on_site: pattern = _create_orb_pattern(self.cart_positions[: self.states_uc]) num_cycles = (self.n_orb + len(pattern) - 1) // len(pattern) rands_ = [] for _ in range(num_cycles): # Generate new random numbers for this cycle, one per unique pattern value rand_map = {p: np.random.randint(0, 2) for p in set(pattern)} for i in range(len(pattern)): if len(rands_) < self.n_orb: rands_.append(rand_map[pattern[i]]) d = 0.5 * w * (2 * np.array(rands_) - 1.0) else: d = 0.5 * w * (2 * np.random.randint(0, 2, size=(self.n_orb)) - 1) # The Hamiltonian is sparse to save disk space self.hamiltonian += sp.diags_array(d, format="csr") return d
[docs] def add_vacancies(self, vacancies_list: tuple | list) -> None: r"""Add vacancies in the system by removing a site in the lattice. Parameters ---------- vacancies_list: A list of [cell_x, cell_y, basis] pointing to the cell and atom to be removed. Multiple values at once are allowed. For instance, consider a :math:`L_x\times L_y` lattice with a basis of :math:`N_b` atoms, and :math:`M` sites to be removed. Then, the list :python:`[[np.random.randint(L_x), np.random.randint(L_y), np.random.randint(N_b)] for _ in range(M)]` can be given as argument. See also :func:`.utils.unique_vacancies` for a function that returns a list of unique vacancies. .. note:: If the model is not spinful, an even number of vacancies may be required for the system to be gapped. .. warning:: The current instance of the model will be modified. """ # The Hamiltonian and all the quantities are stored in the master rank only if self.backend.is_master_rank: # Convert vacancies_list to internal indexing if np.array(vacancies_list).shape == (3,): vacancies = self._xy_to_index( vacancies_list[0], vacancies_list[1], vacancies_list[2] ) else: vacancies = [] for hl in vacancies_list: vacancies.append(self._xy_to_index(hl[0], hl[1], hl[2])) # Remove the selected sites if there are any vacancies if np.asarray(vacancies).size > 0: self._mask[vacancies] = False if not self.spinful and len(vacancies) % 2 != 0: warn( "Adding an odd number of vacancies causes the model to no longer be half-filled." ) elif self.spinful: # Count also the opposite spin self._mask[np.asarray(vacancies) + 1] = False self.hamiltonian = self.hamiltonian[:, self._mask][self._mask, :] self.r = np.array([r[:, self._mask][self._mask, :] for r in self.r]) self._cart_positions_original = self.cart_positions.copy() self.cart_positions = self.cart_positions[self._mask, :] self.n_occ -= int(len(vacancies) * (0.5 if not self.spinful else 1)) self.n_orb -= int(len(vacancies) * (1 if not self.spinful else 2)) self.has_vacancies = True if self.spinful: self.sz = self.sz[:, self._mask][self._mask, :] # Broadcast to all the ranks to update for attr in ("n_occ", "n_orb", "has_vacancies"): setattr(self, attr, self.backend.comm.bcast(getattr(self, attr), root=0)) # Create a shared array per node for attr in ("cart_positions", "_cart_positions_original", "_mask"): setattr(self, attr, self.backend.shared_array(getattr(self, attr)))
[docs] def make_heterostructure( self, model2: Model, direction: int = 0, start: int = 0, stop: int = 0 ) -> None: r"""Create a heterostructure starting from another model. The system will be split in the direction :python:`direction` starting from the unit cell whose index is :python:`start` and stopping at the cell indexed as :python:`end`. The two models must have the same lattice parameters and lattice sites. Parameters ---------- model2 : An instance of :python:`strawberrypy.Model` representing the mdoel whose Hamiltonian has to be used when constructing the heterostructure. direction : Direction in which the splitting is realized, allowed values are ``0`` for the :math:`\mathbf{a}_1` direction or ``1`` for the :math:`\mathbf{a}_2` direction. Default is ``0``. start : Starting point for the splitting in the :python:`direction` direction. Default is ``0``. end : End point of the splitting in the :python:`direction` direction. Default is ``0``. .. warning:: The current instance of the model will be modified. """ # The Hamiltonian is stored in the master rank only if not self.backend.is_master_rank: return # Ensure start <= stop if start > stop: tmp = start start = stop stop = tmp # Check input data are ok if not (start >= 0 and start < (self.Lx if direction == 0 else self.Ly)): raise RuntimeError("Start point value not allowed.") if not (stop > 0 and stop < (self.Lx if direction == 0 else self.Ly)): raise RuntimeError("End point value not allowed.") if direction not in [0, 1]: raise RuntimeError("Direction not allowed: insert 0 for 'x' and 1 for 'y'.") if not isinstance(model2, Model): raise RuntimeError( "The two models must be instances of Model, Supercell or FiniteModel." ) if not ( self.Lx == model2.Lx and self.Ly == model2.Ly and np.allclose( [r.toarray() for r in self.r], [r.toarray() for r in model2.r] ) ): raise RuntimeError("You can only build heterostructures of the same model.") # Generate the Hamiltonian of the second model hamilt_model2 = model2.hamiltonian.copy() # Check if only the on-site terms are changed (if so, avoid checking all the matrix elements) onsite_only = False if np.allclose( (self.hamiltonian - sp.diags_array(self.hamiltonian.diagonal())).toarray(), (hamilt_model2 - sp.diags_array(hamilt_model2.diagonal())).toarray(), ): onsite_only = True # Remove on-site terms from the model onsite1 = self.hamiltonian.diagonal().copy() onsite2 = hamilt_model2.diagonal().copy() self.hamiltonian.setdiag(0) if direction == 0: # Splitting along the x direction ind = np.array( [ [ (start + i) * self.Ly * self.states_uc + j * self.states_uc for j in range(self.Ly) ] for i in range(stop - start + 1) ] ).flatten() else: # Splitting along the y direction ind = np.array( [ [ i * self.Ly * self.states_uc + start * self.states_uc + j * self.states_uc for j in range(stop - start + 1) ] for i in range(self.Lx) ] ).flatten() for i in ind: for j in range(self.states_uc): onsite1[i + j] = onsite2[i + j] # Add the new on-site terms self.hamiltonian.setdiag(onsite1) # If other matrix element are changed if not onsite_only: # Indices of every atom in the selected cells, not only of the initial atom of the cell indices = np.array( [[i + j for j in range(self.states_uc) if self._mask[i + j]] for i in ind] ).flatten() # The full list of on sites, to avoid reset later onsites_all = self.hamiltonian.diagonal() self.hamiltonian = self.hamiltonian.tolil() hamilt_model2_lil = hamilt_model2.tolil() # Replace all selected rows self.hamiltonian[indices, :] = hamilt_model2_lil[indices, :] # Restore the correct diagonal self.hamiltonian.setdiag(onsites_all) # Filter out small amplitudes and clean up zeros self.hamiltonian = sp.csr_array(self.hamiltonian) # Switch back to CSR self.hamiltonian.data[np.abs(self.hamiltonian.data) < 1e-10] = 0 self.hamiltonian.eliminate_zeros()
################################################# # Utility functions ################################################# def _xy_to_index(self, cellx: int, celly: int, basis: int): r"""Convert :python:`[cell_x, cell_y, basis]` to the internal indexing of the lattice sites. Parameters ---------- cellx : Index of the unit cell along the :math:`\mathbf{a}_1` direction. celly : Index of the unit cell along the :math:`\mathbf{a}_2` direction. basis : Index of the atom in the basis of the lattice. This index is defined when creating the model. Returns ------- index : The index of an atom in the lattice according to the internal indexing. .. note:: The conventional internal indexing is the one also used in PythTB and TBmodels when dealing with supercells: the first index is the bottom-left corner of the lattice and the last one is the top-right. The indices increase first along the :math:`\mathbf{a}_2` direction, then along :math:`\mathbf{a}_1`. """ return ( self.Ly * self.states_uc * cellx + self.states_uc * celly + basis * (2 if self.spinful else 1) ) def _xy_to_line(self, fixed_coordinate: str, xy: int): r"""Returns the indices of the sites of the lattice along the fixed direction :python:`fixed_coordinate` starting from :python:`xy`. Parallelized using MPI by distributing iterations across ranks and gathering results. Parameters ---------- fixed_coordinate : Coordinate that has to be kept fixed. If one is interested in the indices along the :math:`\mathbf{a}_1` direction, this has to be set :python:`'y'` else :python:`'x'`. xy : Index of the unit cell along the direction :python:`fixed_coordiante` from which the indices are returned. Returns ------- index_list : The list of indices of the atoms in the lattice keeping fixed :python:`fixed_direction` from the cell :python:`xy`. """ if fixed_coordinate == "x": # Parallelize by distributing loop iterations across ranks n_iter = self.states_uc * self.Ly start = self.backend.mpi_rank * (n_iter // self.backend.mpi_size) + min( self.backend.mpi_rank, n_iter % self.backend.mpi_size ) end = ( start + (n_iter // self.backend.mpi_size) + (1 if self.backend.mpi_rank < (n_iter % self.backend.mpi_size) else 0) ) local_indices = [] for i in range(start, end): idx = self.states_uc * xy * self.Ly + i if self._mask[idx]: local_indices.append(idx) # Gather all partial results to rank 0 gathered = self.backend.comm.gather(local_indices, root=0) if self.backend.mpi_rank == 0: # Flatten and sort by global index to maintain order all_indices = [] for idx_list in gathered: all_indices.extend(idx_list) all_indices.sort() result = np.array(all_indices) else: result = None # Broadcast result to all ranks result = self.backend.comm.bcast(result, root=0) return result elif fixed_coordinate == "y": # Parallelize by distributing Lx iterations across ranks start = self.backend.mpi_rank * (self.Lx // self.backend.mpi_size) + min( self.backend.mpi_rank, self.Lx % self.backend.mpi_size ) end = ( start + (self.Lx // self.backend.mpi_size) + (1 if self.backend.mpi_rank < (self.Lx % self.backend.mpi_size) else 0) ) local_indices = [] for i in range(start, end): for j in range(self.states_uc): idx = self.states_uc * xy + self.states_uc * self.Ly * i + j if self._mask[idx]: local_indices.append(idx) # Gather all partial results to rank 0 gathered = self.backend.comm.gather(local_indices, root=0) if self.backend.mpi_rank == 0: # Flatten and sort by global index to maintain order all_indices = [] for idx_list in gathered: all_indices.extend(idx_list) all_indices.sort() result = np.array(all_indices) else: result = None # Broadcast result to all ranks result = self.backend.comm.bcast(result, root=0) return result else: raise RuntimeError("Direction not allowed, only 'x' or 'y'.") ################################################# # Extract topological markers on lines or grids ################################################# def _extract_line_marker( self, marker: np.ndarray, direction: int, start: int, macroscopic_average: bool = False, ) -> np.ndarray: r"""Extract the value of a topological marker along a line of the lattice. Parameters ---------- marker : The topological marker to be extracted, as a list of values per lattice site. direction : Direction along which the line is defined, allowed values are ``0`` for the :math:`\mathbf{a}_1` direction or ``1`` for the :math:`\mathbf{a}_2` direction. Default is ``0``. start : Index of the unit cell along the direction :python:`direction` from which the line starts. Default is ``0``. macroscopic_average : Whether the values of the topological marker along the line are averaged according to a certain cutoff in real space. Default is :python:`False`, which means that the value of the marker on each lattice site along the line is returned without averaging. Returns ------- line_marker : The list of values of the topological marker along the line defined by the parameters. """ # Evaluate index of the selected direction indices = self._xy_to_line("x" if direction == 1 else "y", start) if macroscopic_average: n_line = len(indices) else: n_line = int(len(indices) / self.states_uc) start_blk = self.backend.mpi_rank * (n_line // self.backend.mpi_size) + min( self.backend.mpi_rank, n_line % self.backend.mpi_size ) end = ( start_blk + (n_line // self.backend.mpi_size) + (1 if self.backend.mpi_rank < (n_line % self.backend.mpi_size) else 0) ) local_marker_line = np.zeros(n_line, dtype=np.float64) # If macroscopic_average the topological operator does not need an addditional average if macroscopic_average: for i in range(start_blk, end): local_marker_line[i] = marker[indices[i]] else: for i in range(start_blk, end): local_marker_line[i] = np.sum( [ marker[indices[self.states_uc * i + j]] for j in range(self.states_uc) ] ) if USE_MPI: marker_line = np.zeros(n_line, dtype=np.float64) self.backend.comm.Allreduce(local_marker_line, marker_line) else: marker_line = local_marker_line return np.array(marker_line) def _trace_unit_cell(self, marker: np.ndarray) -> np.ndarray: r"""Compute the contraction over the unit cell of the topological marker. Parameters ---------- marker : The topological marker to be traced, as a list of values per lattice site. Returns ------- lattice_marker : The list of values of the topological marker on the unit cells of the lattice. """ n_cells = int(len(marker) / self.states_uc) # Parallel reduction of the unit-cell marker over MPI ranks start_blk = self.backend.mpi_rank * (n_cells // self.backend.mpi_size) + min( self.backend.mpi_rank, n_cells % self.backend.mpi_size ) end = ( start_blk + (n_cells // self.backend.mpi_size) + (1 if self.backend.mpi_rank < (n_cells % self.backend.mpi_size) else 0) ) marker_cell = np.zeros(n_cells, dtype=np.float64) for i in range(start_blk, end): base = self.states_uc * i marker_cell[i] = np.sum(marker[base : base + self.states_uc]) lattice_marker = np.zeros(n_cells, dtype=np.float64) if USE_MPI: self.backend.comm.Allreduce(marker_cell, lattice_marker) else: lattice_marker = marker_cell # Repeat to ensure that the dimension of the marker matches the dimension of # the position matrices, since if not macroscopic_average the value of the # marker is defined per unit cell return np.repeat(lattice_marker, self.states_uc)