from __future__ import annotations
from typing import TYPE_CHECKING
# This is ignored at runtime, used only for type hinting
if TYPE_CHECKING:
from .supercell import Supercell
from .finite_model import FiniteModel
import numpy as np
import tbmodels as tbm
import pythtb as ptb
import wannierberri as wberri
import scipy.sparse as sp
from warnings import warn
# Select the appropriate backend for parallelization
from .backends import get_backend
from .config import USE_MPI
# For backward compatibility with old PythTB versions
ptb_model = ptb.tb_model if ptb.__version__ < "2.0.0" else ptb.tb_model | ptb.TBModel
[docs]
class Model:
r"""A :python:`Model` class describing a tight-binding model in its unit cell."""
def __init__(self, model, spinful: bool = False, n_occ: int = None, **kwargs):
r"""
Create a model with a given tight-binding Hamiltonian in its unit cell. Supercells
and finite models can be constructed from it, and global topological invariants can
be computed. The class can be initialized from a tight-binding model defined in
`TBmodels <https://tbmodels.greschd.ch/en/latest/>`_, `PythTB <https://pythtb.readthedocs.io/en/latest/>`_
or `WannierBerri <https://wannier-berri.readthedocs.io/en/latest/>`_, or via the appropriate
constructor it is able to read `Wannier90 <https://wannier90.readthedocs.io/en/latest/>`_
files generated from an *ab initio* calculation.
Parameters
----------
model :
The tight-binding model from which the :python:`Model` instance is generated.
This can be obtained from either a `TBmodels <https://tbmodels.greschd.ch/en/latest/>`_, a
`PythTB <https://pythtb.readthedocs.io/en/latest/>`_, or a
`WannierBerri <https://wannier-berri.readthedocs.io/en/latest/>`_ model instance.
spinful : bool
Whether the model is spinful or not. Default is :python:`False`.
n_occ :
Number of occupied states. This information is required if the model is obtained
from Wannier functions generated by Wannier90 code starting an *ab initio*
calculation. In all the other cases the number of occupied states can is optional
and if :python:`None` half-filling is automatically assumed.
kwargs :
Additional keyword arguments. Available keyword arguments are:
- ``file_spn`` (bool, default = :python:`False`): whether the file ``seedname.spn`` is provided if the model is obtained from Wannier functions generated by Wannier90 code starting from a :math:`\Gamma`-only *ab initio* calculation. This file contains the spin matrix elements in the basis of Wannier functions, which are needed to compute the spin Bott index for models with non-trivial spin textures. If this file is not provided, the spin matrix elements are computed in the basis of tight-binding orbitals, which are diagonal in the spin operator :math:`S_z`. This can lead to incorrect results for models with non-trivial spin textures.
- ``Lx`` (int, default = 1): number of unit cells along the :math:`\mathbf{a}_1` direction.
- ``Ly`` (int, default = 1): number of unit cells along the :math:`\mathbf{a}_2` direction.
- ``debug_info`` (bool, default = :python:`False`): whether to print debug information during initialization. This can be useful for large models to track the progress of the initialization.
Some keyword arguments are automatically set during the initialization and
should not be overridden in general, as they are needed for the correct
functioning of the class. These are:
- ``states_uc`` (int, default = None): number of states in the unit cell.
- ``disordered`` (bool, default = :python:`False`): whether the model is disordered or not. This is automatically set to :python:`True` when disorder is added to the model.
- ``has_vacancies`` (bool, default = :python:`False`): whether the model has vacancies or not. This is automatically set to :python:`True` when vacancies are added to the model.
"""
# Validate allowed kwargs
allowed_kwargs = {
"file_spn",
"Lx",
"Ly",
"debug_info",
"states_uc",
"disordered",
"has_vacancies",
"is_crystalline",
}
if any([kwarg not in allowed_kwargs for kwarg in kwargs]):
raise ValueError(f"One or more invalid keyword arguments provided: {kwargs}.")
# Store local variables
self.model = model
self.Lx, self.Ly = kwargs.get("Lx", 1), kwargs.get("Ly", 1)
self.spinful = spinful
self.n_occ = n_occ # Can still be None here
self.file_spn = kwargs.get("file_spn", False)
self.debug_info = kwargs.get("debug_info", False)
# Needed to keep track of the original positions in case we need macroscopic
# averaging with vacancies (set there, otherwise always stays None)
self._cart_positions_original = None
# Initialize class variables to avoid unbound attributes in the parallelized code
for attr in (
"dim_k",
"r",
"lvecs",
"uc_vol",
"_mask",
"reciprocal_vecs",
"n_orb",
"states_uc",
"dim_r",
):
setattr(self, attr, None)
# Parse correctly initialization of the spin matrix on every rank
if not self.spinful:
self.sz = None
else:
self.sz = sp.csr_array([])
# Initialize an empty Hamiltonian on every rank
self.hamiltonian = sp.csr_array([])
# Initialize state of the model
self.disordered = kwargs.get("disordered", False)
self.has_vacancies = kwargs.get("has_vacancies", False)
self.is_crystalline = kwargs.get("is_crystalline", True)
# Initialize the empty cartesian positions (needs to have ndim = 2)
self.cart_positions = np.array([[None, None]])
# Choose backend
self.backend, self.physics = get_backend(nblk=self.Lx)
if self.backend.is_master_rank:
# Dispatcher: routes to the correct internal parser based on type
if isinstance(self.model, tbm.Model):
self._load_tbmodels()
elif isinstance(self.model, ptb_model):
self._load_pythtb()
elif isinstance(self.model, wberri.System_R):
self._load_wberri()
else:
raise NotImplementedError("Invalid model instance.")
# If provided, override the states_uc computed (needed in supercell and finite models)
self.states_uc = kwargs.get("states_uc", self.states_uc)
# Total number of orbitals in the model
self.n_orb = self.states_uc * self.Lx * self.Ly
# Validate the number of occupied states
if self.n_occ is not None and (self.n_occ <= 0 or self.n_occ > self.n_orb):
raise ValueError(
f"Invalid number of occupied states: {self.n_occ}. It must be a"
f" positive integer not greater than the number of orbitals {self.n_orb}."
)
# Make the positions, lattice vectors, reciprocal lattice vectors, ... 3D
self._fix_dimensionality()
# Generate the position matrices
self.r = np.array(
[sp.csr_array(np.diag(xyz)) for xyz in self.cart_positions.T]
)
# Initialize the spin matrix if needed
if self.spinful and self.sz.size == 0:
self.sz = sp.csr_array(np.diag([1, -1] * (self.n_orb // 2)))
# Broadcast to all the ranks the relevant data for the model initialization
for attr in ("n_occ", "n_orb", "dim_r", "states_uc", "reciprocal_vecs", "lvecs"):
setattr(self, attr, self.backend.comm.bcast(getattr(self, attr), root=0))
# Create a shared array per node
for attr in ("cart_positions", "_mask"):
setattr(self, attr, self.backend.shared_array(getattr(self, attr)))
#################################################
# Parse from 3rd party libraries
#################################################
def _load_pythtb(self) -> None:
r"""Parse model data from a PythTB model instance."""
from . import _pythtb
self.states_uc = _pythtb.calc_states_uc(self.model, self.spinful)
self.cart_positions = _pythtb.get_positions(self.model, self.spinful)
self.dim_r = self.cart_positions[0, :].size
# Expected dimension of the k-point in reciprocal space for the Hamiltonian
self.dim_k = (
self.model._dim_k if hasattr(self.model, "_dim_k") else self.model.dim_k
)
self.hamiltonian = sp.csr_array(
_pythtb.get_hamiltonian(
self.model, self.spinful, np.zeros(self.dim_k), self.dim_k
)
) # TODO: can we have a k-dependent hamiltonian in the unit cell?
try:
self.lvecs = np.copy(self.model.get_lat_vecs())
except AttributeError:
self.lvecs = np.copy(self.model._lat)
self.reciprocal_vecs = _pythtb._reciprocal_vec(self.model)
self.uc_vol = _pythtb.calc_uc_vol(self.model)
if self.n_occ is None:
self.n_occ = _pythtb.get_half_filling(self.model, self.spinful)
self._mask = _pythtb.initialize_mask(self.model, self.spinful)
def _load_tbmodels(self) -> None:
r"""Parse model data from a TBmodels model instance."""
from . import _tbmodels
self.states_uc = _tbmodels.calc_states_uc(self.model)
self.cart_positions = _tbmodels.get_positions(self.model)
self.dim_r = self.cart_positions[0, :].size
# Expected dimension of the k-point in reciprocal space for the Hamiltonian
self.dim_k = self.cart_positions[0, :].size
self.hamiltonian = sp.csr_array(
_tbmodels.get_hamiltonian(self.model, np.zeros(self.dim_k))
) # TODO: can we have a k-dependent hamiltonian in the unit cell?
self.lvecs = self.model.uc
self.reciprocal_vecs = _tbmodels._reciprocal_vec(self.model)
self.uc_vol = _tbmodels.calc_uc_vol(self.model)
if self.n_occ is None:
self.n_occ = _tbmodels.get_half_filling(self.model)
self._mask = _tbmodels.initialize_mask(self.model)
def _load_wberri(self) -> None:
r"""Parse model data from a WannierBerri model instance."""
from . import _wberri
self.states_uc = _wberri.calc_states_uc(self.model)
self.cart_positions = _wberri.get_positions(self.model)
self.dim_r = self.cart_positions[0, :].size
# Expected dimension of the k-point in reciprocal space for the Hamiltonian
self.dim_k = 3 # TODO: not sure here
self.hamiltonian, wberri_data = _wberri.get_hamiltonian(self.model)
self.hamiltonian = sp.csr_array(
self.hamiltonian
) # TODO: can we have a k-dependent hamiltonian in the unit cell?
self.lvecs = self.model.real_lattice
self.reciprocal_vecs = _wberri._reciprocal_vec(self.model)
self.uc_vol = _wberri.calc_uc_vol(self.model)
if self.n_occ is None:
raise NotImplementedError("Provide number of occupied states as n_occ.")
self._mask = _wberri.calc_states_uc(self.model)
if self.spinful and self.file_spn:
self.sz = sp.csr_array(_wberri.read_spn(self.model, wberri_data))
self.wannier = True
def _fix_dimensionality(self) -> None:
r"""
Fix the dimensionality of the model to 3D by adding dummy coordinates, lattice
vectors and reciprocal lattice vectors if needed. This is needed for compatibility
with Wannier90 models, which are always 3D.
"""
# Already ok
if self.dim_r == 3:
return
# Along the non-valid directions everything is zero
self.cart_positions = np.hstack(
(
self.cart_positions,
np.zeros((self.cart_positions.shape[0], 3 - self.dim_r)),
)
)
# Dummy third axis is along z
lvecs = np.eye(3)
try:
lvecs[: self.dim_r, : self.dim_r] = self.lvecs
self.lvecs = lvecs
except ValueError:
# On parsing supercells and finite models, the lattice vectors are fixed
pass
# Dummy third axis is along z
reciprocal_vecs = np.eye(3)
try:
reciprocal_vecs[: self.dim_r, : self.dim_r] = np.asarray(
self.reciprocal_vecs
).squeeze()
self.reciprocal_vecs = reciprocal_vecs
except ValueError:
# On parsing supercells and finite models, the lattice vectors are fixed
pass
# Now we have effectively 3 real space dimensions
self.dim_r = 3
#################################################
# Create ab initio models from Wannier90 outputs
#################################################
[docs]
@classmethod
def from_wannier90(
cls,
seedname: str,
path: str = ".",
engine: str = "tbmodels",
n_occ: int = None,
**kwargs,
) -> Model:
r"""Create a :python:`Model` instance from Wannier90 output files. The Wannier90
seedname (without extension) and the path to the files must be provided. The engine
used to read the Wannier90 files can be specified via the ``engine`` argument,
which can take values ``wannierberri``, ``tbmodels`` or ``pythtb``. The default
is ``tbmodels``.
Parameters
----------
seedname : str
Wannier90 seedname.
path : str
Path to the Wannier90 output files.
engine : str
Engine used to read the Wannier90 files. Can be either ``wannierberri``,
``tbmodels`` or ``pythtb``. Default is ``tbmodels``.
n_occ : int
Number of occupied states. If :python:`None`, the number of occupied states
is read from the model, if available (half-filling is usually assumed). If
the model does not contain information about the number of occupied states,
a :python:`RuntimeError` is raised and the number of occupied states must
be provided as input here. This is the case in TBmodels for instance.
kwargs :
Additional keyword arguments passed to the constructor of :python:`Model`
and to the WannierBerri's System_w90 when ``engine`` is set to ``wannierberri``.
Returns
-------
Model
A :python:`Model` instance generated from the Wannier90 output files.
"""
# Detect parallel environment
try:
from .backends import MPI
is_master_rank = MPI.COMM_WORLD.Get_rank() == 0
except (AttributeError, ImportError):
is_master_rank = True
engine = engine.lower()
if engine not in ("wannierberri", "tbmodels", "pythtb"):
raise ValueError(f"Invalid engine specified: {engine}.")
# If the .SPN file is needed, use wannierberri by default
if kwargs.get("spn_file", False):
engine = "wannierberri"
spinful = kwargs.pop("spinful", False)
sz = None
# Initialize the model depending on the required engine
if is_master_rank:
if engine == "tbmodels":
model = tbm.Model.from_wannier_files(
hr_file=f"{path}/{seedname}_hr.dat",
wsvec_file=f"{path}/{seedname}_wsvec.dat",
xyz_file=f"{path}/{seedname}_centres.xyz",
win_file=f"{path}/{seedname}.win",
sparse=True,
)
elif engine == "pythtb":
if ptb.__version__ < "2.0.0":
model = ptb.w90(path, seedname).model()
else:
model = ptb.W90(path, seedname).model()
elif engine == "wannierberri":
from ._wberri import read_spn_from_seedname, get_model_wberri
if spinful:
model, spinmats = read_spn_from_seedname(
f"{path}/{seedname}", spin=spinful, return_model=True, **kwargs
)
sz = spinmats[0, :, :, 2]
else:
model = get_model_wberri(f"{path}/{seedname}", spin=spinful, **kwargs)
else:
raise RuntimeError("Invalid engine to build the Wannier90 model.")
else:
model = None
# If the .SPN is needed, do not recompute it in the class constructor
model_kwargs = {
"Lx": kwargs.get("Lx", 1),
"Ly": kwargs.get("Ly", 1),
"debug_info": kwargs.get("debug_info", False),
}
class_model = cls(model, file_spn=False, n_occ=n_occ, **model_kwargs)
# Only True if .SPN is provided, the model is spinful and is read from wannierberri
if sz is not None and is_master_rank:
class_model.sz = sp.csr_array(sz)
class_model.file_spn = kwargs.get("file_spn", False)
# Share the model instance
class_model.model = class_model.backend.comm.bcast(class_model.model, root=0)
return class_model
#################################################
# Spawn subclasses
#################################################
[docs]
def make_supercell(self, Lx: int, Ly: int, n_occ: int = None) -> Supercell:
r"""Create a supercell given of the current model by repeating the unit
cell, allowing to compute single-point invariants and local geometrical
and topological markers.
Parameters
----------
Lx : int
Number of unit cells repeated along the :math:`\mathbf{a}_1` direction in
the supercell.
Ly : int
Number of unit cells repeated along the :math:`\mathbf{a}_2` direction in
the supercell.
n_occ : int
Number of occupied states. If :python:`None`, this is rescaled from the
currently stored value depending on the size of the supercell, otherwise
it is overridden by the provided value.
Returns
-------
Supercell
A :python:`Supercell` instance generated from the current model.
"""
from .supercell import Supercell
if n_occ is None:
n_occ = self.n_occ * Lx * Ly
if self.file_spn and self.spinful and isinstance(self.model, wberri.System_R):
raise NotImplementedError(
"The extension of the .SPN file to a supercell is "
"not implemented yet, so the supercell cannot be generated."
)
return Supercell(model=self, Lx=Lx, Ly=Ly, n_occ=n_occ)
[docs]
def make_finite(self, Lx: int, Ly: int, n_occ: int = None) -> FiniteModel:
r"""Create a finite system given of the current model by repeating the
unit cell, allowing to compute single-point invariants and local
geometrical and topological markers.
Parameters
----------
Lx : int
Number of unit cells repeated along the :math:`\mathbf{a}_1` direction in
the supercell.
Ly : int
Number of unit cells repeated along the :math:`\mathbf{a}_2` direction in
the supercell.
n_occ : int
Number of occupied states. If :python:`None`, this is rescaled from the
currently stored value depending on the size of the supercell, otherwise
it is overridden by the provided value.
Returns
-------
FiniteModel
A :python:`FiniteModel` instance generated from the current model.
"""
from .finite_model import FiniteModel
if n_occ is None:
n_occ = self.n_occ * Lx * Ly
if self.file_spn and self.spinful and isinstance(self.model, wberri.System_R):
raise NotImplementedError(
"The extension of the .SPN file to a finite model is "
"not implemented yet, so the finite model cannot be generated."
)
return FiniteModel(model=self, Lx=Lx, Ly=Ly, n_occ=n_occ)
#################################################
# Make disordered systems and heterostructures
#################################################
[docs]
def add_disorder_bernoulli(
self, w: float = 0, on_site: bool = True, seed: int = None
) -> np.ndarray:
r"""Add on-site Bernoulli disorder to the model. Bernoulli disorder consists in a
diagonal term in the Hamiltonian :math:`\mathcal H_{Bernoulli}=\sum_i w_i c_i^{\dagger}c_i`
where :math:`w_i` are random variables with random sign of the strength `\frac{W}{2}`.
Parameters
----------
w :
Disorder amplitude for the random variables with random signs. The random
number can only take two values, `\pm \frac{W}{2}`. Default is ``0``.
seed :
Seed to be used in the random number generation. Default is :python:`None`,
which result in the default Numpy seed.
on_site :
Whether to apply same random disorder on orbitals located on the same
physical site. Default is :python:`True`.
Returns
-------
np.ndarray
Array containing the random disorder values added to the Hamiltonian.
.. warning::
The current instance of the model will be modified.
"""
from .utils import _create_orb_pattern
if w != 0:
self.disordered = True
# The Hamiltonian is stored in the master rank only
if not self.backend.is_master_rank:
return
# Set the seed for the random number generator
if seed is not None:
np.random.seed(seed)
d = []
if self.backend.is_master_rank:
if on_site:
pattern = _create_orb_pattern(self.cart_positions[: self.states_uc])
num_cycles = (self.n_orb + len(pattern) - 1) // len(pattern)
rands_ = []
for _ in range(num_cycles):
# Generate new random numbers for this cycle, one per unique pattern value
rand_map = {p: np.random.randint(0, 2) for p in set(pattern)}
for i in range(len(pattern)):
if len(rands_) < self.n_orb:
rands_.append(rand_map[pattern[i]])
d = 0.5 * w * (2 * np.array(rands_) - 1.0)
else:
d = 0.5 * w * (2 * np.random.randint(0, 2, size=(self.n_orb)) - 1)
# The Hamiltonian is sparse to save disk space
self.hamiltonian += sp.diags_array(d, format="csr")
return d
[docs]
def add_vacancies(self, vacancies_list: tuple | list) -> None:
r"""Add vacancies in the system by removing a site in the lattice.
Parameters
----------
vacancies_list:
A list of [cell_x, cell_y, basis] pointing to the cell and atom to be removed.
Multiple values at once are allowed. For instance, consider a
:math:`L_x\times L_y` lattice with a basis of :math:`N_b` atoms, and :math:`M`
sites to be removed. Then, the list :python:`[[np.random.randint(L_x),
np.random.randint(L_y), np.random.randint(N_b)] for _ in range(M)]` can be
given as argument. See also :func:`.utils.unique_vacancies` for a function
that returns a list of unique vacancies.
.. note::
If the model is not spinful, an even number of vacancies may be required for
the system to be gapped.
.. warning::
The current instance of the model will be modified.
"""
# The Hamiltonian and all the quantities are stored in the master rank only
if self.backend.is_master_rank:
# Convert vacancies_list to internal indexing
if np.array(vacancies_list).shape == (3,):
vacancies = self._xy_to_index(
vacancies_list[0], vacancies_list[1], vacancies_list[2]
)
else:
vacancies = []
for hl in vacancies_list:
vacancies.append(self._xy_to_index(hl[0], hl[1], hl[2]))
# Remove the selected sites if there are any vacancies
if np.asarray(vacancies).size > 0:
self._mask[vacancies] = False
if not self.spinful and len(vacancies) % 2 != 0:
warn(
"Adding an odd number of vacancies causes the model to no longer be half-filled."
)
elif self.spinful:
# Count also the opposite spin
self._mask[np.asarray(vacancies) + 1] = False
self.hamiltonian = self.hamiltonian[:, self._mask][self._mask, :]
self.r = np.array([r[:, self._mask][self._mask, :] for r in self.r])
self._cart_positions_original = self.cart_positions.copy()
self.cart_positions = self.cart_positions[self._mask, :]
self.n_occ -= int(len(vacancies) * (0.5 if not self.spinful else 1))
self.n_orb -= int(len(vacancies) * (1 if not self.spinful else 2))
self.has_vacancies = True
if self.spinful:
self.sz = self.sz[:, self._mask][self._mask, :]
# Broadcast to all the ranks to update
for attr in ("n_occ", "n_orb", "has_vacancies"):
setattr(self, attr, self.backend.comm.bcast(getattr(self, attr), root=0))
# Create a shared array per node
for attr in ("cart_positions", "_cart_positions_original", "_mask"):
setattr(self, attr, self.backend.shared_array(getattr(self, attr)))
[docs]
def make_heterostructure(
self, model2: Model, direction: int = 0, start: int = 0, stop: int = 0
) -> None:
r"""Create a heterostructure starting from another model. The system will be
split in the direction :python:`direction` starting from the unit cell whose index
is :python:`start` and stopping at the cell indexed as :python:`end`. The two
models must have the same lattice parameters and lattice sites.
Parameters
----------
model2 :
An instance of :python:`strawberrypy.Model` representing the mdoel whose
Hamiltonian has to be used when constructing the heterostructure.
direction :
Direction in which the splitting is realized, allowed values are ``0`` for
the :math:`\mathbf{a}_1` direction or ``1`` for the :math:`\mathbf{a}_2`
direction. Default is ``0``.
start :
Starting point for the splitting in the :python:`direction` direction.
Default is ``0``.
end :
End point of the splitting in the :python:`direction` direction. Default
is ``0``.
.. warning::
The current instance of the model will be modified.
"""
# The Hamiltonian is stored in the master rank only
if not self.backend.is_master_rank:
return
# Ensure start <= stop
if start > stop:
tmp = start
start = stop
stop = tmp
# Check input data are ok
if not (start >= 0 and start < (self.Lx if direction == 0 else self.Ly)):
raise RuntimeError("Start point value not allowed.")
if not (stop > 0 and stop < (self.Lx if direction == 0 else self.Ly)):
raise RuntimeError("End point value not allowed.")
if direction not in [0, 1]:
raise RuntimeError("Direction not allowed: insert 0 for 'x' and 1 for 'y'.")
if not isinstance(model2, Model):
raise RuntimeError(
"The two models must be instances of Model, Supercell or FiniteModel."
)
if not (
self.Lx == model2.Lx
and self.Ly == model2.Ly
and np.allclose(
[r.toarray() for r in self.r], [r.toarray() for r in model2.r]
)
):
raise RuntimeError("You can only build heterostructures of the same model.")
# Generate the Hamiltonian of the second model
hamilt_model2 = model2.hamiltonian.copy()
# Check if only the on-site terms are changed (if so, avoid checking all the matrix elements)
onsite_only = False
if np.allclose(
(self.hamiltonian - sp.diags_array(self.hamiltonian.diagonal())).toarray(),
(hamilt_model2 - sp.diags_array(hamilt_model2.diagonal())).toarray(),
):
onsite_only = True
# Remove on-site terms from the model
onsite1 = self.hamiltonian.diagonal().copy()
onsite2 = hamilt_model2.diagonal().copy()
self.hamiltonian.setdiag(0)
if direction == 0:
# Splitting along the x direction
ind = np.array(
[
[
(start + i) * self.Ly * self.states_uc + j * self.states_uc
for j in range(self.Ly)
]
for i in range(stop - start + 1)
]
).flatten()
else:
# Splitting along the y direction
ind = np.array(
[
[
i * self.Ly * self.states_uc
+ start * self.states_uc
+ j * self.states_uc
for j in range(stop - start + 1)
]
for i in range(self.Lx)
]
).flatten()
for i in ind:
for j in range(self.states_uc):
onsite1[i + j] = onsite2[i + j]
# Add the new on-site terms
self.hamiltonian.setdiag(onsite1)
# If other matrix element are changed
if not onsite_only:
# Indices of every atom in the selected cells, not only of the initial atom of the cell
indices = np.array(
[[i + j for j in range(self.states_uc) if self._mask[i + j]] for i in ind]
).flatten()
# The full list of on sites, to avoid reset later
onsites_all = self.hamiltonian.diagonal()
self.hamiltonian = self.hamiltonian.tolil()
hamilt_model2_lil = hamilt_model2.tolil()
# Replace all selected rows
self.hamiltonian[indices, :] = hamilt_model2_lil[indices, :]
# Restore the correct diagonal
self.hamiltonian.setdiag(onsites_all)
# Filter out small amplitudes and clean up zeros
self.hamiltonian = sp.csr_array(self.hamiltonian) # Switch back to CSR
self.hamiltonian.data[np.abs(self.hamiltonian.data) < 1e-10] = 0
self.hamiltonian.eliminate_zeros()
#################################################
# Utility functions
#################################################
def _xy_to_index(self, cellx: int, celly: int, basis: int):
r"""Convert :python:`[cell_x, cell_y, basis]` to the internal indexing
of the lattice sites.
Parameters
----------
cellx :
Index of the unit cell along the :math:`\mathbf{a}_1` direction.
celly :
Index of the unit cell along the :math:`\mathbf{a}_2` direction.
basis :
Index of the atom in the basis of the lattice. This index is defined when
creating the model.
Returns
-------
index :
The index of an atom in the lattice according to the internal indexing.
.. note::
The conventional internal indexing is the one also used in PythTB and TBmodels
when dealing with supercells: the first index is the bottom-left corner of the
lattice and the last one is the top-right. The indices increase first along the
:math:`\mathbf{a}_2` direction, then along :math:`\mathbf{a}_1`.
"""
return (
self.Ly * self.states_uc * cellx
+ self.states_uc * celly
+ basis * (2 if self.spinful else 1)
)
def _xy_to_line(self, fixed_coordinate: str, xy: int):
r"""Returns the indices of the sites of the lattice along the fixed
direction :python:`fixed_coordinate` starting from :python:`xy`.
Parallelized using MPI by distributing iterations across ranks and
gathering results.
Parameters
----------
fixed_coordinate :
Coordinate that has to be kept fixed. If one is interested in the indices
along the :math:`\mathbf{a}_1` direction, this has to be set :python:`'y'`
else :python:`'x'`.
xy :
Index of the unit cell along the direction :python:`fixed_coordiante` from
which the indices are returned.
Returns
-------
index_list :
The list of indices of the atoms in the lattice keeping fixed
:python:`fixed_direction` from the cell :python:`xy`.
"""
if fixed_coordinate == "x":
# Parallelize by distributing loop iterations across ranks
n_iter = self.states_uc * self.Ly
start = self.backend.mpi_rank * (n_iter // self.backend.mpi_size) + min(
self.backend.mpi_rank, n_iter % self.backend.mpi_size
)
end = (
start
+ (n_iter // self.backend.mpi_size)
+ (1 if self.backend.mpi_rank < (n_iter % self.backend.mpi_size) else 0)
)
local_indices = []
for i in range(start, end):
idx = self.states_uc * xy * self.Ly + i
if self._mask[idx]:
local_indices.append(idx)
# Gather all partial results to rank 0
gathered = self.backend.comm.gather(local_indices, root=0)
if self.backend.mpi_rank == 0:
# Flatten and sort by global index to maintain order
all_indices = []
for idx_list in gathered:
all_indices.extend(idx_list)
all_indices.sort()
result = np.array(all_indices)
else:
result = None
# Broadcast result to all ranks
result = self.backend.comm.bcast(result, root=0)
return result
elif fixed_coordinate == "y":
# Parallelize by distributing Lx iterations across ranks
start = self.backend.mpi_rank * (self.Lx // self.backend.mpi_size) + min(
self.backend.mpi_rank, self.Lx % self.backend.mpi_size
)
end = (
start
+ (self.Lx // self.backend.mpi_size)
+ (1 if self.backend.mpi_rank < (self.Lx % self.backend.mpi_size) else 0)
)
local_indices = []
for i in range(start, end):
for j in range(self.states_uc):
idx = self.states_uc * xy + self.states_uc * self.Ly * i + j
if self._mask[idx]:
local_indices.append(idx)
# Gather all partial results to rank 0
gathered = self.backend.comm.gather(local_indices, root=0)
if self.backend.mpi_rank == 0:
# Flatten and sort by global index to maintain order
all_indices = []
for idx_list in gathered:
all_indices.extend(idx_list)
all_indices.sort()
result = np.array(all_indices)
else:
result = None
# Broadcast result to all ranks
result = self.backend.comm.bcast(result, root=0)
return result
else:
raise RuntimeError("Direction not allowed, only 'x' or 'y'.")
#################################################
# Extract topological markers on lines or grids
#################################################
def _extract_line_marker(
self,
marker: np.ndarray,
direction: int,
start: int,
macroscopic_average: bool = False,
) -> np.ndarray:
r"""Extract the value of a topological marker along a line of the lattice.
Parameters
----------
marker :
The topological marker to be extracted, as a list of values per lattice site.
direction :
Direction along which the line is defined, allowed values are ``0`` for the
:math:`\mathbf{a}_1` direction or ``1`` for the :math:`\mathbf{a}_2` direction.
Default is ``0``.
start :
Index of the unit cell along the direction :python:`direction` from which
the line starts. Default is ``0``.
macroscopic_average :
Whether the values of the topological marker along the line are averaged
according to a certain cutoff in real space. Default is :python:`False`,
which means that the value of the marker on each lattice site along the
line is returned without averaging.
Returns
-------
line_marker :
The list of values of the topological marker along the line defined by the
parameters.
"""
# Evaluate index of the selected direction
indices = self._xy_to_line("x" if direction == 1 else "y", start)
if macroscopic_average:
n_line = len(indices)
else:
n_line = int(len(indices) / self.states_uc)
start_blk = self.backend.mpi_rank * (n_line // self.backend.mpi_size) + min(
self.backend.mpi_rank, n_line % self.backend.mpi_size
)
end = (
start_blk
+ (n_line // self.backend.mpi_size)
+ (1 if self.backend.mpi_rank < (n_line % self.backend.mpi_size) else 0)
)
local_marker_line = np.zeros(n_line, dtype=np.float64)
# If macroscopic_average the topological operator does not need an addditional average
if macroscopic_average:
for i in range(start_blk, end):
local_marker_line[i] = marker[indices[i]]
else:
for i in range(start_blk, end):
local_marker_line[i] = np.sum(
[
marker[indices[self.states_uc * i + j]]
for j in range(self.states_uc)
]
)
if USE_MPI:
marker_line = np.zeros(n_line, dtype=np.float64)
self.backend.comm.Allreduce(local_marker_line, marker_line)
else:
marker_line = local_marker_line
return np.array(marker_line)
def _trace_unit_cell(self, marker: np.ndarray) -> np.ndarray:
r"""Compute the contraction over the unit cell of the topological marker.
Parameters
----------
marker :
The topological marker to be traced, as a list of values per lattice site.
Returns
-------
lattice_marker :
The list of values of the topological marker on the unit cells of the lattice.
"""
n_cells = int(len(marker) / self.states_uc)
# Parallel reduction of the unit-cell marker over MPI ranks
start_blk = self.backend.mpi_rank * (n_cells // self.backend.mpi_size) + min(
self.backend.mpi_rank, n_cells % self.backend.mpi_size
)
end = (
start_blk
+ (n_cells // self.backend.mpi_size)
+ (1 if self.backend.mpi_rank < (n_cells % self.backend.mpi_size) else 0)
)
marker_cell = np.zeros(n_cells, dtype=np.float64)
for i in range(start_blk, end):
base = self.states_uc * i
marker_cell[i] = np.sum(marker[base : base + self.states_uc])
lattice_marker = np.zeros(n_cells, dtype=np.float64)
if USE_MPI:
self.backend.comm.Allreduce(marker_cell, lattice_marker)
else:
lattice_marker = marker_cell
# Repeat to ensure that the dimension of the marker matches the dimension of
# the position matrices, since if not macroscopic_average the value of the
# marker is defined per unit cell
return np.repeat(lattice_marker, self.states_uc)