import inspect
import os
import warnings
from concurrent.futures import Future
from dataclasses import dataclass
from enum import Enum
from typing import cast, Optional, TYPE_CHECKING, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import STATE_DICT_TYPE
from torch.distributed.checkpoint._async_process_executor import (
    _ProcessBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._async_thread_executor import (
    _ThreadBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import (
    AsyncStager,
    DefaultStager,
    StagingOptions,
)
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter, WriteResult
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


if TYPE_CHECKING:
    from torch.distributed.checkpoint._async_executor import (
        _AsyncCheckpointExecutor,
    )


__all__ = [
    "save_state_dict",
    "save",
    "async_save",
    "AsyncCheckpointerType",
    "AsyncSaveResponse",
]


class AsyncCheckpointerType(Enum):
    """Enum for async checkpointer type."""

    THREAD = "thread"
    PROCESS = "process"


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.
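
    For example, any object exposing both methods qualifies; the ``Counter``
    class below is a hypothetical illustration, not part of this module:

        >>> # xdoctest: +SKIP
        >>> class Counter:
        ...     def __init__(self):
        ...         self.steps = 0
        ...     def state_dict(self):
        ...         return {"steps": self.steps}
        ...     def load_state_dict(self, state_dict):
        ...         self.steps = state_dict["steps"]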

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save_state_dict` and that all data in state_dict belong to it.

    .. note::
        When saving checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard_group should be calling `save_state_dict` and the corresponding process
        group needs to be passed in.

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to load
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        save_state_dict uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
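
        A common pattern (a sketch, assuming one GPU per rank) is:

            >>> # xdoctest: +SKIP
            >>> torch.cuda.set_device(dist.get_rank() % torch.cuda.device_count())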
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = no_dist or (not dist.is_available()) or (not dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is disabled, unavailable or uninitialized, "
            "assuming the intent is to save in a single process.",
            stacklevel=2,
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
            use_collectives=use_collectives,
        )


@dataclass
class AsyncSaveResponse:
    """This class contains futures for staging and upload completion.
    It is returned by async_save().
    staging_completion is a future that indicates when the local copy
    of the state_dict is complete.
    upload_completion is a future that indicates when the checkpoint
    has finished saving.
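
    Example (a sketch; the ``async_save`` call is shown with placeholder arguments):
        >>> # xdoctest: +SKIP
        >>> response = async_save(state_dict, checkpoint_id="/checkpoint/1")
        >>> response.staging_completion.result()  # local copy done; safe to mutate state_dict
        >>> response.upload_completion.result()  # checkpoint is fully persisted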
    """

    staging_completion: Future[None]
    upload_completion: Future[None]


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    async_checkpointer_type: AsyncCheckpointerType = AsyncCheckpointerType.THREAD,
    async_stager: Optional[AsyncStager] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Union[Future, AsyncSaveResponse]:
    """Asynchronous version of ``save``. This code first stages the state_dict on to the
    staging storage (defaults to CPU memory), and then calls the `save` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.
        MUST CALL CLOSE AFTER LAST CHECKPOINT IS SAVED

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        async_checkpointer_type (AsyncCheckpointerType):
            Whether to run the checkpoint save in a separate thread or process.
            (Default: ``AsyncCheckpointerType.THREAD``)
        async_stager (AsyncStager):
            Provides the staging implementation. If ``storage_writer`` also implements
            ``AsyncStager`` and ``async_stager`` is provided, ``async_stager`` is used
            for staging. (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, save the checkpoint without rank coordination. (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Future: A future holding the resultant Metadata object from `save`. If the
            configured stager stages asynchronously, an ``AsyncSaveResponse`` holding
            separate staging-completion and upload-completion futures is returned instead.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()
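        >>>
        >>> # When the configured stager stages asynchronously, async_save
        >>> # returns an AsyncSaveResponse instead of a bare Future (sketch):
        >>> response = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>> response.staging_completion.result()
        >>> response.upload_completion.result()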

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        if torch.device("cpu") not in pg._device_types:  # type: ignore[attr-defined]
            raise AssertionError(
                "A CPU backend must be enabled for async save; "
                "try initializing process group with 'cpu:gloo,cuda:nccl'"
            )

    if async_stager is None:
        if storage_writer is not None and isinstance(storage_writer, AsyncStager):
            # bwc with old storage_writers that also implement AsyncStager
            async_stager = storage_writer
        else:
            async_stager = DefaultStager(StagingOptions(False, False, False, True))

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)

    @_dcp_method_logger(log_exceptions=True)
    def stage_state_dict() -> Union[Future[STATE_DICT_TYPE], STATE_DICT_TYPE]:
        return async_stager.stage(state_dict)

    staging_future_or_state_dict = stage_state_dict()

    upload_executor: _AsyncCheckpointExecutor = (
        _ProcessBasedAsyncCheckpointExecutor()
        if async_checkpointer_type == AsyncCheckpointerType.PROCESS
        else _ThreadBasedAsyncCheckpointExecutor()
    )
    upload_future: Future = upload_executor.execute_save(
        staging_future_or_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
        no_dist=no_dist,
        use_collectives=use_collectives,
    )

    if isinstance(staging_future_or_state_dict, Future):
        staging_future = staging_future_or_state_dict
        return_staging_future: Future[None] = Future()

        def callback(
            original_staging_future: Future[STATE_DICT_TYPE],
            return_staging_future: Future[None] = return_staging_future,
        ) -> None:
            # Surface staging success/failure on the future handed back to the
            # caller, without leaking the staged state_dict itself.
            try:
                original_staging_future.result()
                return_staging_future.set_result(None)
            except Exception as e:
                return_staging_future.set_exception(e)

        if not staging_future.done():
            staging_future.add_done_callback(callback)
        else:
            return_staging_future.set_result(None)

        return AsyncSaveResponse(
            staging_completion=return_staging_future,
            upload_completion=upload_future,
        )
    else:

        @_dcp_method_logger(log_exceptions=True)
        def maybe_synchronize_staging() -> None:
            if async_stager.should_synchronize_after_execute:
                async_stager.synchronize_staging()

        maybe_synchronize_staging()
        return upload_future


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():

        def _elem_to_state_dict(elem):
            # Only Stateful objects are asked for their state_dict; all other
            # values are passed through unchanged.
            return elem.state_dict() if isinstance(elem, Stateful) else elem

        _elem_to_state_dict.__name__ = f"_stateful_to_state_dict.{key}"

        stateful_state_dict[key] = _dcp_method_logger(log_exceptions=True)(
            _elem_to_state_dict
        )(elem)
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
    use_collectives: bool = True,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None, "planner is None"

    global_metadata = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id
        ckpt_kwargs["process_group"] = distW.group

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step():
        assert planner is not None, "planner is None"
        storage_meta = storage_writer.storage_meta()
        planner_params = inspect.signature(planner.set_up_planner).parameters
        if "storage_meta" not in planner_params:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been "
                "updated to include the storage_meta argument. Please update your "
                "implementation to include this parameter.",
                stacklevel=2,
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)  # type: ignore[call-arg, arg-type]
        elif "kwargs" in planner_params:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
                kwargs={"rank": distW.rank, "use_collectives": use_collectives},
            )
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans):
        nonlocal global_metadata

        assert planner is not None, "planner is None"
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    if use_collectives:
        central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)
    else:
        # Without collectives each rank plans independently: run the local and
        # global planning steps in-process on this rank's plan only.
        local_plan = local_step()
        global_plan = global_step([local_plan])
        central_plan = global_plan[0]

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data() -> list[WriteResult]:
        assert planner is not None, "planner is None"
        assert central_plan is not None, "central_plan is None"
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results) -> Metadata:
        assert global_metadata is not None, "global_metadata is None"
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    if use_collectives:
        return distW.all_reduce("write", write_data, finish_checkpoint)
    else:
        all_results = write_data()
        global_metadata = finish_checkpoint([all_results])
        distW.barrier()
        return global_metadata