from collections import defaultdict
import torch
import intel_extension_for_pytorch as ipex  # pylint: disable=import-error, unused-import
import intel_extension_for_pytorch._C as core  # pylint: disable=import-error, unused-import

# pylint: disable=protected-access, missing-function-docstring, line-too-long

OptState = ipex.cpu.autocast._grad_scaler.OptState
_MultiDeviceReplicator = ipex.cpu.autocast._grad_scaler._MultiDeviceReplicator
_refresh_per_optimizer_state = (
    ipex.cpu.autocast._grad_scaler._refresh_per_optimizer_state
)


def _unscale_grads_(
    self, optimizer, inv_scale, found_inf, allow_fp16
):  # pylint: disable=unused-argument
    per_device_inv_scale = _MultiDeviceReplicator(inv_scale)
    per_device_found_inf = _MultiDeviceReplicator(found_inf)

    # To set up _amp_foreach_non_finite_check_and_unscale_, split grads by device and dtype.
    # There could be hundreds of grads, so we'd like to iterate through them just once.
    # However, we don't know their devices or dtypes in advance.

    # https://stackoverflow.com/questions/5029934/defaultdict-of-defaultdict
    # Google says mypy struggles with defaultdicts type annotations.
    per_device_and_dtype_grads = defaultdict(lambda: defaultdict(list))  # type: ignore[var-annotated]
    # sync grad to master weight
    if hasattr(optimizer, "sync_grad"):
        optimizer.sync_grad()
    with torch.no_grad():
        for group in optimizer.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                if (not allow_fp16) and param.grad.dtype == torch.float16:
                    raise ValueError("Attempting to unscale FP16 gradients.")
                if param.grad.is_sparse:
                    # is_coalesced() == False means the sparse grad has values with duplicate indices.
                    # coalesce() deduplicates indices and adds all values that have the same index.
                    # For scaled fp16 values, there's a good chance coalescing will cause overflow,
                    # so we should check the coalesced _values().
                    if param.grad.dtype is torch.float16:
                        param.grad = param.grad.coalesce()
                    to_unscale = param.grad._values()
                else:
                    to_unscale = param.grad

                # -: is there a way to split by device and dtype without appending in the inner loop?
                to_unscale = to_unscale.to("cpu")
                per_device_and_dtype_grads[to_unscale.device][to_unscale.dtype].append(
                    to_unscale
                )

        for _, per_dtype_grads in per_device_and_dtype_grads.items():
            for grads in per_dtype_grads.values():
                core._amp_foreach_non_finite_check_and_unscale_(
                    grads,
                    per_device_found_inf.get("cpu"),
                    per_device_inv_scale.get("cpu"),
                )

    return per_device_found_inf._per_device_tensors


def unscale_(self, optimizer):
    """
    Divides ("unscales") the optimizer's gradient tensors by the scale factor.
    :meth:`unscale_` is optional, serving cases where you need to
    :ref:`modify or inspect gradients<working-with-unscaled-gradients>`
    between the backward pass(es) and :meth:`step`.
    If :meth:`unscale_` is not called explicitly,  gradients will be unscaled  automatically during :meth:`step`.
    Simple example, using :meth:`unscale_` to enable clipping of unscaled gradients::
        ...
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        scaler.step(optimizer)
        scaler.update()
    Args:
        optimizer (torch.optim.Optimizer):  Optimizer that owns the gradients to be unscaled.
    .. warning::
        :meth:`unscale_` should only be called once per optimizer per :meth:`step` call,
        and only after all gradients for that optimizer's assigned parameters have been accumulated.
        Calling :meth:`unscale_` twice for a given optimizer between each :meth:`step` triggers a RuntimeError.
    .. warning::
        :meth:`unscale_` may unscale sparse gradients out of place, replacing the ``.grad`` attribute.
    """
    if not self._enabled:
        return

    self._check_scale_growth_tracker("unscale_")

    optimizer_state = self._per_optimizer_states[id(optimizer)]

    if optimizer_state["stage"] is OptState.UNSCALED:  # pylint: disable=no-else-raise
        raise RuntimeError(
            "unscale_() has already been called on this optimizer since the last update()."
        )
    elif optimizer_state["stage"] is OptState.STEPPED:
        raise RuntimeError("unscale_() is being called after step().")

    # FP32 division can be imprecise for certain compile options, so we carry out the reciprocal in FP64.
    assert self._scale is not None
    inv_scale = (
        self._scale.to("cpu").double().reciprocal().float().to(self._scale.device)
    )
    found_inf = torch.full((1,), 0.0, dtype=torch.float32, device=self._scale.device)

    optimizer_state["found_inf_per_device"] = self._unscale_grads_(
        optimizer, inv_scale, found_inf, False
    )
    optimizer_state["stage"] = OptState.UNSCALED


