import contextlib import importlib import torch import intel_extension_for_pytorch as ipex # pylint: disable=import-error, unused-import # pylint: disable=protected-access, missing-function-docstring, line-too-long, unnecessary-lambda, no-else-return class CondFunc: # pylint: disable=missing-class-docstring def __new__(cls, orig_func, sub_func, cond_func): self = super(CondFunc, cls).__new__(cls) if isinstance(orig_func, str): func_path = orig_func.split(".") for i in range(len(func_path) - 1, -1, -1): try: resolved_obj = importlib.import_module(".".join(func_path[:i])) break except ImportError: pass for attr_name in func_path[i:-1]: resolved_obj = getattr(resolved_obj, attr_name) orig_func = getattr(resolved_obj, func_path[-1]) setattr( resolved_obj, func_path[-1], lambda *args, **kwargs: self(*args, **kwargs), ) self.__init__(orig_func, sub_func, cond_func) return lambda *args, **kwargs: self(*args, **kwargs) def __init__(self, orig_func, sub_func, cond_func): self.__orig_func = orig_func self.__sub_func = sub_func self.__cond_func = cond_func def __call__(self, *args, **kwargs): if not self.__cond_func or self.__cond_func(self.__orig_func, *args, **kwargs): return self.__sub_func(self.__orig_func, *args, **kwargs) else: return self.__orig_func(*args, **kwargs) _utils = torch.utils.data._utils def _shutdown_workers(self): if ( torch.utils.data._utils is None or torch.utils.data._utils.python_exit_status is True or torch.utils.data._utils.python_exit_status is None ): return if hasattr(self, "_shutdown") and not self._shutdown: self._shutdown = True try: if hasattr(self, "_pin_memory_thread"): self._pin_memory_thread_done_event.set() self._worker_result_queue.put((None, None)) self._pin_memory_thread.join() self._worker_result_queue.cancel_join_thread() self._worker_result_queue.close() self._workers_done_event.set() for worker_id in range(len(self._workers)): if self._persistent_workers or self._workers_status[worker_id]: self._mark_worker_as_unavailable(worker_id, shutdown=True) for w in self._workers: # pylint: disable=invalid-name w.join(timeout=torch.utils.data._utils.MP_STATUS_CHECK_INTERVAL) for q in self._index_queues: # pylint: disable=invalid-name q.cancel_join_thread() q.close() finally: if self._worker_pids_set: torch.utils.data._utils.signal_handling._remove_worker_pids(id(self)) self._worker_pids_set = False for w in self._workers: # pylint: disable=invalid-name if w.is_alive(): w.terminate() class DummyDataParallel( torch.nn.Module ): # pylint: disable=missing-class-docstring, unused-argument, too-few-public-methods def __new__( cls, module, device_ids=None, output_device=None, dim=0 ): # pylint: disable=unused-argument if isinstance(device_ids, list) and len(device_ids) > 1: print("IPEX backend doesn't support DataParallel on multiple XPU devices") return module.to("xpu") def return_null_context(*args, **kwargs): # pylint: disable=unused-argument return contextlib.nullcontext() def check_device(device): return bool( (isinstance(device, torch.device) and device.type == "cuda") or (isinstance(device, str) and "cuda" in device) or isinstance(device, int) ) def return_xpu(device): return ( f"xpu:{device[-1]}" if isinstance(device, str) and ":" in device else ( f"xpu:{device}" if isinstance(device, int) else torch.device("xpu") if isinstance(device, torch.device) else "xpu" ) ) def ipex_no_cuda(orig_func, *args, **kwargs): torch.cuda.is_available = lambda: False orig_func(*args, **kwargs) torch.cuda.is_available = torch.xpu.is_available original_autocast = torch.autocast def ipex_autocast(*args, **kwargs): if len(args) > 0 and args[0] == "cuda": return original_autocast("xpu", *args[1:], **kwargs) else: return original_autocast(*args, **kwargs) original_torch_cat = torch.cat def torch_cat(tensor, *args, **kwargs): if len(tensor) == 3 and ( tensor[0].dtype != tensor[1].dtype or tensor[2].dtype != tensor[1].dtype ): return original_torch_cat( [tensor[0].to(tensor[1].dtype), tensor[1], tensor[2].to(tensor[1].dtype)], *args, **kwargs, ) else: return original_torch_cat(tensor, *args, **kwargs) original_interpolate = torch.nn.functional.interpolate def interpolate( tensor, size=None, scale_factor=None, mode="nearest", align_corners=None, recompute_scale_factor=None, antialias=False, ): # pylint: disable=too-many-arguments if antialias or align_corners is not None: return_device = tensor.device return_dtype = tensor.dtype return original_interpolate( tensor.to("cpu", dtype=torch.float32), size=size, scale_factor=scale_factor, mode=mode, align_corners=align_corners, recompute_scale_factor=recompute_scale_factor, antialias=antialias, ).to(return_device, dtype=return_dtype) else: return original_interpolate( tensor, size=size, scale_factor=scale_factor, mode=mode, align_corners=align_corners, recompute_scale_factor=recompute_scale_factor, antialias=antialias, ) original_linalg_solve = torch.linalg.solve def linalg_solve(A, B, *args, **kwargs): # pylint: disable=invalid-name if A.device != torch.device("cpu") or B.device != torch.device("cpu"): return_device = A.device return original_linalg_solve(A.to("cpu"), B.to("cpu"), *args, **kwargs).to( return_device ) else: return original_linalg_solve(A, B, *args, **kwargs) def ipex_hijacks(): CondFunc( "torch.Tensor.to", lambda orig_func, self, device=None, *args, **kwargs: orig_func( self, return_xpu(device), *args, **kwargs ), lambda orig_func, self, device=None, *args, **kwargs: check_device(device), ) CondFunc( "torch.Tensor.cuda", lambda orig_func, self, device=None, *args, **kwargs: orig_func( self, return_xpu(device), *args, **kwargs ), lambda orig_func, self, device=None, *args, **kwargs: check_device(device), ) CondFunc( "torch.empty", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.load", lambda orig_func, *args, map_location=None, **kwargs: orig_func( *args, return_xpu(map_location), **kwargs ), lambda orig_func, *args, map_location=None, **kwargs: map_location is None or check_device(map_location), ) CondFunc( "torch.randn", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.ones", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.zeros", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.tensor", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.linspace", lambda orig_func, *args, device=None, **kwargs: orig_func( *args, device=return_xpu(device), **kwargs ), lambda orig_func, *args, device=None, **kwargs: check_device(device), ) CondFunc( "torch.Generator", lambda orig_func, device=None: torch.xpu.Generator(device), lambda orig_func, device=None: device is not None and device != torch.device("cpu") and device != "cpu", ) CondFunc( "torch.batch_norm", lambda orig_func, input, weight, bias, *args, **kwargs: orig_func( input, ( weight if weight is not None else torch.ones(input.size()[1], device=input.device) ), ( bias if bias is not None else torch.zeros(input.size()[1], device=input.device) ), *args, **kwargs, ), lambda orig_func, input, *args, **kwargs: input.device != torch.device("cpu"), ) CondFunc( "torch.instance_norm", lambda orig_func, input, weight, bias, *args, **kwargs: orig_func( input, ( weight if weight is not None else torch.ones(input.size()[1], device=input.device) ), ( bias if bias is not None else torch.zeros(input.size()[1], device=input.device) ), *args, **kwargs, ), lambda orig_func, input, *args, **kwargs: input.device != torch.device("cpu"), ) # Functions with dtype errors: CondFunc( "torch.nn.modules.GroupNorm.forward", lambda orig_func, self, input: orig_func( self, input.to(self.weight.data.dtype) ), lambda orig_func, self, input: input.dtype != self.weight.data.dtype, ) CondFunc( "torch.nn.modules.linear.Linear.forward", lambda orig_func, self, input: orig_func( self, input.to(self.weight.data.dtype) ), lambda orig_func, self, input: input.dtype != self.weight.data.dtype, ) CondFunc( "torch.nn.modules.conv.Conv2d.forward", lambda orig_func, self, input: orig_func( self, input.to(self.weight.data.dtype) ), lambda orig_func, self, input: input.dtype != self.weight.data.dtype, ) CondFunc( "torch.nn.functional.layer_norm", lambda orig_func, input, normalized_shape=None, weight=None, *args, **kwargs: orig_func( input.to(weight.data.dtype), normalized_shape, weight, *args, **kwargs ), lambda orig_func, input, normalized_shape=None, weight=None, *args, **kwargs: weight is not None and input.dtype != weight.data.dtype, ) # Diffusers Float64 (ARC GPUs doesn't support double or Float64): if not torch.xpu.has_fp64_dtype(): CondFunc( "torch.from_numpy", lambda orig_func, ndarray: orig_func(ndarray.astype("float32")), lambda orig_func, ndarray: ndarray.dtype == float, ) # Broken functions when torch.cuda.is_available is True: CondFunc( "torch.utils.data.dataloader._BaseDataLoaderIter.__init__", lambda orig_func, *args, **kwargs: ipex_no_cuda(orig_func, *args, **kwargs), lambda orig_func, *args, **kwargs: True, ) # Functions that make compile mad with CondFunc: torch.utils.data.dataloader._MultiProcessingDataLoaderIter._shutdown_workers = ( _shutdown_workers ) torch.nn.DataParallel = DummyDataParallel torch.autocast = ipex_autocast torch.cat = torch_cat torch.linalg.solve = linalg_solve torch.nn.functional.interpolate = interpolate torch.backends.cuda.sdp_kernel = return_null_context