def update(self, new_scale=None):
    """
    Updates the scale factor.
    If any optimizer steps were skipped the scale is multiplied by ``backoff_factor``
    to reduce it. If ``growth_interval`` unskipped iterations occurred consecutively,
    the scale is multiplied by ``growth_factor`` to increase it.
    Passing ``new_scale`` sets the new scale value manually. (``new_scale`` is not
    used directly, it's used to fill GradScaler's internal scale tensor. So if
    ``new_scale`` was a tensor, later in-place changes to that tensor will not further
    affect the scale GradScaler uses internally.)
    Args:
        new_scale (float or :class:`torch.FloatTensor`, optional, default=None):  New scale factor.
    .. warning::
        :meth:`update` should only be called at the end of the iteration, after ``scaler.step(optimizer)`` has
        been invoked for all optimizers used this iteration.
    """
    if not self._enabled:
        return

    _scale, _growth_tracker = self._check_scale_growth_tracker("update")

    if new_scale is not None:
        # Accept a new user-defined scale.
        if isinstance(new_scale, float):
            self._scale.fill_(new_scale)  # type: ignore[union-attr]
        else:
            reason = "new_scale should be a float or a 1-element torch.FloatTensor with requires_grad=False."
            assert isinstance(new_scale, torch.FloatTensor), reason  # type: ignore[attr-defined]
            assert new_scale.numel() == 1, reason
            assert new_scale.requires_grad is False, reason
            self._scale.copy_(new_scale)  # type: ignore[union-attr]
    else:
        # Consume shared inf/nan data collected from optimizers to update the scale.
        # If all found_inf tensors are on the same device as self._scale, this operation is asynchronous.
        found_infs = [
            found_inf.to(device="cpu", non_blocking=True)
            for state in self._per_optimizer_states.values()
            for found_inf in state["found_inf_per_device"].values()
        ]

        assert len(found_infs) > 0, "No inf checks were recorded prior to update."

        found_inf_combined = found_infs[0]
        if len(found_infs) > 1:
            for i in range(1, len(found_infs)):
                found_inf_combined += found_infs[i]

        to_device = _scale.device
        _scale = _scale.to("cpu")
        _growth_tracker = _growth_tracker.to("cpu")

        core._amp_update_scale_(
            _scale,
            _growth_tracker,
            found_inf_combined,
            self._growth_factor,
            self._backoff_factor,
            self._growth_interval,
        )

        _scale = _scale.to(to_device)
        _growth_tracker = _growth_tracker.to(to_device)
    # To prepare for next iteration, clear the data collected from optimizers this iteration.
    self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)


def gradscaler_init():
    torch.xpu.amp.GradScaler = ipex.cpu.autocast._grad_scaler.GradScaler
    torch.xpu.amp.GradScaler._unscale_grads_ = _unscale_grads_
    torch.xpu.amp.GradScaler.unscale_ = unscale_
    torch.xpu.amp.GradScaler.update = update
    return torch.xpu.amp.GradScaler